Using Apache Kafka with Spring Boot

Welcome! In this tutorial, we will see how to implement Kafka in a Spring Boot application.

1. Introduction

Before going further in this tutorial, we will briefly introduce Spring Boot, Kafka, and Lombok.

1.1 Spring Boot

  • Spring Boot is a module of the Spring framework that enables rapid application development through auto-configuration, standalone code, and production-ready features
  • It creates applications that are packaged as a jar and started directly using an embedded servlet container (such as Tomcat, Jetty, or Undertow), so there is no need to deploy war files
  • It simplifies the Maven configuration by providing starter templates and helps resolve dependency conflicts. It automatically identifies the required dependencies and imports them into the application
  • It helps remove boilerplate code, extra annotations, and XML configuration
  • It provides powerful batch processing and manages the REST endpoints
  • It provides an efficient JPA starter library to connect the application with relational databases
  • It supports a microservice architecture and cloud configuration that manages all the application-related configuration properties in a centralized manner

1.2 Kafka

  • Kafka is a publish-subscribe messaging system that helps us exchange data between services
  • Kafka allows a sender (known as a producer) to send a message to a Kafka topic and a receiver (known as a consumer) to receive the message from that topic
  • Kafka also provides stream processing for handling data across connected systems in parallel. Records are processed independently, so one record is handled without waiting for the output of another record
  • Kafka is capable of handling millions of messages per second
  • It offers extremely high performance and a resilient architecture
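
To make the producer, topic, and consumer roles concrete, here is a toy in-memory model in plain Java. It is only a sketch of the publish-subscribe idea and not the Kafka client API: the class name InMemoryTopic and its methods are made up for illustration, and real Kafka adds brokers, partitions, persistence, and consumer groups on top of this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a topic: an append-only log plus one read offset per consumer.
// Hypothetical illustration only; this is not the Kafka client API.
class InMemoryTopic {

    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producer side: append a message to the end of the log.
    public void publish(String message) {
        log.add(message);
    }

    // Consumer side: return every message this consumer has not seen yet
    // and advance its offset so the same messages are not delivered twice.
    public List<String> poll(String consumerId) {
        int from = offsets.getOrDefault(consumerId, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(consumerId, log.size());
        return unread;
    }

    public static void main(String[] args) {
        InMemoryTopic employees = new InMemoryTopic();
        employees.publish("first");
        employees.publish("second");
        System.out.println(employees.poll("consumer-a")); // both messages
        System.out.println(employees.poll("consumer-a")); // nothing new
        System.out.println(employees.poll("consumer-b")); // both messages again, separate offset
    }
}
```

Each consumer keeps its own position in the log, which is why a consumer that joins late can still read the messages from the beginning.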

1.3 Lombok

  • Lombok is a small library that reduces the amount of boilerplate Java code in a project
  • It automatically generates the getters and setters for an object by using the Lombok annotations
  • It hooks in via the annotation processor API
  • Raw source code is passed to Lombok for code generation before the Java compiler continues, so Lombok works in conjunction with the Java compiler to produce properly compiled Java code
  • Under the target/classes folder you can view the compiled class files
  • It can be used with Maven, Gradle, IDEs, etc.

1.3.1 Lombok features

Feature: Details
val: Local variables are declared as final
var: Mutable local variables
@Slf4j: Creates an SLF4J logger
@Cleanup: Will call close() on the resource in the finally block
@Getter: Creates getter methods for all properties
@Setter: Creates setter methods for all non-final properties
@EqualsAndHashCode:
  • Generates implementations of equals(Object other) and hashCode()
  • By default will use all non-static, non-transient properties
  • Can optionally exclude specific properties
@ToString:
  • Generates a String of the class name and each field, separated by commas
  • Optional parameter to include field names
  • Optional parameter to include a call to the super toString method
@NoArgsConstructor:
  • Generates a no-args constructor
  • Will cause a compiler error if there are final fields
  • Can optionally force, which will initialize final fields with 0/false/null
@RequiredArgsConstructor:
  • Generates a constructor for all fields that are final or marked @NonNull
  • The constructor will throw a NullPointerException if any @NonNull fields are null
@AllArgsConstructor:
  • Generates a constructor for all properties of the class
  • Any @NonNull properties will have null checks
@Data:
  • Generates typical boilerplate code for POJOs
  • Combines @Getter, @Setter, @ToString, @EqualsAndHashCode, and @RequiredArgsConstructor
  • No constructor is generated if constructors have been explicitly declared
@Builder:
  • Implements the Builder pattern for object creation
@Value:
  • The immutable variant of @Data
  • All fields are made private and final by default
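
As a rough picture of what @Builder saves you from writing, the sketch below hand-codes a builder for a hypothetical Contact class. The class and field names are made up for illustration, and the code Lombok actually generates differs in detail.

```java
// Hand-written approximation of the builder that Lombok's @Builder would generate.
// Contact and its fields are hypothetical names used only for illustration.
final class Contact {

    private final String name;
    private final String phone;

    private Contact(String name, String phone) {
        this.name = name;
        this.phone = phone;
    }

    // Entry point for the fluent API, like the static builder() method Lombok adds.
    public static Builder builder() {
        return new Builder();
    }

    public String getName() { return name; }
    public String getPhone() { return phone; }

    static final class Builder {
        private String name;
        private String phone;

        // Each method stores a value and returns the builder so calls can be chained.
        public Builder name(String name) { this.name = name; return this; }
        public Builder phone(String phone) { this.phone = phone; return this; }

        // Creates the immutable object from the collected values.
        public Contact build() {
            return new Contact(name, phone);
        }
    }

    public static void main(String[] args) {
        Contact contact = Contact.builder().name("john").phone("001-541-754-3010").build();
        System.out.println(contact.getName() + " / " + contact.getPhone());
    }
}
```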

Let us go ahead with the tutorial implementation. Before going any further, I am assuming that you are familiar with the Spring Boot basics.

2. Using Apache Kafka with Spring Boot

2.1 Application Pre-requisite

To start with this tutorial, I am assuming that you have Kafka up and running in your local environment. For easy setup, I have Kafka running on Docker. You can run the file below with the docker-compose command (for example, docker-compose up -d) to get the Kafka container running on Docker in minutes.

docker-compose.yml

services:
  kafka:
    container_name: kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - '2181:2181'
version: '3'

If everything goes well, the containers will start successfully as shown in Fig. 1. You can use the docker ps -a command to confirm that the Kafka container is running.

Fig.1: Kafka container on Docker

2.2 Tools Used for Spring Boot application and Project Structure

We are using Eclipse Kepler SR2, JDK 8, and Maven. In case you are unsure where to create the corresponding files or folders, let us review the project structure of the Spring Boot application.

Fig. 2: Project structure

Let us start building the application!

3. Creating a Spring Boot application

Below are the steps involved in developing the application.

3.1 Maven Dependency

Here, we specify the dependencies for Spring Boot (Web), Lombok, and Kafka. Maven will automatically resolve the other dependencies. The updated file will have the following code.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.1</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.springboot.kafka</groupId>
	<artifactId>SpringbootandKafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringbootandKafka</name>
	<description>Springboot and Kafka</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

3.2 Application yml file

Create a new yml file at the location: SpringbootandKafka/src/main/resources/ and add the following code to it. Here we will define the application and Kafka configuration (such as topic name, consumer, and producer).

application.yml

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: 'localhost:9092'
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: 'localhost:9092'
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
  topic:
    name: employees

3.3 Java Classes

Let us write all the Java classes involved in this application. For brevity, I am not listing the Employee model class, as it only contains the basic properties with a constructor and the getter methods.
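
Since the model class is not listed, here is a minimal sketch of what it could look like, written in plain Java so that it compiles without Lombok. The field names are taken from the sample request body in Section 5 and the toString() format mirrors the log output shown there. This is an assumption about the shape of the class: the one in the downloadable project may differ, for example by using Lombok annotations instead of hand-written methods. In the project it would be a public class in the com.springboot.kafka.model package.

```java
// Minimal sketch of the Employee model; field names follow the sample request body.
// Assumption: the real class may be implemented differently, e.g. with Lombok.
class Employee {

    private String id;
    private String firstname;
    private String lastname;
    private String jobtitle;
    private String phone;

    // No-args constructor so that a JSON mapper can instantiate the class.
    public Employee() { }

    public Employee(String id, String firstname, String lastname, String jobtitle, String phone) {
        this.id = id;
        this.firstname = firstname;
        this.lastname = lastname;
        this.jobtitle = jobtitle;
        this.phone = phone;
    }

    public String getId() { return id; }
    public String getFirstname() { return firstname; }
    public String getLastname() { return lastname; }
    public String getJobtitle() { return jobtitle; }
    public String getPhone() { return phone; }

    public void setId(String id) { this.id = id; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public void setLastname(String lastname) { this.lastname = lastname; }
    public void setJobtitle(String jobtitle) { this.jobtitle = jobtitle; }
    public void setPhone(String phone) { this.phone = phone; }

    // Same layout as the messages in the producer and consumer logs.
    @Override
    public String toString() {
        return "Employee(id=" + id + ", firstname=" + firstname + ", lastname=" + lastname
                + ", jobtitle=" + jobtitle + ", phone=" + phone + ")";
    }

    public static void main(String[] args) {
        Employee employee = new Employee("c9897b40-deb5-408c-ab60-94dda8f04e5f",
                "john", "wick", "hitman", "001-541-754-3010");
        System.out.println(employee);
    }
}
```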

3.3.1 Implementation/Main class

Add the following code to the main class to bootstrap the application from the main method. Always remember, the entry point of a Spring Boot application is the class annotated with @SpringBootApplication that contains the static main method.

SpringbootandKafkaApplication.java

package com.springboot.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@Slf4j
public class SpringbootandKafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbootandKafkaApplication.class, args);
		log.info("Springboot and kafka application is started successfully.");
	}
}

3.3.2 Producer class

Add the following code to the producer class which will be responsible for sending the message to the Kafka topic. We will use the KafkaTemplate to send the message to the Kafka topic.

Producer.java

package com.springboot.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Producer {

    // reading the property from the application.yml file
    // if value is not specified it will pick up the default value as "employees"
    @Value("${kafka.topic.name:employees}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessageToTopic(final String message) {
        log.info("Sending message to kafka = {}", message);
        kafkaTemplate.send(topic, message);
    }
}
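
The ${kafka.topic.name:employees} expression reads as "use the kafka.topic.name property, or employees if it is not set". The sketch below imitates that lookup rule with a plain map. It is a simplified illustration with made-up names (PlaceholderSketch, resolve), not Spring's actual placeholder resolver, which also handles nested placeholders and multiple property sources.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified imitation of "${key:default}" resolution; hypothetical helper, not Spring code.
class PlaceholderSketch {

    // Resolves a single "${key:default}" or "${key}" expression against the given properties.
    public static String resolve(String expression, Map<String, String> properties) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // not a placeholder, return the text unchanged
        }
        String body = expression.substring(2, expression.length() - 1);
        int colon = body.indexOf(':');
        String key = colon >= 0 ? body.substring(0, colon) : body;
        String fallback = colon >= 0 ? body.substring(colon + 1) : null;

        String value = properties.get(key);
        if (value != null) {
            return value;      // a configured value always wins
        }
        if (fallback != null) {
            return fallback;   // otherwise use the default after the colon
        }
        throw new IllegalArgumentException("Could not resolve placeholder: " + key);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println(resolve("${kafka.topic.name:employees}", properties));
        properties.put("kafka.topic.name", "staff");
        System.out.println(resolve("${kafka.topic.name:employees}", properties));
    }
}
```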

3.3.3 Consumer class

Add the following code to the consumer class which will be responsible for consuming the message from the Kafka topic. The consume(…) method is annotated with the @KafkaListener annotation to listen for messages on the given topic.

Consumer.java

package com.springboot.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Consumer {

    @KafkaListener(topics = "#{'${kafka.topic.name:employees}'}", groupId = "group_id")
    public void consume(final String message) {
        log.info("Consuming message.");
        log.info(message);
    }
}

3.3.4 Controller class

Add the following code to the controller class which will be responsible for accepting input from the user and sending it to the Kafka topic, from where it will later be read by the consumer.

KafkaController.java

package com.springboot.kafka.controller;

import com.springboot.kafka.model.Employee;
import com.springboot.kafka.service.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/api/kafka")
@Slf4j
public class KafkaController {

    @Autowired
    private Producer producer;

    // URL - http://localhost:9000/api/kafka/send
    @ResponseStatus(value = HttpStatus.ACCEPTED)
    @PostMapping(value = "/send")
    public void send(@RequestBody final Employee employee) {
        log.info("Sending message to kafka topic");
        producer.sendMessageToTopic(employee.toString());
    }
}

4. Run the Application

To execute the application, right-click on the SpringbootandKafkaApplication.java class, Run As -> Java Application.

Fig. 3: Run the Application

5. Project Demo

When the application is started, you can use the exposed endpoint to send an HTTP POST request to the controller. You are free to use Postman or any other tool of your choice to make the POST request to the endpoint.

Application endpoint

-- HTTP POST endpoint url --
http://localhost:9000/api/kafka/send

-- sample request body --
{
    "id": "c9897b40-deb5-408c-ab60-94dda8f04e5f",
    "firstname": "john",
    "lastname": "wick",
    "jobtitle": "hitman",
    "phone": "001-541-754-3010"
}
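
If you prefer code over Postman, the request can also be sent from a small standalone Java client. This is a sketch that assumes Java 11 or later for the java.net.http package (the tutorial project itself targets Java 8) and that the application is running locally on port 9000. The class name SendEmployeeRequest is made up for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Standalone client sketch; requires Java 11+ and is not part of the tutorial project.
class SendEmployeeRequest {

    static final String ENDPOINT = "http://localhost:9000/api/kafka/send";

    // Builds the POST request with a JSON body; nothing is sent at this point.
    public static HttpRequest buildRequest(String json) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        String json = "{\"id\":\"c9897b40-deb5-408c-ab60-94dda8f04e5f\",\"firstname\":\"john\","
                + "\"lastname\":\"wick\",\"jobtitle\":\"hitman\",\"phone\":\"001-541-754-3010\"}";
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(buildRequest(json), HttpResponse.BodyHandlers.ofString());
            // The controller method is annotated with HttpStatus.ACCEPTED, so 202 is expected.
            System.out.println("Status: " + response.statusCode());
        } catch (IOException e) {
            System.out.println("Request failed, is the application running? " + e.getMessage());
        }
    }
}
```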

Once the request is successfully received by the controller method, the employee object will be sent to the Kafka topic as a string message and will then be consumed by the consumer (which is subscribed to the same topic the message is sent to). We will verify this with the help of the logs.

Producer and Consumer logs

-- producer logs --
2020-12-15 10:58:23.077  INFO 16128 --- [nio-9000-exec-2] c.s.kafka.controller.KafkaController     : Sending message to kafka topic
2020-12-15 10:58:23.078  INFO 16128 --- [nio-9000-exec-2] com.springboot.kafka.service.Producer    : Sending message to kafka = Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)

-- consumer logs --
2020-12-15 10:58:23.259  INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer    : Consuming message.
2020-12-15 10:58:23.259  INFO 16128 --- [ntainer#0-0-C-1] com.springboot.kafka.service.Consumer    : Employee(id=a1db7424-ae48-499b-bdaa-851a167ef849, firstname=Otha, lastname=Dooley, jobtitle=Product Group Facilitator, phone=483-411-1679)

That is all for this tutorial, and I hope the article gave you what you were looking for. Happy learning, and do not forget to share!

6. Summary

In this tutorial, you learned:

  • Spring Boot, Kafka, and Lombok introduction
  • Steps to implement Kafka in Spring Boot and run Kafka on Docker using the docker-compose command

You can download the sample application as an Eclipse project in the Downloads section.

7. Download the Project

This was an example of implementing Kafka in a Spring Boot application.

Download
You can download the full source code of this example here: Using Apache Kafka with Spring Boot

Yatin

An experienced full-stack engineer well versed in Core Java, Spring/Spring Boot, MVC, Security, AOP, Frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, K8s).