Logback Kafka Appender Example

This article discusses Kafka as a logging destination for a Java application. In this tutorial, we will use Logback as the logging framework.

1. Logback Kafka Appender – Introduction

Logback is designed to be the successor to Log4j and has been developed by the same community. These are some of the advantages Logback has over Log4j:

  • Faster implementation – around 10x faster in some critical areas.
  • Automatic reloading of configuration files.
  • Ability to configure in Groovy.
  • Graceful recovery from I/O failures.
  • Conditional processing of configuration files.
  • Native support for SLF4J.

SLF4J stands for Simple Logging Facade for Java. It provides a logging facade for Java applications, enabling them to switch between logging frameworks. Currently, it supports Log4j, Logback and java.util.logging.
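Since the application codes against the SLF4J API alone, switching the backend is purely a dependency change. Below is a minimal sketch illustrating this; the class name is hypothetical, and the backend (here logback-classic) is discovered on the classpath at runtime.

FacadeDemo.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FacadeDemo {

    // Only the SLF4J API is referenced here; Logback serves as the
    // implementation because logback-classic is on the classpath.
    private static final Logger logger = LoggerFactory.getLogger(FacadeDemo.class);

    public static void main(String[] args) {
        logger.info("Routed through SLF4J to the bound logging framework");
    }
}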

Kafka is a distributed streaming platform, i.e. it stores and processes streams of records. Kafka provides a messaging queue distributed over a set of instances. These instances form a cluster managed by ZooKeeper, a centralized service for storing configuration information. Kafka stores the streams in topics, which are analogous to queues. A record sent to Kafka consists of a key, a value and a timestamp. One of the predominant use cases of Kafka is log aggregation, which is the use case we will explore in the tutorial below.
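To make the record structure concrete, the sketch below publishes a single record with the kafka-clients producer API. The topic name and broker address are assumptions for illustration; when no timestamp is supplied, the producer assigns the current time.

RecordDemo.java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RecordDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer, flushing pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key "app-1" and value "a log line"; timestamp defaults to send time
            producer.send(new ProducerRecord<>("demo-topic", "app-1", "a log line"));
        }
    }
}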

2. Tools/Technologies

  • IntelliJ IDEA (any Java IDE would work)
  • Java 1.8.101 (any 1.8.x will do fine)

We will discuss the Maven configuration for our application below.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jcg</groupId>
    <artifactId>logbackkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  • We declare logback-kafka-appender, logback-classic, slf4j-api and kafka-clients as dependencies.
  • We configure the Maven compiler plugin to build the jar output with Java 1.8 as both source and target.

3. Kafka Installation/Configuration

Before we dive into the Java application, Kafka has to be installed on our machine. Navigate to the Kafka download page and download Kafka 2.0.0 built for Scala 2.11. On a UNIX-like OS, type the command tar -xvf kafka_2.11-2.0.0.tgz to extract the archive. Navigate to the Kafka directory and run the following commands in a terminal.

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

The first command starts the centralized configuration server, ZooKeeper. It is used to coordinate the Kafka cluster and can also be spawned as a cluster instead of a single node. The default properties are as below:

zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
  • ZooKeeper stores the configuration in the directory specified via dataDir. It is preferable to store it somewhere other than tmp folders, as some systems auto-clean the tmp directories.
  • clientPort is the port to which Kafka nodes connect. If changed from 2181, the corresponding Kafka configuration also needs to be updated.
  • The number of client connections from a single IP is specified in maxClientCnxns. The default is 0, indicating unlimited connections.

After the ZooKeeper instance starts, we can start the Kafka cluster with the second command.

server.properties

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
  • Kafka has a lot of properties for fine-grained configuration. Here we are looking at the ZooKeeper-related properties alone, i.e. the host:port on which Kafka connects to ZooKeeper and the connection timeout in milliseconds.

Running these two commands ensures that our Kafka instance is up and running.
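If you want to verify that the broker is reachable, listing the topics is a quick check (with Kafka 2.0 the topics tool still talks to ZooKeeper):

bin/kafka-topics.sh --list --zookeeper localhost:2181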

4. Fast Kafka Appender

In this section, we will look at a highly performant Kafka appender and discuss its pros and cons.

logback.xml

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.out</target>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
        <target>System.err</target>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="fast-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>

        <topic>fast-logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <producerConfig>acks=0</producerConfig>
        <producerConfig>linger.ms=100</producerConfig>
        <producerConfig>max.block.ms=100</producerConfig>
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>

    </appender>

    <root level="info">
        <appender-ref ref="fast-kafka-appender" />
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
  • We have defined three logging appenders in order:
    • An appender for System.out, specified in the target tag; the log output pattern is specified in the pattern tag.
    • An appender for System.err. It also logs to the console but is used for errors.
    • A Kafka appender, which we cover in the points below.
  • There are variations of encoders available in Logback. Our Kafka appender uses PatternLayoutEncoder (the default) and specifies the logging pattern.
  • We specify the topic as fast-logs, to which the log messages will be pushed.
  • Kafka uses partitions to handle things in parallel, i.e. writes and reads to a partition of a topic can happen in parallel. Here, we specify NoKeyKeyingStrategy, so records are sent without a key and are spread across the topic's partitions.
  • AsynchronousDeliveryStrategy is used to send messages to the Kafka queue asynchronously, i.e. in non-blocking mode.
  • Kafka producer configs are documented in the official Kafka documentation. We aggregate messages for up to 100 ms and send them in a batch. Our logger can block when the buffer is full, which can happen if our connection to Kafka is lost. With max.block.ms set to 100, we start dropping messages instead of blocking once the wait exceeds 100 ms (the sketch below shows roughly how these entries map to producer properties).
  • We specify the level as INFO for the root logger and add our Kafka appender along with the Sysout appender.
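Under the hood, each producerConfig entry is passed to the Kafka producer as a key=value property. The sketch below shows roughly how the settings above translate into producer properties; it is for illustration only and is not the appender's actual code.

AppenderProducerSketch.java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AppenderProducerSketch {

    // Roughly the producer that the fast appender configuration implies
    static KafkaProducer<byte[], byte[]> buildProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "0");           // fire-and-forget: no broker acknowledgement
        props.put("linger.ms", "100");    // aggregate messages for up to 100 ms per batch
        props.put("max.block.ms", "100"); // drop rather than block for more than 100 ms
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}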

Next, let us quickly look at a simple logger application for our purpose.

LoggerRoot.java

package com.jcg.logbackKafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerRoot {

    private static final Logger logger = LoggerFactory.getLogger(LoggerRoot.class.getSimpleName());

    public static void main(String... args) throws InterruptedException {
        int counter = 0;
        while (counter < 10) {
            logger.info("Counter:" + counter);
            counter++;

        }
        Thread.sleep(5=1000);
        logger.info("Completed");
    }
}
  • This application prints the counter value 10 times.
  • The application then sleeps for 5 seconds and logs the completed message.

On running the application, you can see the logs in the console. To view the same logs in Kafka, type the below command.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fast-logs

If the application is run before the consumer is started, run the same command with the --from-beginning option:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fast-logs --from-beginning

We see the following logs in the Kafka console.

[2018-09-25 07:50:25,350] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-18872 with old generation 0 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,351] INFO [GroupCoordinator 0]: Stabilized group console-consumer-18872 generation 1 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,354] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-18872 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2018-09-25 07:50:25,354] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: __consumer_offsets-47. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
07:50:28.469 [main] INFO LoggerRoot - Counter:2
07:50:28.526 [kafka-producer-network-thread | machine.local-default-logback-relaxed] INFO org.apache.kafka.clients.Metadata - Cluster ID: zNOY9U5GT1y3HkPIQPUYvw
07:50:28.540 [main] INFO LoggerRoot - Counter:3
07:50:28.540 [main] INFO LoggerRoot - Counter:4
07:50:28.540 [main] INFO LoggerRoot - Counter:5
07:50:28.540 [main] INFO LoggerRoot - Counter:6
07:50:28.540 [main] INFO LoggerRoot - Counter:7
07:50:28.541 [main] INFO LoggerRoot - Counter:8
07:50:28.541 [main] INFO LoggerRoot - Counter:9

As we can see, some of the logs are missing and the final log is not printed. Since this is a non-blocking appender, the application can complete before the logs are delivered. This makes the logging unreliable, which may be acceptable for debug messages.
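If the lost tail of logs matters, one mitigation is to stop the Logback context before the JVM exits: stopping the context stops the appenders, and closing the Kafka producer flushes whatever is still buffered. A minimal sketch:

ShutdownDemo.java

import ch.qos.logback.classic.LoggerContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShutdownDemo {

    private static final Logger logger = LoggerFactory.getLogger(ShutdownDemo.class);

    public static void main(String[] args) {
        logger.info("Completed");

        // Stop all appenders cleanly; the Kafka appender closes its producer,
        // giving buffered records a chance to reach the broker.
        ((LoggerContext) LoggerFactory.getILoggerFactory()).stop();
    }
}

Note that with acks=0 the broker sends no acknowledgement, so even a clean shutdown cannot guarantee delivery; it only improves the odds.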

5. Failsafe Kafka Appender

In this section, we will take a look at a reliable Kafka appender which delivers logs in a fail-safe manner.

logback.xml (Reliable Appender)

<appender name="reliable-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">

    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>ERROR</level>
    </filter>

    <encoder>
        <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>

    <topic>reliable-logs</topic>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.BlockingDeliveryStrategy">
        <timeout>0</timeout>
    </deliveryStrategy>

    <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    <producerConfig>buffer.memory=8388608</producerConfig>

    <producerConfig>metadata.fetch.timeout.ms=99999999999</producerConfig>
    <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-restrictive</producerConfig>
    <producerConfig>compression.type=gzip</producerConfig>

    <appender-ref ref="STDERR"/>
</appender>

<root level="info">
    <appender-ref ref="fast-kafka-appender" />
    <appender-ref ref="reliable-kafka-appender" />
    <appender-ref ref="STDOUT"/>
</root>
  • We use a ThresholdFilter to filter the logs based on log level. Here, we specify ERROR as the filter level (see the short illustration after this list).
  • There are variations of encoders available in Logback. Our Kafka appender uses PatternLayoutEncoder (the default) and specifies the logging pattern.
  • We specify the topic as reliable-logs, to which the log messages will be pushed.
  • Kafka uses partitions to handle things in parallel, i.e. writes and reads to a partition of a topic can happen in parallel. Here, we specify HostNameKeyingStrategy, so that the partition is chosen based on the hostname. Since we are pushing from a single host, all logs go to a single partition.
  • BlockingDeliveryStrategy is used to send messages to the Kafka queue in a blocking manner. We specify the timeout as 0 to block indefinitely until the message can be sent.
  • Kafka producer configs are documented in the official Kafka documentation.
    • We restrict the producer's buffer memory to 8 MB (the default is 32 MB).
    • If the Kafka broker is not online when we try to log, we block until it becomes available, using metadata.fetch.timeout.ms.
    • We use gzip to compress each batch of log messages. The valid values are none, gzip and snappy.
  • We also add this appender to the root logger.
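With this configuration in place, an INFO message passes the root level and reaches fast-logs, but the ThresholdFilter rejects it for the reliable appender; an ERROR message is delivered to both topics. A short illustration (class name hypothetical):

ThresholdDemo.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThresholdDemo {

    private static final Logger logger = LoggerFactory.getLogger(ThresholdDemo.class);

    public static void main(String[] args) {
        logger.info("reaches fast-logs only");               // below the ERROR threshold
        logger.error("reaches fast-logs AND reliable-logs"); // passes the ThresholdFilter
    }
}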

To view the logs in Kafka, type the below command.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic reliable-logs

Here, we log the completed message at the ERROR level.

LoggerRoot.java

logger.error("Completed");

The topic reliable-logs consumes only the error logs. We can see that these logs are reliably delivered to the topic without failures.

6. Download the Source Code

You can download the full source code of this example here: LogbackKafkaAppender
