Home » Enterprise Java » spring » Integration » Spring Integration Kafka Tutorial

About Mahboob Hussain

Mahboob Hussain
Mahboob Hussain graduated in Engineering from NIT Nagpur, India and has an MBA from Webster University, USA. He has executed roles in various aspects of software development and technical governance. He started with FORTRAN and has programmed in a variety of languages in his career, the mainstay of which has been Java. He is an associate editor in our team and has his personal homepage at http://bit.ly/mahboob

Spring Integration Kafka Tutorial

1. Introduction

Apache Kafka started as an internal project at LinkedIn to solve the problem of scaling up the enterprise architecture from services talking to each other with strong typing contracts to an asynchronous message-based architecture. Both message persistence and high throughput were the goals of their new system. In addition, messages were required to be acknowledged in order and give independent consumers the ability to manage the offset of the next message that they will process. LinkedIn donated Kafka to the Apache foundation and is now the most popular open-source streaming platform giving high reliability and clustering abilities.
Spring for Apache Kafka is a project that applies Spring concepts like dependency injection, annotations and listener containers to help develop messaging systems using Apache Kafka. Leveraging this project, the Spring Integration Kafka module provides two components:

 
i) Outbound Channel Adapter
As per the documentation page, “The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter”.

ii) Message Driven Channel Adapter
This is used on the consuming (receiving) side of the application. The incoming messages can be processed in the record or batch mode.

2. Application

The use case we will illustrate in this article is a library that sends newly arrived books to its readers. Each book belongs to a particular genre and readers subscribe to genres of their interest.

Application use case with one library (producer) publishing messages to Kafka and two subscribed readers (consumers).

The application is implemented in two Spring Boot projects:
a) ‘library’ which is the producer that sends Book messages to a Kafka broker
b) ‘reader’ which is the consumer that receives books.

In the code, we use four channel classes from Spring Integration: MessageChannel, DirectChannel, PollableChannel, and QueueChannel. MessageChannel is an interface that is implemented by all Spring Integration channels. It declares the send method which the concrete classes define how a sender sends a message to the channel.

The DirectChannel implements the SubscribableChannel (which extends MessageChannel) and has point-to-point semantics, that is, it will only send each Message to a single subscriber.

PollableChannel is an interface that extends the MessageChannel and is used for receiving messages. Classes implementing this interface provide functionality for polling messages from a channel.

QueueChannel implements implements multiple interfaces. It wraps a queue, provides point-to-point semantics and has functionality to filter and purge messages that satisfy a certain criteria.

One key point to note is that each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Hence we will run two instances of the reader project, each belonging to a different consumer group and subscribing to a different combination of topics. Both the consumer group and topics set are given to the application as command line arguments.

3. Environment

I have used the following technologies for this application:

    • Java 1.8
    • Spring Boot 1.5.9
    • Spring Kafka 1.3.2
    • Spring Integration Kafka 2.3.0
    • Maven 3.3.9
    • Ubuntu 16.04 LTS

4. Source Code

library : This is a maven-based project, so all the dependencies are specified in the pom.xml file.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.javacodegeeks.springintegration.kafka</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>producer</name>
    <description>Kafka producer with Spring Boot</description>

    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Below is the Book class that serves as the model for the application.

Book.java

package org.javacodegeeks.springintegration.kafka.model;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@NoArgsConstructor
@ToString
public class Book {

	public enum Genre {
		fantasy, horror, romance, thriller
	}

	private long bookId;
	private String title;
	private Genre genre;
}

A Book has an enum indicating which genre it belongs to. The other two properties are bookId and title. The lombok annotations inject the setters, getters, a no-argument constructor and the toString() method to all the members.

Below is the BookPublisher class that initiates the message flow in the application.

BookPublisher.java

package org.javacodegeeks.springintegration.kafka.incoming;

import java.util.ArrayList;
import java.util.List;

import org.javacodegeeks.springintegration.kafka.model.Book;
import org.javacodegeeks.springintegration.kafka.model.Book.Genre;
import org.springframework.stereotype.Component;

@Component
public class BookPublisher {
	private long nextBookId;

	public BookPublisher() {
		this.nextBookId = 1001l;
	}

	public List getBooks() {
		List books = new ArrayList();

		books.add(createFantasyBook());
		books.add(createFantasyBook());
		books.add(createFantasyBook());
		books.add(createFantasyBook());
		books.add(createFantasyBook());
		books.add(createHorrorBook());
		books.add(createHorrorBook());
		books.add(createHorrorBook());
		books.add(createHorrorBook());
		books.add(createHorrorBook());
		books.add(createRomanceBook());
		books.add(createRomanceBook());
		books.add(createRomanceBook());
		books.add(createRomanceBook());
		books.add(createRomanceBook());
		books.add(createThrillerBook());
		books.add(createThrillerBook());
		books.add(createThrillerBook());
		books.add(createThrillerBook());
		books.add(createThrillerBook());

		return books;
	}

