Spring Integration Kafka Tutorial
In this tutorial, we show how to use Spring Integration with Apache Kafka through a worked example.
1. Introduction
Apache Kafka started as an internal project at LinkedIn to solve the problem of scaling up the enterprise architecture from services talking to each other with strongly typed contracts to an asynchronous message-based architecture. Both message persistence and high throughput were goals of the new system. In addition, messages had to be acknowledged in order, and independent consumers had to be able to manage the offset of the next message they would process. LinkedIn later donated Kafka to the Apache Software Foundation, and it is now one of the most widely used open-source streaming platforms, offering high reliability and clustering.
Spring for Apache Kafka is a project that applies Spring concepts like dependency injection, annotations and listener containers to help develop messaging systems using Apache Kafka. Leveraging this project, the Spring Integration Kafka module provides two components:
i) Outbound Channel Adapter
As per the documentation page, “The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter”.
ii) Message Driven Channel Adapter
This is used on the consuming (receiving) side of the application. The incoming messages can be processed in record or batch mode.
2. Spring Integration Kafka Application
The use case we will illustrate in this article is a library that sends newly arrived books to its readers. Each book belongs to a particular genre and readers subscribe to genres of their interest.
The application is implemented in two Spring Boot projects:
a) ‘library’ which is the producer that sends Book messages to a Kafka broker
b) ‘reader’ which is the consumer that receives books.
In the code, we use four channel types from Spring Integration: MessageChannel, DirectChannel, PollableChannel, and QueueChannel. MessageChannel is an interface that is implemented by all Spring Integration channels. It declares the send method; the concrete classes define how a sender delivers a message to the channel.
DirectChannel implements SubscribableChannel (which extends MessageChannel) and has point-to-point semantics, that is, it sends each Message to a single subscriber.
PollableChannel is an interface that extends MessageChannel and is used for receiving messages. Classes implementing this interface provide functionality for polling messages from a channel.
QueueChannel implements multiple interfaces. It wraps a queue, provides point-to-point semantics, and can filter and purge messages that satisfy certain criteria.
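To make the idea of a pollable, queue-backed channel more concrete, here is a rough plain-Java analogy. It is only a sketch: QueueChannelSketch is a hypothetical class built on java.util.concurrent, not Spring's QueueChannel, and it leaves out message headers, filtering and purging.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a pollable, queue-backed channel; not a Spring class. */
final class QueueChannelSketch<T> {

    // Messages wait here until a receiver polls for them
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Sender side: buffers the message and reports whether it was accepted. */
    boolean send(T message) {
        return queue.offer(message);
    }

    /** Receiver side: waits up to timeoutMillis and returns null if nothing arrived. */
    T receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and report "no message"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Number of messages currently buffered. */
    int size() {
        return queue.size();
    }
}
```

Each buffered message is handed to exactly one receiver, in the order it was sent, which is the point-to-point behaviour described above.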
One key point to note is that each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Hence we will run two instances of the reader project, each belonging to a different consumer group and subscribing to a different combination of topics. Both the consumer group and topics set are given to the application as command-line arguments.
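The delivery rule can be illustrated with a small plain-Java model. This is a sketch under simplifying assumptions: ConsumerGroupSketch is a hypothetical class, each group is assumed to have a single consumer instance, and partitions and offsets are ignored. The group names and topic sets mirror the ones used later when running the reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of topic subscriptions per consumer group; not Kafka client code. */
final class ConsumerGroupSketch {

    /** Returns, per group, how many of the published records that group receives. */
    static Map<String, Integer> countDeliveries(Map<String, Set<String>> subscriptions,
                                                List<String> publishedTopics) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> group : subscriptions.entrySet()) {
            int received = 0;
            for (String topic : publishedTopics) {
                // Every subscribing group gets its own copy of the record
                if (group.getValue().contains(topic)) {
                    received++;
                }
            }
            counts.put(group.getKey(), received);
        }
        return counts;
    }

    /** The setup used in this tutorial: five books per genre, two reader groups. */
    static Map<String, Integer> tutorialScenario() {
        List<String> published = new ArrayList<>();
        for (String genre : Arrays.asList("fantasy", "horror", "romance", "thriller")) {
            for (int i = 0; i < 5; i++) {
                published.add(genre);
            }
        }
        Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
        subscriptions.put("group-one", new HashSet<>(Arrays.asList("fantasy", "horror")));
        subscriptions.put("group-two", new HashSet<>(Arrays.asList("horror", "romance", "thriller")));
        return countDeliveries(subscriptions, published);
    }
}
```

In this model group-one receives 10 books and group-two receives 15; the five horror books are delivered to both groups because each group consumes the topic independently.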
3. Environment
I have used the following technologies for this application:
- Java 1.8
- Spring Boot 1.5.9
- Spring Kafka 1.3.2
- Spring Integration Kafka 2.3.0
- Maven 3.3.9
- Ubuntu 16.04 LTS
4. Source Code
library: This is a Maven-based project, so all the dependencies are specified in the pom.xml file.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.javacodegeeks.springintegration.kafka</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>producer</name>
    <description>Kafka producer with Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Below is the Book class that serves as the model for the application.
Book.java
package org.javacodegeeks.springintegration.kafka.model;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@NoArgsConstructor
@ToString
public class Book {

    public enum Genre {
        fantasy, horror, romance, thriller
    }

    private long bookId;
    private String title;
    private Genre genre;
}
A Book has an enum indicating which genre it belongs to. The other two properties are bookId and title. The Lombok annotations generate getters and setters for all the fields, a no-argument constructor, and the toString() method.
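For readers unfamiliar with Lombok, the following hand-written class is a rough equivalent of what the four annotations produce for Book. It is an illustrative sketch rather than Lombok's actual generated code; the toString() format follows Lombok's documented convention of the class name followed by the fields in parentheses.

```java
/** Hand-written approximation of the Lombok-annotated Book class. */
class Book {

    public enum Genre {
        fantasy, horror, romance, thriller
    }

    private long bookId;
    private String title;
    private Genre genre;

    // Equivalent of @NoArgsConstructor
    public Book() {
    }

    // Equivalent of @Getter and @Setter on every field
    public long getBookId() { return bookId; }
    public void setBookId(long bookId) { this.bookId = bookId; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public Genre getGenre() { return genre; }
    public void setGenre(Genre genre) { this.genre = genre; }

    // Equivalent of @ToString: class name, then field=value pairs
    @Override
    public String toString() {
        return "Book(bookId=" + bookId + ", title=" + title + ", genre=" + genre + ")";
    }
}
```

The toString() output matters in this application because the producer sends book.toString() as the message payload, so a reader sees text such as Book(bookId=1001, title=# 1001, genre=fantasy).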
Below is the BookPublisher class that initiates the message flow in the application.
BookPublisher.java
package org.javacodegeeks.springintegration.kafka.incoming;

import java.util.ArrayList;
import java.util.List;

import org.javacodegeeks.springintegration.kafka.model.Book;
import org.javacodegeeks.springintegration.kafka.model.Book.Genre;
import org.springframework.stereotype.Component;

@Component
public class BookPublisher {

    private long nextBookId;

    public BookPublisher() {
        this.nextBookId = 1001L;
    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<>();

        books.add(createFantasyBook());
        books.add(createFantasyBook());
        books.add(createFantasyBook());
        books.add(createFantasyBook());
        books.add(createFantasyBook());

        books.add(createHorrorBook());
        books.add(createHorrorBook());
        books.add(createHorrorBook());
        books.add(createHorrorBook());
        books.add(createHorrorBook());

        books.add(createRomanceBook());
        books.add(createRomanceBook());
        books.add(createRomanceBook());
        books.add(createRomanceBook());
        books.add(createRomanceBook());

        books.add(createThrillerBook());
        books.add(createThrillerBook());
        books.add(createThrillerBook());
        books.add(createThrillerBook());
        books.add(createThrillerBook());

        return books;
    }

    Book createFantasyBook() {
        return createBook("", Genre.fantasy);
    }

    Book createHorrorBook() {
        return createBook("", Genre.horror);
    }

    Book createRomanceBook() {
        return createBook("", Genre.romance);
    }

    Book createThrillerBook() {
        return createBook("", Genre.thriller);
    }

    Book createBook(String title, Genre genre) {
        Book book = new Book();
        book.setBookId(nextBookId++);
        // Derive a title from the id when none is given; compare with equals(), not ==
        if ("".equals(title)) {
            title = "# " + Long.toString(book.getBookId());
        }
        book.setTitle(title);
        book.setGenre(genre);
        return book;
    }
}
The main functionality of this class is to create and return a list of twenty books, five each in the fantasy, horror, romance, and thriller genres. There is a book creation method for each genre, which calls the utility method createBook with the corresponding enum value. Book ids start from 1001 and are assigned incrementally.
Below is the ProducerChannelConfig class that configures all the beans required for the producer application.
ProducerChannelConfig.java
package org.javacodegeeks.springintegration.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.messaging.MessageHandler;

@Configuration
public class ProducerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Channel the application sends Book messages to
    @Bean
    public DirectChannel producerChannel() {
        return new DirectChannel();
    }

    // Outbound channel adapter: publishes messages from producerChannel to Kafka
    @Bean
    @ServiceActivator(inputChannel = "producerChannel")
    public MessageHandler kafkaMessageHandler() {
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
        return handler;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // introduce a delay on the send to allow more messages to accumulate
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        return properties;
    }
}
Below is the Library class that is the main class of the application and the publisher endpoint of the system.
Library.java
package org.javacodegeeks.springintegration.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.javacodegeeks.springintegration.kafka.incoming.BookPublisher;
import org.javacodegeeks.springintegration.kafka.model.Book;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class Library {

    @Autowired
    private BookPublisher bookPublisher;

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(Library.class).web(false).run(args);
        context.getBean(Library.class).run(context);
        context.close();
    }

    private void run(ConfigurableApplicationContext context) {
        System.out.println("Inside ProducerApplication run method...");

        MessageChannel producerChannel = context.getBean("producerChannel", MessageChannel.class);

        List<Book> books = bookPublisher.getBooks();
        for (Book book : books) {
            // The genre name doubles as the Kafka topic for this message
            Map<String, Object> headers = Collections.singletonMap(KafkaHeaders.TOPIC, book.getGenre().toString());
            producerChannel.send(new GenericMessage<>(book.toString(), headers));
        }

        System.out.println("Finished ProducerApplication run method...");
    }
}
From the application context, a MessageChannel bean is obtained. The run method then takes the list of 20 books from BookPublisher and sends them to producerChannel, which is wired to the Kafka broker. The topic of each message is the book genre.
Below is the application.properties file that specifies values for the configuration properties.
application.properties
spring.kafka.bootstrap-servers=localhost:9092
Here we specify that the Kafka broker to connect to is at localhost, port 9092.
Next, we take a look at the consumer side of the application.
reader: This is also a Maven-based project and all dependencies are configured in pom.xml.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.javacodegeeks.springintegration.kafka</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>consumer</name>
    <description>Kafka consumer with Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Below is the ConsumerChannelConfig class that configures all the beans required for the consumer application.
ConsumerChannelConfig.java
package org.javacodegeeks.springintegration.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.messaging.PollableChannel;

@Configuration
public class ConsumerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;

    // Channel the application polls for received messages
    @Bean
    public PollableChannel consumerChannel() {
        return new QueueChannel();
    }

    // Message-driven channel adapter: forwards Kafka records to consumerChannel
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(consumerChannel());
        return kafkaMessageDrivenChannelAdapter;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);
        return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
        return properties;
    }
}
Below is the SubscribedReader class that is the main class of the application and the consumer endpoint of the system.
SubscribedReader.java
package org.javacodegeeks.springintegration.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

@SpringBootApplication
public class SubscribedReader {

    @Autowired
    PollableChannel consumerChannel;

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private KafkaProperties kafkaProperties;

    public static void main(String[] args) {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(SubscribedReader.class).run(args);

        // Keep only the command-line arguments that name a known genre
        List<String> validTopics = Arrays.asList("fantasy", "horror", "romance", "thriller");
        List<String> topics = new ArrayList<>();
        if (args.length > 0) {
            for (String arg : args) {
                if (validTopics.contains(arg))
                    topics.add(arg);
            }
        }

        context.getBean(SubscribedReader.class).run(context, topics);
        context.close();
    }

    private void run(ConfigurableApplicationContext context, List<String> topics) {
        System.out.println("Inside ConsumerApplication run method...");
        PollableChannel consumerChannel = context.getBean("consumerChannel", PollableChannel.class);

        for (String topic : topics)
            addAnotherListenerForTopics(topic);

        // Print each message before fetching the next one, so the first message is not skipped
        Message<?> received = consumerChannel.receive();
        while (received != null) {
            System.out.println("Received " + received.getPayload());
            received = consumerChannel.receive();
        }
    }

    public void addAnotherListenerForTopics(String... topics) {
        Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
        IntegrationFlow flow = IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(
                        new DefaultKafkaConsumerFactory<String, String>(consumerProperties), topics))
                .channel("consumerChannel").get();
        this.flowContext.registration(flow).register();
    }
}
In the main method, we first check if there are any command line arguments. If they are present and are valid topics, they are added to an ArrayList that is passed as an argument to the run method.
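The argument check can be isolated into a small helper that is easy to test on its own. The sketch below is illustrative only: TopicArgumentsSketch and selectTopics are hypothetical names, and the list of valid topics simply repeats the four genres used in this tutorial.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that keeps only the arguments naming a known genre topic. */
final class TopicArgumentsSketch {

    private static final List<String> VALID_TOPICS =
            Arrays.asList("fantasy", "horror", "romance", "thriller");

    /** Returns the valid topics in the order they were given; unknown values are dropped. */
    static List<String> selectTopics(String... args) {
        List<String> topics = new ArrayList<>();
        for (String arg : args) {
            if (VALID_TOPICS.contains(arg)) {
                topics.add(arg);
            }
        }
        return topics;
    }
}
```

For example, selectTopics("fantasy", "comedy", "horror") keeps fantasy and horror and silently ignores comedy, which matches how the reader treats unrecognised arguments.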
In the run method, a PollableChannel bean, configured in ConsumerChannelConfig, is obtained from the application context. Each subscribed topic is registered as a listener through a Kafka message-driven channel adapter by calling the method addAnotherListenerForTopics. Then we call the receive method of the PollableChannel object inside a while loop to get the messages that arrive from the Kafka broker.
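A receive loop of this kind is easy to get subtly wrong: if the next message is fetched before the current one is handled, the first message is silently skipped. The following plain-Java sketch shows the safe ordering. It uses a BlockingQueue as a stand-in for the channel and a timeout so that the loop terminates; ReceiveLoopSketch and drain are hypothetical names, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical receive loop over a queue standing in for a pollable channel. */
final class ReceiveLoopSketch {

    /** Handles every buffered message in order and stops once the queue stays empty. */
    static List<String> drain(BlockingQueue<String> channel, long timeoutMillis) {
        List<String> handled = new ArrayList<>();
        try {
            String received = channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            while (received != null) {
                // Handle the current message first ...
                handled.add(received);
                // ... and only then fetch the next one
                received = channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Stop receiving but keep the interrupt flag for the caller
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

The reader application calls receive() without a timeout, so its loop blocks while waiting for new books instead of ending when the channel is empty.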
Below is the application.properties file that specifies values for the configuration properties.
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic=dummy
A dummy topic is specified so that when the consumer application starts, the message channel is properly configured. Next, the actual topics to listen to are added. The dummy topic is never used by the producer to send messages.
5. How to Run
You will need five terminal windows.
Terminal 1: Start ZooKeeper. In your Kafka installation folder, run the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
Terminal 2: Start KafkaServer. Go to your Kafka installation folder and run the following command:
bin/kafka-server-start.sh config/server.properties
Terminal 3: Start the first consumer with group id “group-one”, subscribed to the fantasy and horror genres. Change directory to the reader project and run the following command:
mvn spring-boot:run -Dspring.kafka.consumer.group-id="group-one" -Drun.arguments="fantasy,horror"
Terminal 4: Start the second consumer with group id “group-two”, subscribed to the horror, romance and thriller genres. Change directory to the reader project and run the following command:
mvn spring-boot:run -Dspring.kafka.consumer.group-id="group-two" -Drun.arguments="horror,romance,thriller"
Terminal 5: Run producer. In the library folder, run the following command:
mvn spring-boot:run
You will see the received messages in terminals 3 and 4. Please note that you can run the commands in terminals 3, 4, and 5 in any order. Because Kafka persists messages in a file-like log and retains them for a default period of 7 days, you will still get the same output.
6. Summary
In this example, we have seen the publish-subscribe mechanism provided by Apache Kafka and the methods by which Spring Integration enables applications to connect with it. We have also touched upon different message channels available with Spring Integration and described their key features.
7. Useful Links
The following resources will be very useful to get additional information and insights on concepts discussed in this article:
- https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
- https://kafka.apache.org/intro
- http://projects.spring.io/spring-kafka/
- https://docs.spring.io/spring-kafka/reference/html/_spring_integration.html
8. Download the Source Code
You can download the full source code of this example here: Spring Integration Kafka Tutorial
Last updated on May 18th, 2021