Using Apache Kafka with Spring Boot
Welcome! In this tutorial, we will see how to implement Kafka in a Spring Boot application.
1. Introduction
Before going further in this tutorial, we will look at some common terminology through a short introduction to Spring Boot, Kafka, and Lombok.
1.1 Spring Boot
- Spring Boot is a module that adds rapid application development features to the Spring framework, including auto-configuration, standalone code, and production-ready code
- It creates applications that are packaged as a jar and started directly using an embedded servlet container (such as Tomcat, Jetty, or Undertow), so there is no need to deploy war files
- It simplifies the Maven configuration by providing starter templates and helps to resolve dependency conflicts. It automatically identifies the required dependencies and imports them into the application
- It helps in removing boilerplate code, extra annotations, and XML configuration
- It provides powerful batch processing and manages the REST endpoints
- It provides an efficient JPA starter library to connect the application with relational databases
- It supports a microservice architecture and cloud configuration that manages all the application-related configuration properties in a centralized manner
1.2 Kafka
- Kafka is a publish-subscribe messaging system that helps us exchange data between services
- Kafka allows a sender (known as a producer) to send a message to a Kafka topic and a receiver (known as a consumer) to receive the message
- Kafka also supports stream processing of data across connected systems. Records can be processed in parallel, so one record is processed without waiting for the output of another record
- Kafka is designed for high throughput and is capable of handling millions of messages per second
- It offers extremely high performance and a resilient architecture
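To make the producer, topic, and consumer vocabulary concrete, here is a toy in-memory model in plain Java. It only illustrates the publish-subscribe idea and does not use Kafka or its client API; the class names `InMemoryTopic` and `PubSubDemo` and their methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a topic: keeps subscribers and forwards each message to all of them.
// Hypothetical class for illustration only; it is not part of Kafka.
class InMemoryTopic {
    private final String name;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    InMemoryTopic(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    // A consumer registers its interest in the topic.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // A producer publishes a message; every subscriber receives it.
    // Returns the number of subscribers the message was delivered to.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}

class PubSubDemo {
    // Publishes one message to a topic with two subscribers and returns what they received.
    static List<String> run() {
        InMemoryTopic topic = new InMemoryTopic("employees");
        List<String> received = new ArrayList<>();
        topic.subscribe(message -> received.add("consumer-1 got " + message));
        topic.subscribe(message -> received.add("consumer-2 got " + message));
        topic.publish("hello");
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch every subscriber sees every message, which loosely corresponds to Kafka consumers that belong to different consumer groups. Consumers that share a group id divide the messages of a topic among themselves instead.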
1.3 Lombok
- Lombok is a small library that reduces the amount of boilerplate Java code in a project
- It automatically generates the getters and setters for an object by using the Lombok annotations
- It hooks in via the annotation processor API
- Raw source code is passed to Lombok for code generation before the Java compiler continues. Thus, it produces properly compiled Java code in conjunction with the Java compiler
- Under the target/classes folder you can view the compiled class files
- It can be used with Maven, Gradle, IDEs, etc.
1.3.1 Lombok features
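Feature | Details |
---|---|
val | Local variables are declared as final |
var | Mutable local variables |
@Slf4j | Creates an SLF4J logger |
@Cleanup | Will call close() on the resource in the finally block |
@Getter | Creates getter methods for all properties |
@Setter | Creates setter for all non-final properties |
@EqualsAndHashCode | Generates equals(Object) and hashCode() implementations |
@ToString | Generates a toString() implementation |
@NoArgsConstructor | Generates a constructor with no arguments |
@RequiredArgsConstructor | Generates a constructor for the final and @NonNull fields |
@AllArgsConstructor | Generates a constructor with one argument per field |
@Data | Shortcut for @Getter, @Setter (non-final fields), @ToString, @EqualsAndHashCode, and @RequiredArgsConstructor |
@Builder | Implements the builder pattern for object creation |
@Value | Immutable variant of @Data; fields are made private and final |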
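To give a feel for what these annotations save, the following sketch writes out by hand roughly what a class annotated with @Getter, @Setter, and @EqualsAndHashCode would otherwise need. The `Tag` class is hypothetical, and the code Lombok really generates differs in detail (for example in how the hash code is computed); this is only a behaviorally similar illustration.

```java
import java.util.Objects;

// Hypothetical class; with Lombok, the accessors and equals/hashCode below
// could be replaced by annotations on the class.
class Tag {
    private String name;
    private int priority;

    Tag(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    // What @Getter and @Setter would cover.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getPriority() { return priority; }
    public void setPriority(int priority) { this.priority = priority; }

    // What @EqualsAndHashCode would cover: two tags are equal when all fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Tag)) return false;
        Tag other = (Tag) o;
        return priority == other.priority && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, priority);
    }
}
```

Even for two fields, the hand-written version is mostly mechanical code, which is the part Lombok takes over at compile time.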
Let us go ahead with the tutorial implementation. Before going any further, I am assuming that you are aware of the Spring Boot basics.
2. Using Apache Kafka with Spring Boot
2.1 Application Pre-requisite
To start with this tutorial, I am hoping that you have Kafka up and running in your local environment. For easy setup, I have Kafka running in a Docker environment. You can execute the script below using the docker-compose command to get the Kafka container running on Docker in minutes.
docker-compose.yml
    services:
      kafka:
        container_name: kafka
        environment:
          KAFKA_ADVERTISED_HOST_NAME: localhost
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
        image: wurstmeister/kafka
        ports:
          - '9092:9092'
      zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper
        ports:
          - '2181:2181'
    version: '3'
If everything goes well, the container will be started successfully as shown in Fig. 1. You can use the docker ps -a command to confirm that the Kafka container has started successfully.
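As an extra sanity check from the Java side, a plain TCP connection attempt can show whether the ports published in the compose file are open. This is only a minimal sketch: the class name `BrokerPortCheck` and the timeout are assumptions, and an open port does not prove that the broker is healthy, only that something is listening.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the tutorial project.
class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the docker-compose.yml above.
        System.out.println("Kafka port open: " + isReachable("localhost", 9092, 2000));
        System.out.println("ZooKeeper port open: " + isReachable("localhost", 2181, 2000));
    }
}
```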
2.2 Tools Used for Spring Boot Application and Project Structure
We are using Eclipse Kepler SR2, JDK 8, and Maven. In case you are confused about where you should create the corresponding files or folders, let us review the project structure of the Spring Boot application.
Let us start building the application!
3. Creating a Spring Boot application
Below are the steps involved in developing the application.
3.1 Maven Dependency
Here, we specify the dependencies for Spring Boot (Web), Lombok, and Kafka. Maven will automatically resolve the other dependencies. The updated file will have the following code.
pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns="http://maven.apache.org/POM/4.0.0"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.1</version>
            <relativePath /> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.springboot.kafka</groupId>
        <artifactId>SpringbootandKafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>SpringbootandKafka</name>
        <description>Springboot and Kafka</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
3.2 Application yml file
Create a new yml file at the location SpringbootandKafka/src/main/resources/ and add the following code to it. Here we will define the application and Kafka configuration (such as topic name, consumer, and producer).
application.yml
    server:
      port: 9000
    spring:
      kafka:
        consumer:
          bootstrap-servers: 'localhost:9092'
          group-id: group_id
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          bootstrap-servers: 'localhost:9092'
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    kafka:
      topic:
        name: employees
3.3 Java Classes
Let us write the Java classes involved in this application. For brevity, I am not showing the Employee model class, as it contains only the basic properties with a constructor and the getter methods.
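For readers who want something to start from, here is one way the omitted model could look. It is a hypothetical sketch, not the author's class: the field names are taken from the sample request body used in the demo section, the toString() format mirrors the output that appears in the logs there, and the no-argument constructor and setters are an assumption so that a typical JSON mapper can populate the object. The package declaration and public modifier are left out to keep the snippet self-contained; in the project the class would be a public class in com.springboot.kafka.model, and Lombok annotations could replace most of the hand-written methods.

```java
// Hypothetical Employee model written out by hand.
class Employee {
    private String id;
    private String firstname;
    private String lastname;
    private String jobtitle;
    private String phone;

    // Assumption: a no-argument constructor plus setters for JSON binding.
    Employee() { }

    Employee(String id, String firstname, String lastname, String jobtitle, String phone) {
        this.id = id;
        this.firstname = firstname;
        this.lastname = lastname;
        this.jobtitle = jobtitle;
        this.phone = phone;
    }

    public String getId() { return id; }
    public String getFirstname() { return firstname; }
    public String getLastname() { return lastname; }
    public String getJobtitle() { return jobtitle; }
    public String getPhone() { return phone; }

    public void setId(String id) { this.id = id; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public void setLastname(String lastname) { this.lastname = lastname; }
    public void setJobtitle(String jobtitle) { this.jobtitle = jobtitle; }
    public void setPhone(String phone) { this.phone = phone; }

    // The controller sends employee.toString() to the topic, so this is the message payload.
    @Override
    public String toString() {
        return "Employee(id=" + id + ", firstname=" + firstname + ", lastname=" + lastname
                + ", jobtitle=" + jobtitle + ", phone=" + phone + ")";
    }
}
```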
3.3.1 Implementation/Main class
Add the following code to the main class to bootstrap the application from the main method. Always remember, the entry point of the Spring Boot application is the class containing the @SpringBootApplication annotation and the static main method.
SpringbootandKafkaApplication.java
    package com.springboot.kafka;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    @Slf4j
    public class SpringbootandKafkaApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringbootandKafkaApplication.class, args);
            log.info("Springboot and kafka application is started successfully.");
        }
    }
3.3.2 Producer class
Add the following code to the producer class, which will be responsible for sending the message to the Kafka topic. We will use the KafkaTemplate to send the message to the Kafka topic.
Producer.java
    package com.springboot.kafka.service;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    @Slf4j
    public class Producer {

        // reading the property from the application.yml file
        // if value is not specified it will pick up the default value as "employees"
        @Value("${kafka.topic.name:employees}")
        private String topic;

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void sendMessageToTopic(final String message) {
            log.info("Sending message to kafka = {}", message);
            kafkaTemplate.send(topic, message);
        }
    }
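The expression `${kafka.topic.name:employees}` reads as "use the property kafka.topic.name, or fall back to employees if it is not set". The toy resolver below mimics only that fallback rule in plain Java so the behavior is easy to see. It is not Spring's implementation, and the class name `PlaceholderDemo` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the "${key:default}" fallback rule; not Spring code.
class PlaceholderDemo {
    // Resolves "${key:default}" or "${key}" against the given properties.
    static String resolve(String expression, Map<String, String> properties) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // not a placeholder, return it unchanged
        }
        String inner = expression.substring(2, expression.length() - 1);
        int colon = inner.indexOf(':');
        String key = colon >= 0 ? inner.substring(0, colon) : inner;
        String fallback = colon >= 0 ? inner.substring(colon + 1) : null;

        String value = properties.get(key);
        if (value != null) {
            return value;    // the configured property wins
        }
        if (fallback != null) {
            return fallback; // otherwise use the default after the colon
        }
        throw new IllegalArgumentException("Could not resolve placeholder: " + key);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println(resolve("${kafka.topic.name:employees}", properties));
        properties.put("kafka.topic.name", "staff");
        System.out.println(resolve("${kafka.topic.name:employees}", properties));
    }
}
```

With the application.yml from this tutorial the property is set to employees, so the default only matters if that entry is removed.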
3.3.3 Consumer class
Add the following code to the consumer class, which will be responsible for consuming the message from the Kafka topic. The consume(…) method is annotated with the @KafkaListener annotation to listen to messages from the given topic.
Consumer.java
    package com.springboot.kafka.service;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    @Slf4j
    public class Consumer {

        @KafkaListener(topics = "#{'${kafka.topic.name:employees}'}", groupId = "group_id")
        public void consume(final String message) {
            log.info("Consuming message.");
            log.info(message);
        }
    }
3.3.4 Controller class
Add the following code to the controller class, which will be responsible for getting input from the user and sending it to the Kafka topic, where it will later be consumed by a consumer.
KafkaController.java
    package com.springboot.kafka.controller;

    import com.springboot.kafka.model.Employee;
    import com.springboot.kafka.service.Producer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.*;

    @RestController
    @RequestMapping(value = "/api/kafka")
    @Slf4j
    public class KafkaController {

        @Autowired
        private Producer producer;

        // URL - http://localhost:9000/api/kafka/send
        @ResponseStatus(value = HttpStatus.ACCEPTED)
        @PostMapping(value = "/send")
        public void send(@RequestBody final Employee employee) {
            log.info("Sending message to kafka topic");
            producer.sendMessageToTopic(employee.toString());
        }
    }
4. Run the Application
To execute the application, right-click on the SpringbootandKafkaApplication.java class, Run As -> Java Application.
5. Project Demo
When the application is started, you can use the exposed endpoint to send an HTTP POST request to the controller. You are free to use Postman or any other tool of your choice to make the request.
Application endpoint
    -- HTTP POST endpoint url --
    http://localhost:9000/api/kafka/send

    -- sample request body --
    {
      "id": "c9897b40-deb5-408c-ab60-94dda8f04e5f",
      "firstname": "john",
      "lastname": "wick",
      "jobtitle": "hitman",
      "phone": "001-541-754-3010"
    }
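If you prefer to send the request from code instead of a GUI tool, the following minimal sketch does so with the JDK's HttpURLConnection. The class and method names are hypothetical, and the JSON is built with simple string formatting, so the values must not contain quotes or other characters that need escaping.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the endpoint above; not part of the tutorial project.
class SendEmployeeRequest {
    // Builds the JSON body; field names match the sample request body shown above.
    static String buildBody(String id, String firstname, String lastname,
                            String jobtitle, String phone) {
        return String.format(
                "{\"id\":\"%s\",\"firstname\":\"%s\",\"lastname\":\"%s\",\"jobtitle\":\"%s\",\"phone\":\"%s\"}",
                id, firstname, lastname, jobtitle, phone);
    }

    // Sends the JSON with an HTTP POST and returns the response status code.
    static int post(String endpoint, String json) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(endpoint).openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String body = buildBody("c9897b40-deb5-408c-ab60-94dda8f04e5f",
                "john", "wick", "hitman", "001-541-754-3010");
        int status = post("http://localhost:9000/api/kafka/send", body);
        // The controller method is annotated with HttpStatus.ACCEPTED, i.e. 202.
        System.out.println("HTTP status: " + status);
    }
}
```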
Once the request is successfully received by the controller method the employee object will be sent to the Kafka topic as a string message and will also be consumed by a consumer (who is subscribed to the same topic on which the message is sent out). We will verify the same with the help of logs.
Producer and Consumer logs
    -- producer logs --
    2020-12-15 10:58:23.077 INFO 16128 --- [nio-9000-exec-2] c.s.kafka.controller.KafkaController : Sending message to kafka topic
    2020-12-15 10:58:23.078 INFO 16128 --- [nio-9000-exec-2] com.springboot.kafka.service.Producer : Sending message to kafka = Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)

    -- consumer logs --
    2020-12-15 10:58:23.259 INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer : Consuming message.
    2020-12-15 10:58:23.259 INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer : Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)
That is all for this tutorial, and I hope the article gave you what you were looking for. Happy learning, and do not forget to share!
6. Summary
In this tutorial, you learned:
- Spring Boot, Kafka, and Lombok introduction
- Steps to implement Kafka in Spring Boot and run Kafka on Docker using the docker-compose command
You can download the sample application as an Eclipse project in the Downloads section.
7. Download the Project
This was an example of implementing Kafka in a Spring Boot application.
You can download the full source code of this example here: Using Apache Kafka with Spring Boot