	Book createFantasyBook() {
		return createBook("", Genre.fantasy);
	}

	Book createHorrorBook() {
		return createBook("", Genre.horror);
	}

	Book createRomanceBook() {
		return createBook("", Genre.romance);
	}

	Book createThrillerBook() {
		return createBook("", Genre.thriller);
	}

	Book createBook(String title, Genre genre) {
		Book book = new Book();
		book.setBookId(nextBookId++);
		if (title == "") {
			title = "# " + Long.toString(book.getBookId());
		}
		book.setTitle(title);
		book.setGenre(genre);

		return book;
	}
}

The main functionality of this class is to create and return a list of twenty books, five each with the fantasy, horror, romance and thriller genres. There is a book creation method for each of the genre type, which call a utility method createBook by passing the correct enum type. Book ids start from 1001 and are set incrementally.

Below is the ProducerChannelConfig class that configures all the beans required for the producer application.

ProducerChannelConfig.java

package org.javacodegeeks.springintegration.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.messaging.MessageHandler;

@Configuration
public class ProducerChannelConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	@Bean
	public DirectChannel producerChannel() {
		return new DirectChannel();
	}

	@Bean
	@ServiceActivator(inputChannel = "producerChannel")
	public MessageHandler kafkaMessageHandler() {
		KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(kafkaTemplate());
		handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));

		return handler;
	}

	@Bean
	public KafkaTemplate kafkaTemplate() {
		return new KafkaTemplate(producerFactory());
	}

	@Bean
	public ProducerFactory producerFactory() {
		return new DefaultKafkaProducerFactory(producerConfigs());
	}

	@Bean
	public Map producerConfigs() {
		Map properties = new HashMap();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		// introduce a delay on the send to allow more messages to accumulate
		properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

		return properties;
	}
}

Below is the Library class that is the main class of the application and the publisher endpoint of the system.

Library.java

package org.javacodegeeks.springintegration.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.javacodegeeks.springintegration.kafka.incoming.BookPublisher;
import org.javacodegeeks.springintegration.kafka.model.Book;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class Library {

	@Autowired
	private BookPublisher bookPublisher;

	public static void main(String[] args) {
		ConfigurableApplicationContext context = new SpringApplicationBuilder(Library.class).web(false).run(args);
		context.getBean(Library.class).run(context);
		context.close();
	}

	private void run(ConfigurableApplicationContext context) {

		System.out.println("Inside ProducerApplication run method...");

		MessageChannel producerChannel = context.getBean("producerChannel", MessageChannel.class);

		List books = bookPublisher.getBooks();

		for (Book book : books) {
			Map headers = Collections.singletonMap(KafkaHeaders.TOPIC, book.getGenre().toString());
			producerChannel.send(new GenericMessage(book.toString(), headers));
		}

		System.out.println("Finished ProducerApplication run method...");
	};
}

From the application context, a MessageChannel bean is obtained. It then takes a list of 20 books from BookPublisher and sends those to producerChannel which is wired to the Kafka broker. The topic of each message is the book genre.

Below is the application.properties file that specifies values to the environment variables.

application.properties

spring.kafka.bootstrap-servers=localhost:9092

Here we specify port 9092 for the Kafka server to connect to.

Next, we take a look at the consumer side of the application.
reader: This is also a Maven-based project and all dependencies are configure in pom.xml.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.javacodegeeks.springintegration.kafka</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>consumer</name>
<description>Kafka consumer with Spring Boot</description>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
</project>

Below is the ConsumerChannelConfig class that configures all the beans required for the consumer application.

ConsumerChannelConfig.java

package org.javacodegeeks.springintegration.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.messaging.PollableChannel;

@Configuration
public class ConsumerChannelConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	@Value("${spring.kafka.topic}")
	private String springIntegrationKafkaTopic;

	@Bean
	public PollableChannel consumerChannel() {
		return new QueueChannel();
	}

	@Bean
	public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
		KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
				kafkaListenerContainer());
		kafkaMessageDrivenChannelAdapter.setOutputChannel(consumerChannel());

		return kafkaMessageDrivenChannelAdapter;
	}

	@SuppressWarnings("unchecked")
	@Bean
	public ConcurrentMessageListenerContainer kafkaListenerContainer() {
		ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);

		return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
				consumerFactory(), containerProps);
	}

	@Bean
	public ConsumerFactory consumerFactory() {
		return new DefaultKafkaConsumerFactory(consumerConfigs());
	}

	@Bean
	public Map consumerConfigs() {
		Map properties = new HashMap();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
		return properties;
	}
}

