Using Apache Kafka with Spring Boot

Welcome! In this tutorial, we will see how to use Apache Kafka in a Spring Boot application.

1. Introduction

Before going further, let us briefly look at the technologies used in this tutorial: Spring Boot, Kafka, and Lombok.

1.1 Spring Boot

1.2 Kafka

1.3 Lombok

1.3.1 Lombok features

Feature Details
val Local variables are declared as final
var Mutable local variables
@Slf4j Creates an SLF4J logger
@Cleanup Will call close() on the resource in the finally block
@Getter Creates getter methods for all properties
@Setter Creates setter for all non-final properties
@EqualsAndHashCode
  • Generates implementations of equals(Object other) and hashCode()
  • By default will use all non-static, non-transient properties
  • Can optionally exclude specific properties
@ToString
  • Generates String of class name, and each field separated by commas
  • Optional parameter to include field names
  • Optional parameter to include a call to the super toString method
@NoArgsConstructor
  • Generates no-args constructor
  • Will cause compiler error if there are final fields
  • Can optionally force, which will initialize final fields with 0/false/null
@RequiredArgsConstructor
  • Generates a constructor for all fields that are final or marked @NonNull
  • The constructor will throw a NullPointerException if any @NonNull fields are null
@AllArgsConstructor
  • Generates a constructor for all properties of the class
  • Any @NonNull properties will have null checks
@Data
  • Generates typical boilerplate code for POJOs
  • Combines – @Getter, @Setter, @ToString, @EqualsAndHashCode, @RequiredArgsConstructor
  • No constructor is generated if constructors have been explicitly declared
@Builder
  • Implements the Builder pattern for object creation
@Value
  • The immutable variant of @Data
  • All fields are made private and final by default
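
To make the table more concrete, here is a hand-written sketch of roughly what Lombok would generate for a small class annotated with @Getter, @Setter, @EqualsAndHashCode, and @RequiredArgsConstructor. The class name Account and its fields are hypothetical, and the code Lombok really generates differs in detail, so treat this as an illustration rather than the exact output.

```java
import java.util.Objects;

// Hypothetical class; with Lombok, only the two field declarations would be written by hand.
class Account {
    private final String id;   // final: set by the required-args constructor, no setter
    private long balance;      // non-final: gets a setter

    // Roughly what @RequiredArgsConstructor does when id is marked @NonNull
    public Account(String id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    // Roughly what @Getter does
    public String getId() { return id; }
    public long getBalance() { return balance; }

    // Roughly what @Setter does (non-final fields only)
    public void setBalance(long balance) { this.balance = balance; }

    // Roughly what @EqualsAndHashCode does: all non-static, non-transient fields take part
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Account)) return false;
        Account that = (Account) other;
        return balance == that.balance && id.equals(that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, balance);
    }
}
```

Two Account objects with the same id and balance compare as equal, and passing null as the id fails fast with a NullPointerException.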

Let us go ahead with the tutorial implementation. Before going any further, I am assuming that you are familiar with the Spring Boot basics.

2. Using Apache Kafka with Spring Boot

2.1 Application Pre-requisite

To start with this tutorial, I am assuming that you have Kafka up and running in your local environment. For easy setup, I have Kafka running on Docker. You can run the file below with the docker-compose command to get the Kafka container running on Docker in minutes.

docker-compose.yml

services:
  kafka:
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - '2181:2181'
version: '3'

If everything goes well, the containers will start successfully, as shown in Fig. 1. You can use the docker ps -a command to confirm that the Kafka container is running.

Fig. 1: Kafka container on Docker
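
Besides docker ps -a, a quick way to check from Java that the published broker port accepts connections is a plain TCP connect. The sketch below is only an assumption about how one might do this; the class and method names are hypothetical, and an open port does not prove that the broker is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the tutorial project.
class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the port published by the kafka service in docker-compose.yml
        System.out.println("Kafka port reachable: " + isReachable("localhost", 9092, 2000));
    }
}
```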

2.2 Tools Used for Spring Boot application and Project Structure

We are using Eclipse Kepler SR2, JDK 8, and Maven. In case you are unsure where to create the corresponding files or folders, let us review the project structure of the Spring Boot application.

Fig. 2: Project structure

Let us start building the application!

3. Creating a Spring Boot application

Below are the steps involved in developing the application.

3.1 Maven Dependency

Here, we specify the dependencies for Spring Boot (Web), Lombok, and Kafka. Maven will automatically resolve the other dependencies. The updated file will have the following code.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.1</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.springboot.kafka</groupId>
	<artifactId>SpringbootandKafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringbootandKafka</name>
	<description>Springboot and Kafka</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

3.2 Application yml file

Create a new yml file at the location: SpringbootandKafka/src/main/resources/ and add the following code to it. Here we will define the application and Kafka configuration (such as topic name, consumer, and producer).

application.yml

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: 'localhost:9092'
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: 'localhost:9092'
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
  topic:
    name: employees

3.3 Java Classes

Let us write the Java classes involved in this application. For brevity, the Employee model class is not listed here, as it contains only the basic properties with a constructor and the getter methods.
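
Since the Employee class is not listed, here is a hedged reconstruction in plain Java. The field names are taken from the sample request body and the logs in section 5; the String types, the no-args constructor, and the setters are assumptions (added so that a JSON mapper can populate the object). In the actual project, Lombok annotations would likely replace most of this boilerplate.

```java
// Hypothetical reconstruction of the model class; the real one may differ.
class Employee {
    private String id;
    private String firstname;
    private String lastname;
    private String jobtitle;
    private String phone;

    // Assumption: a no-args constructor so the request body can be bound field by field
    public Employee() { }

    public Employee(String id, String firstname, String lastname, String jobtitle, String phone) {
        this.id = id;
        this.firstname = firstname;
        this.lastname = lastname;
        this.jobtitle = jobtitle;
        this.phone = phone;
    }

    public String getId() { return id; }
    public String getFirstname() { return firstname; }
    public String getLastname() { return lastname; }
    public String getJobtitle() { return jobtitle; }
    public String getPhone() { return phone; }

    public void setId(String id) { this.id = id; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public void setLastname(String lastname) { this.lastname = lastname; }
    public void setJobtitle(String jobtitle) { this.jobtitle = jobtitle; }
    public void setPhone(String phone) { this.phone = phone; }

    // Mirrors the Employee(field=value, ...) format that appears in the logs
    @Override
    public String toString() {
        return "Employee(id=" + id + ", firstname=" + firstname + ", lastname=" + lastname
                + ", jobtitle=" + jobtitle + ", phone=" + phone + ")";
    }
}
```

The toString() output matters here because the controller sends employee.toString() to the topic, so this string is exactly what the consumer receives.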

3.3.1 Implementation/Main class

Add the following code to the main class to bootstrap the application from the main method. Remember that the entry point of a Spring Boot application is the class annotated with @SpringBootApplication that contains the static main method.

SpringbootandKafkaApplication.java

package com.springboot.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@Slf4j
public class SpringbootandKafkaApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringbootandKafkaApplication.class, args);
		log.info("Springboot and kafka application is started successfully.");
	}
}

3.3.2 Producer class

Add the following code to the producer class which will be responsible for sending the message to the Kafka topic. We will use the KafkaTemplate to send the message to the Kafka topic.

Producer.java

package com.springboot.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Producer {
    // reading the property from the application.yml file
    // if value is not specified it will pick up the default value as "employees"
    @Value("${kafka.topic.name:employees}")
    private String topic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessageToTopic(final String message) {
        log.info("Sending message to kafka = {}", message);
        kafkaTemplate.send(topic, message);
    }
}
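
The ${kafka.topic.name:employees} expression in the Producer means "use the kafka.topic.name property if it is defined, otherwise fall back to employees". The sketch below imitates that fallback rule for a single placeholder. It is a simplified illustration with hypothetical names, not Spring's actual resolver, which also handles nesting and multiple property sources.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the "${key:default}" fallback rule.
class PlaceholderSketch {
    // Resolves one "${key}" or "${key:default}" expression against the given properties.
    static String resolve(String expression, Map<String, String> properties) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // not a placeholder, return unchanged
        }
        String body = expression.substring(2, expression.length() - 1);
        int colon = body.indexOf(':');
        String key = colon >= 0 ? body.substring(0, colon) : body;
        String fallback = colon >= 0 ? body.substring(colon + 1) : null;

        String value = properties.get(key);
        if (value != null) {
            return value;      // a configured value wins over the default
        }
        if (fallback != null) {
            return fallback;   // no value configured, use the default after the colon
        }
        throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
    }
}
```

With the application.yml shown earlier, the configured value and the default happen to be the same, so the topic is employees either way.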

3.3.3 Consumer class

Add the following code to the consumer class, which will be responsible for consuming messages from the Kafka topic. The consume(…) method is annotated with @KafkaListener so that it is invoked for each message arriving on the given topic.

Consumer.java

package com.springboot.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Consumer {
    @KafkaListener(topics = "#{'${kafka.topic.name:employees}'}", groupId = "group_id")
    public void consume(final String message) {
        log.info("Consuming message.");
        log.info(message);
    }
}

3.3.4 Controller class

Add the following code to the controller class, which will be responsible for receiving input from the user and sending it to the Kafka topic, from where it will later be consumed by the consumer.

KafkaController.java

package com.springboot.kafka.controller;
import com.springboot.kafka.model.Employee;
import com.springboot.kafka.service.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/api/kafka")
@Slf4j
public class KafkaController {
    @Autowired
    private Producer producer;
    // URL - http://localhost:9000/api/kafka/send
    @ResponseStatus(value = HttpStatus.ACCEPTED)
    @PostMapping(value = "/send")
    public void send(@RequestBody final Employee employee) {
        log.info("Sending message to kafka topic");
        producer.sendMessageToTopic(employee.toString());
    }
}

4. Run the Application

To execute the application, right-click on the SpringbootandKafkaApplication.java class, Run As -> Java Application.

Fig. 3: Run the Application

5. Project Demo

When the application is started, you can use the exposed endpoint to send an HTTP POST request to the controller. You are free to use Postman or any other tool of your choice to make the request.

Application endpoint

-- HTTP POST endpoint url --
http://localhost:9000/api/kafka/send
-- sample request body --
{
    "id": "c9897b40-deb5-408c-ab60-94dda8f04e5f",
    "firstname": "john",
    "lastname": "wick",
    "jobtitle": "hitman",
    "phone": "001-541-754-3010"
}
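
If you prefer to send the request from code, the following sketch posts a JSON body with the JDK's HttpURLConnection, which is available on JDK 8. The class and method names are hypothetical, and error handling is kept to a minimum.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client helper, not part of the tutorial project.
class SendEmployeeRequest {
    // Posts the JSON body to the given URL and returns the HTTP status code.
    static int postJson(String url, String json) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setDoOutput(true); // required to send a request body
            try (OutputStream out = connection.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }
}
```

Calling postJson("http://localhost:9000/api/kafka/send", body) with the sample request body should return 202, because the controller method is annotated with @ResponseStatus(HttpStatus.ACCEPTED).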

Once the controller method receives the request, the employee object is sent to the Kafka topic as a string message and is then consumed by the consumer, which is subscribed to the same topic. We will verify this with the help of the logs.

Producer and Consumer logs

-- producer logs --
2020-12-15 10:58:23.077  INFO 16128 --- [nio-9000-exec-2] c.s.kafka.controller.KafkaController     : Sending message to kafka topic
2020-12-15 10:58:23.078  INFO 16128 --- [nio-9000-exec-2] com.springboot.kafka.service.Producer    : Sending message to kafka = Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)
-- consumer logs --
2020-12-15 10:58:23.259  INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer    : Consuming message.
2020-12-15 10:58:23.259  INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer    : Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)

That is all for this tutorial and I hope the article served you whatever you were looking for. Happy Learning and do not forget to share!

6. Summary

In this tutorial, you learned how to run Kafka on Docker and how to produce and consume messages from a Spring Boot application.

You can download the sample application as an Eclipse project in the Downloads section.

7. Download the Project

This was an example of implementing Kafka in a Spring Boot application.

Download
You can download the full source code of this example here: Using Apache Kafka with Spring Boot