Logback Kafka Appender Example
This article discusses Kafka as a logging destination for a Java application. In this tutorial, we will use Logback as the logging framework.
1. Logback Kafka Appender – Introduction
Logback is designed to be the successor to Log4j and has been developed by the same community. These are some of the advantages Logback has over Log4j:
- Faster implementation – roughly 10x faster in some critical areas.
- Automatic reloading of configuration files.
- Ability to configure in Groovy.
- Graceful recovery from I/O failures.
- Conditional processing of configuration files.
- Native support for SLF4J.
SLF4J stands for Simple Logging Facade for Java. It provides a logging facade for Java applications, enabling them to switch logging frameworks without changing application code. Currently, it supports Log4j, Logback and java.util.logging.
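Since application code depends only on the SLF4J API, switching the underlying framework is purely a classpath and configuration change. The following minimal sketch (not part of the tutorial's project; the class name is illustrative) shows the facade usage that stays the same whichever backend is bound.
FacadeDemo.java
package com.jcg.logbackKafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FacadeDemo {

    // The code depends only on the org.slf4j API; the actual backend
    // (logback-classic in this tutorial) is discovered on the classpath at runtime.
    private static final Logger LOGGER = LoggerFactory.getLogger(FacadeDemo.class);

    public static void main(String[] args) {
        LOGGER.info("Logging through the SLF4J facade");
    }
}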
Kafka is a distributed streaming platform, i.e. it stores and processes streams of records. Kafka provides a messaging queue distributed over a set of instances. These instances form a cluster managed by ZooKeeper, a centralized service for storing configuration information. Kafka stores the streams in topics, which are analogous to queues. A record sent to Kafka must contain a key, a value and a timestamp. One of the predominant use cases of Kafka is log aggregation, which is the use case we will explore in the tutorial below.
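To make the record model concrete, below is a minimal, standalone producer sketch (not part of the tutorial's application; the topic name and broker address are assumptions) that sends a single log line as a key/value record.
SimpleLogProducer.java
package com.jcg.logbackKafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumes a local broker; adjust bootstrap.servers for your environment.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A record carries a key and a value; if no timestamp is set,
            // the producer assigns the current time.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("logs", "app-host-1", "INFO Application started");
            producer.send(record);
        }
    }
}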
2. Tools/Technologies
- IntelliJ IDEA (any Java IDE would work)
- Java 1.8.0_101 (any 1.8.x will do fine)
We will discuss the Maven configuration for our application below.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jcg</groupId>
    <artifactId>logbackkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
- We declare logback-kafka-appender, logback-classic, slf4j-api and kafka-clients as dependencies.
- We configure the Maven compiler plugin to build the jar output with a Java version of 1.8.
3. Kafka Installation/Configuration
Before we dive into the Java application, Kafka has to be installed on our machine. Navigate to the Kafka download page and download version 2.0.0 built for Scala 2.11. On a UNIX-like OS, type the command tar -xvf kafka_2.11-2.0.0.tgz to extract the archive. Navigate to the Kafka directory and run the following commands in a terminal.
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
The first command starts ZooKeeper, the centralized configuration service. It is used to coordinate the Kafka cluster and can also be spawned as a cluster instead of a single node. The default properties are as below:
zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
- ZooKeeper stores the configuration in the directory specified via dataDir. It is preferable to store it somewhere other than a tmp folder, as some systems auto-clean the tmp directories.
- clientPort is the port on which Kafka nodes connect. If it is changed from 2181, the corresponding Kafka configuration also needs to be updated.
- The number of client connections allowed from a single IP is specified in maxClientCnxns. The default is 0, indicating unlimited connections.
After the ZooKeeper instance starts, we can start the Kafka broker with the second command.
server.properties
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
- Kafka has a lot of properties for fine-grained configuration. We are looking at the ZooKeeper-related properties alone, i.e. the host:port on which Kafka connects to ZooKeeper and the connection timeout in milliseconds.
Running these two commands ensures that our Kafka instance is up and running.
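As a quick sanity check before wiring up the appender (this helper is not part of the original tutorial), a small AdminClient program can confirm that the broker is reachable from Java; it assumes the broker is listening on localhost:9092.
BrokerCheck.java
package com.jcg.logbackKafka;

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes the broker started above is listening on the default port.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a metadata request; it fails fast if the broker is unreachable.
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}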
4. Fast Kafka Appender
In this section, we will look at a highly performant Kafka appender and discuss its pros and cons.
logback.xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.err</target>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="fast-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
        <topic>fast-logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <producerConfig>acks=0</producerConfig>
        <producerConfig>linger.ms=100</producerConfig>
        <producerConfig>max.block.ms=100</producerConfig>
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
    </appender>

    <root level="info">
        <appender-ref ref="fast-kafka-appender" />
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
- We have defined three logging appenders in this configuration:
  - An appender for System.out, specified in the target tag; the log output format is specified in the pattern tag.
  - An appender for System.err. It also logs to the console but is used for errors.
  - The Kafka appender, which we cover in the points below.
- There are variations of encoders available in Logback. Our Kafka appender uses the PatternLayoutEncoder (default) and specifies the logging pattern.
- We specify the topic as fast-logs, to which the log messages will be pushed.
- Kafka uses partitions to handle things in parallel, i.e. writes and reads to a partition of a topic can happen in parallel. Here, we specify NoKeyKeyingStrategy so that partitions are chosen based on random keys.
- AsynchronousDeliveryStrategy is used to send messages asynchronously to the Kafka queue, i.e. in non-blocking mode.
- Kafka producer configs are documented here. We are aggregating messages for up to 100 ms (linger.ms) and sending them in a batch. Our logger can block when the buffer is full, which can happen if our connection to Kafka is lost. With max.block.ms set to 100, we start dropping messages instead of blocking when the wait is more than 100 ms.
- We set the level to INFO for the root logger and add our Kafka appender along with the System.out appender.
Next, we will look at a simple logger application for our purpose.
LoggerRoot.java
package com.jcg.logbackKafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerRoot {

    private static final Logger logger = LoggerFactory.getLogger(LoggerRoot.class.getSimpleName());

    public static void main(String... args) throws InterruptedException {
        int counter = 0;
        while (counter < 10) {
            logger.info("Counter:" + counter);
            counter++;
        }
        Thread.sleep(1000);
        logger.info("Completed");
    }
}
- This application prints the counter value 10 times.
- The application then sleeps for 1 second and logs the message "Completed".
On running the application, you can see the logs in the console. To view the same logs in Kafka, type the below command.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fast-logs
If the application is run before the consumer is started, run the same command with the --from-beginning option:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fast-logs --from-beginning
We see the following logs in the Kafka console.
[2018-09-25 07:50:25,350] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-18872 with old generation 0 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,351] INFO [GroupCoordinator 0]: Stabilized group console-consumer-18872 generation 1 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,354] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-18872 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,354] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: __consumer_offsets-47. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
07:50:28.469 [main] INFO LoggerRoot - Counter:2
07:50:28.526 [kafka-producer-network-thread | machine.local-default-logback-relaxed] INFO org.apache.kafka.clients.Metadata - Cluster ID: zNOY9U5GT1y3HkPIQPUYvw
07:50:28.540 [main] INFO LoggerRoot - Counter:3
07:50:28.540 [main] INFO LoggerRoot - Counter:4
07:50:28.540 [main] INFO LoggerRoot - Counter:5
07:50:28.540 [main] INFO LoggerRoot - Counter:6
07:50:28.540 [main] INFO LoggerRoot - Counter:7
07:50:28.541 [main] INFO LoggerRoot - Counter:8
07:50:28.541 [main] INFO LoggerRoot - Counter:9
As we see, some of the logs are missing and the final log is not printed. Since this is a non-blocking appender, the application can complete before the logs are delivered. This is unreliable logging and is best suited for debug messages.
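One common mitigation, sketched below under the assumption that logback-classic's LoggerContext is the backend in use, is to stop the logger context before the JVM exits so the appender gets a chance to flush and close its Kafka producer. With acks=0 and max.block.ms=100 this still does not guarantee delivery, but it reduces the chance of losing the tail of the log.
LogbackShutdown.java
package com.jcg.logbackKafka;

import ch.qos.logback.classic.LoggerContext;
import org.slf4j.LoggerFactory;

public class LogbackShutdown {
    public static void main(String[] args) {
        // Register a shutdown hook that stops the Logback context on JVM exit.
        // Stopping the context closes all appenders, which lets the Kafka appender
        // flush and close its underlying producer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
            context.stop();
        }));

        LoggerFactory.getLogger(LogbackShutdown.class).info("Shutting down cleanly");
    }
}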
5. Failsafe Kafka Appender
In this section, we will take a look at a reliable Kafka appender which delivers logs in a fail-safe manner.
logback.xml (Reliable Appender)
<appender name="reliable-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>ERROR</level>
    </filter>
    <encoder>
        <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
    <topic>reliable-logs</topic>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.BlockingDeliveryStrategy">
        <timeout>0</timeout>
    </deliveryStrategy>
    <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    <producerConfig>buffer.memory=8388608</producerConfig>
    <producerConfig>metadata.fetch.timeout.ms=99999999999</producerConfig>
    <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-restrictive</producerConfig>
    <producerConfig>compression.type=gzip</producerConfig>
    <appender-ref ref="STDERR"/>
</appender>

<root level="info">
    <appender-ref ref="fast-kafka-appender" />
    <appender-ref ref="reliable-kafka-appender" />
    <appender-ref ref="STDOUT"/>
</root>
- We use a ThresholdFilter to filter the logs based on log level. Here, we specify ERROR as the filter level (a small usage sketch follows this list).
- There are variations of encoders available in Logback. Our Kafka appender uses the PatternLayoutEncoder (default) and specifies the logging pattern.
- We specify the topic as reliable-logs, to which the log messages will be pushed.
- Kafka uses partitions to handle things in parallel, i.e. writes and reads to a partition of a topic can happen in parallel. Here, we specify HostNameKeyingStrategy so that partitions are chosen based on hostnames. Since we are pushing from a single host, everything goes to a single partition.
- BlockingDeliveryStrategy is used to send messages to the Kafka queue in a blocking manner. We specify the timeout as 0 to keep it blocked indefinitely until we are able to send the message.
- Kafka producer configs are documented here:
  - We restrict the size of the buffered batches to 8MB (the default is 32MB).
  - If the Kafka broker is not online when we try to log, we block until it becomes available using metadata.fetch.timeout.ms.
  - We use gzip to compress each batch of log messages. Valid values are none, gzip and snappy.
- We also add this appender to the root logger.
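As a brief illustration (not from the original article), error events are usually logged with the exception attached so the stack trace is delivered to reliable-logs along with the message; the class and the simulated failure below are purely hypothetical.
ErrorLoggingDemo.java
package com.jcg.logbackKafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorLoggingDemo {

    private static final Logger logger = LoggerFactory.getLogger(ErrorLoggingDemo.class);

    public static void main(String[] args) {
        try {
            // A simulated failing operation used only to produce an exception.
            throw new IllegalStateException("Simulated failure");
        } catch (IllegalStateException e) {
            // Only ERROR-level events pass the ThresholdFilter and reach reliable-logs.
            logger.error("Operation failed", e);
        }
    }
}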
To view the logs in Kafka, type the below command.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic reliable-logs
Here, we log the completed message at the ERROR level so that it passes the filter.
LoggerRoot.java
logger.error("Completed");
The reliable-logs topic receives only the error logs. We can see that these logs are reliably delivered to the topic without failures.
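To verify delivery programmatically instead of with the console consumer, a small hand-rolled consumer (not part of the original tutorial; the group id is an arbitrary choice) can read the reliable-logs topic, again assuming the broker on localhost:9092.
ReliableLogsReader.java
package com.jcg.logbackKafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReliableLogsReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "reliable-logs-reader");
        props.put("auto.offset.reset", "earliest"); // read the topic from the beginning
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("reliable-logs"));
            // Poll a few times and print whatever error logs have been delivered.
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}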
6. Download the Source Code
You can download the full source code of this example here: LogbackKafkaAppender