Below is the SubscribedReader class that is the main class of the application and the consumer endpoint of the system.

SubscribedReader.java

package org.javacodegeeks.springintegration.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

@SpringBootApplication
public class SubscribedReader {

	@Autowired
	PollableChannel consumerChannel;

	public static void main(String[] args) {

		ConfigurableApplicationContext context = new SpringApplicationBuilder(SubscribedReader.class).run(args);

		List valid_topics = Arrays.asList("fantasy", "horror", "romance", "thriller");

		List topics = new ArrayList();
		if (args.length > 0) {
			for (String arg : args) {
				if (valid_topics.contains(arg))
					topics.add(arg);
			}
		}

		context.getBean(SubscribedReader.class).run(context, topics);
		context.close();
	}

	private void run(ConfigurableApplicationContext context, List topics) {

		System.out.println("Inside ConsumerApplication run method...");
		PollableChannel consumerChannel = context.getBean("consumerChannel", PollableChannel.class);

		for (String topic : topics)
			addAnotherListenerForTopics(topic);

		Message received = consumerChannel.receive();
		while (received != null) {
			received = consumerChannel.receive();
			System.out.println("Received " + received.getPayload());
		}
	}

	@Autowired
	private IntegrationFlowContext flowContext;

	@Autowired
	private KafkaProperties kafkaProperties;

	public void addAnotherListenerForTopics(String... topics) {
		Map consumerProperties = kafkaProperties.buildConsumerProperties();
		IntegrationFlow flow = IntegrationFlows
				.from(Kafka.messageDrivenChannelAdapter(
						new DefaultKafkaConsumerFactory(consumerProperties), topics))
				.channel("consumerChannel").get();
		this.flowContext.registration(flow).register();
	}
}

In the main method, we first check if there are any command line arguments. If they are present and are valid topics, they are added to an ArrayList that is passed as an argument to the run method.

In the run method, a PollableChannel bean, configured in ConsumerChannelConfig is obtained from the application context. All the subscribed topics are added as listeners via MessageDrivenChannelAdapter object by calling the method addAnotherListenerForTopics for each topic. Then we call receive method of the PollableChannel object inside a while loop to get the messages from the Kafka broker.

Below is the application.properties file that specifies values to the environment variables.

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic=dummy

A dummy topic is specified so that when the consumer application starts, the message channel is properly configured. Next, the actual topics to listen to are added. The dummy topic is never used by the producer to send messages.

5. How to Run

You will need five terminal windows.
Terminal 1: Start ZooKeeper. In your Kafka installation folder, run the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

Terminal 2: Start KafkaServer. Go to your Kafka installation folder and run the following command:

bin/kafka-server-start.sh config/server.properties

Terminal 3: Start first consumer with group id “group-one” and subscribed to fantasy and horror genres. Changed directory to reader and run the following command:

mvn spring-boot:run -Dspring.kafka.consumer.group-id="group-one" -Drun.arguments="fantasy,horror"

Terminal 4: Start second consumer with group id “group-one” and subscribed to horror, romance and thriller genres. Change directory to reader and run the following command:

mvn spring-boot:run -Dspring.kafka.consumer.group-id="group-two" -Drun.arguments="horror,romance,thriller"

Terminal 5: Run producer. In the library folder, run the following command:

mvn spring-boot:run

You will see the received messages in terminals 3 and 4. Please note that you can run the commands in terminals 3, 4, and 5 in any order. Due to Kafka’s retention time policy, defaulted to 7 days and its file-like persistence mechanics, you will still get the same output.

Output for first consumer (reader) showing messages (books) received for two subscribed topics (genres).

Output for second consumer (reader) showing messages (books) received for three subscribed topics (genres).

6. Summary

In this example, we have seen the publish-subscribe mechanism provided by Apache Kafka and the methods by which Spring Integration enables applications to connect with it. We have also touched upon different message channels available with Spring Integration and described their key features.

7. Useful Links

Following resources will be very useful to get additional information and insights on concepts discussed in this article:

8. Download the Source Code

Download
You can download the full source code of this example here: spring-integration-kafka.zip
(No Ratings Yet)
Start the discussion Views Tweet it!

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

 

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

 

and many more ....

 

Receive Java & Developer job alerts in your Area

 

Leave a Reply

avatar
  Subscribe  
Notify of