
Apache Kafka using Spring Boot

Hello. In this tutorial, we will provide a brief introduction to using Apache Kafka in Spring Boot. We will explore the integration of Apache Kafka, a distributed streaming platform, with Spring Boot, a popular Java framework for building robust and scalable applications. Apache Kafka provides a reliable, scalable, and fault-tolerant messaging system that enables the exchange of data streams between multiple applications and microservices. By incorporating Kafka into Spring Boot applications, developers can leverage its powerful features such as real-time data processing, fault tolerance, and high throughput. We will dive into the key concepts and functionalities of Apache Kafka in the context of Spring Boot, and demonstrate how to produce and consume messages using Kafka topics. Let’s get started!

1. Introduction

  • Apache Kafka is a distributed streaming platform.
  • It is designed to handle real-time, high-throughput data feeds.
  • Kafka provides a publish-subscribe model for data streams.
  • It offers fault-tolerant storage and replication of data.
  • Kafka is horizontally scalable and allows for distributed processing.
  • It is widely used for building real-time data pipelines and streaming applications.
  • Kafka integrates well with other Apache frameworks and popular data processing tools.

1.1 Kafka Capabilities

Kafka is a distributed streaming platform that is widely used for building real-time data pipelines and streaming applications. It is designed to handle high-throughput, fault-tolerant, and scalable data streams.

1.1.1 Scalability

Kafka is horizontally scalable, allowing you to handle large volumes of data and high-traffic loads. It achieves scalability by distributing data across multiple nodes in a cluster, enabling you to add more nodes as your needs grow.

1.1.2 Durability

Kafka provides persistent storage for streams of records. Messages sent to Kafka topics are durably stored on disk and replicated across multiple servers to ensure fault tolerance. This ensures that data is not lost even in the event of node failures.

1.1.3 Reliability

Kafka guarantees message delivery with at-least-once semantics. This means that once a message is published to a topic, it will be delivered to consumers at least once, even in the presence of failures or network issues.

1.1.4 Real-time streaming

Kafka enables real-time processing of streaming data. Producers can publish messages to Kafka topics in real time, and consumers can subscribe to these topics and process the messages as they arrive, allowing for low-latency data processing.

1.1.5 High throughput

Kafka is capable of handling very high message throughput. It can handle millions of messages per second, making it suitable for use cases that require processing large volumes of data in real time.

1.1.6 Data integration

Kafka acts as a central hub for integrating various data sources and systems. It provides connectors and APIs that allow you to easily ingest data from different sources, such as databases, messaging systems, log files, and more, into Kafka topics.

1.1.7 Streaming data processing

Kafka integrates well with popular stream processing frameworks like Apache Spark, Apache Flink, and Apache Samza. These frameworks can consume data from Kafka topics, perform advanced processing operations (such as filtering, aggregating, and transforming), and produce derived streams of data.

1.1.8 Message Retention

Kafka allows you to configure the retention period for messages in topics. This means that messages can be retained for a specified period, even after they have been consumed by consumers. This enables the replayability of data and supports use cases where historical data needs to be accessed.

1.1.9 Exactly-once processing

Kafka provides exactly-once processing semantics when used with supporting stream processing frameworks. This ensures that data processing is performed exactly once, even in the face of failures and retries, while maintaining data integrity.

1.1.10 Security

Kafka supports authentication and authorization mechanisms to secure the cluster. It provides SSL/TLS encryption for secure communication between clients and brokers and supports integration with external authentication systems like LDAP or Kerberos.

These are some of the key capabilities of Kafka, which make it a powerful tool for building scalable, fault-tolerant, and real-time data processing systems.

1.2 Error Handling and Recovery in Apache Kafka

Error handling and recovery in Apache Kafka are crucial aspects of building robust and reliable data processing pipelines. Kafka provides several mechanisms for handling errors and recovering from failures. Here are some key components and techniques for error handling and recovery in Kafka:

1.2.1 Retries and Backoff

Kafka clients can be configured to automatically retry failed operations, such as producing or consuming messages. Retries can help recover from transient failures, network issues, or temporary unavailability of resources. Backoff strategies can be employed to introduce delays between retries, allowing the system to stabilize before attempting again.
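For illustration, here is a minimal sketch of how producer-side retries and backoff can be tuned through a Spring Kafka ProducerFactory. The class name KafkaProducerRetryConfig and the concrete values are only examples, not part of the original setup:

KafkaProducerRetryConfig.java

package org.jcg.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerRetryConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Retry transient send failures up to 5 times...
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // ...waiting 500 ms between attempts...
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
        // ...but give up entirely after 30 seconds.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
        return new DefaultKafkaProducerFactory<>(props);
    }
}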

1.2.2 Error Codes

Kafka provides error codes to indicate specific types of failures. Error codes can be used by clients to identify the nature of the error and take appropriate action. For example, a client can handle a “leader not available” error differently than a “message too large” error.
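As a hedged example of acting on specific error types, the snippet below blocks on the send result and branches on the exception cause. The ErrorAwareProducer class and its log messages are illustrative only:

ErrorAwareProducer.java

package org.jcg.producer;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ErrorAwareProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        try {
            // Block until the broker acknowledges the record so failures surface here.
            kafkaTemplate.send(topic, message).get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RecordTooLargeException) {
                // Permanent error: retrying the same payload will never succeed.
                System.err.println("Message too large, not retrying: " + cause.getMessage());
            } else if (cause instanceof LeaderNotAvailableException) {
                // Transient error: a new partition leader is being elected, retry later.
                System.err.println("Leader not available, retry later: " + cause.getMessage());
            } else {
                System.err.println("Send failed: " + cause);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}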

1.2.3 Dead Letter Queues (DLQ)

DLQs are special Kafka topics where problematic messages are redirected when they cannot be processed successfully. By sending failed messages to a DLQ, they can be stored separately for later inspection and analysis. DLQs allow the decoupling of error handling from the main processing logic, enabling manual or automated recovery processes.
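As a sketch of how a dead letter topic can be wired up with Spring Kafka (version 2.8 or later, where DefaultErrorHandler is available), the configuration below publishes records that still fail after two retries to a topic named "<original-topic>.DLT". The class name KafkaDlqConfig is illustrative; Spring Boot picks up the error handler bean for the auto-configured listener container factory:

KafkaDlqConfig.java

package org.jcg.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaDlqConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // After two retries, one second apart, the failed record is published
        // to the "<original-topic>.DLT" dead letter topic.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}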

1.2.4 Monitoring and Alerting

Setting up monitoring and alerting systems for Kafka clusters and client applications is crucial for proactive error handling. Monitoring can provide insights into the health and performance of Kafka components, enabling early detection of issues. Alerts can notify administrators or operators about critical failures, high error rates, or other abnormal conditions, allowing them to take corrective actions promptly.

1.2.5 Transactional Support

Kafka supports transactions, which provide atomicity and isolation guarantees for producing and consuming messages. Transactions allow multiple operations to be grouped as a single unit of work, ensuring that either all operations succeed or none of them take effect. In case of failures, transactions can be rolled back to maintain data consistency.
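A minimal sketch of a transactional send with Spring Kafka is shown below. It assumes transactions have been enabled by setting spring.kafka.producer.transaction-id-prefix (for example to tx-) in application.properties; the TransactionalProducer class name is illustrative:

TransactionalProducer.java

package org.jcg.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class TransactionalProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    public void sendInTransaction(String first, String second) {
        // Both sends are committed together, or neither takes effect.
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(topicName, first);
            operations.send(topicName, second);
            return true;
        });
    }
}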

1.2.6 Idempotent Producers

Kafka producers can be configured as idempotent, ensuring that duplicate messages are not introduced even if retries occur. Idempotent producers use message deduplication and sequence numbers to guarantee that messages are either successfully delivered once or not at all, preventing duplicate processing.
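Continuing the producer factory sketch from section 1.2.1 (this fragment is not standalone), idempotence is switched on with two additional producer properties:

        // The broker de-duplicates retried sends using the producer ID and
        // per-partition sequence numbers, so retries cannot create duplicates.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Idempotence requires acknowledgment from all in-sync replicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");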

1.2.7 Monitoring and Recovery Tools

Various third-party tools and frameworks exist for monitoring and managing Kafka clusters, such as Confluent Control Center and Apache Kafka Manager. These tools provide visual dashboards, alerting capabilities, and automated recovery features, making it easier to detect and resolve errors.

It is important to design error handling and recovery mechanisms specific to your use case, considering factors like fault tolerance requirements, processing semantics, and data consistency. Proper monitoring, observability, and proactive error management practices are crucial for building robust and reliable Kafka-based systems.

1.3 Advantages and Disadvantages of Apache Kafka

Advantages:

  • High-throughput and low-latency data processing.
  • Scalable and fault-tolerant architecture.
  • Efficient handling of real-time data streams.
  • Reliable data storage and replication.
  • Seamless integration with various data processing tools.

Disadvantages:

  • Initial setup and configuration complexity.
  • Steeper learning curve for beginners.
  • Requires additional infrastructure resources.
  • Security is not enabled by default and requires additional configuration (see section 1.1.10).
  • Limited built-in monitoring and management capabilities.

2. Setting up Apache Kafka on Docker

Docker is an open-source platform that enables containerization, allowing you to package applications and their dependencies into standardized units called containers. These containers are lightweight, isolated, and portable, providing consistent environments for running applications across different systems. Docker simplifies software deployment by eliminating compatibility issues and dependency conflicts. It promotes scalability, efficient resource utilization, and faster application development and deployment. With Docker, you can easily build, share, and deploy applications in a consistent and reproducible manner, making it a popular choice for modern software development and deployment workflows. If you still need to install Docker, refer to the official Docker installation guide.

Using Docker Compose simplifies the process by defining the services, their dependencies, and the network configuration in a single file, making the environment easier to manage and scale. Make sure you have Docker and Docker Compose installed on your system, then follow the steps below to set up Apache Kafka with Docker Compose.

2.1 Creating Docker Compose file

Create a file called docker-compose.yml and open it for editing. Add the following content to the file and save it once done.

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise localhost so clients on the host machine can reach the broker through the published 9092 port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # A single-broker cluster can only keep one replica of the internal consumer-offsets topic
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

2.2 Running the Kafka containers

Open a terminal or command prompt in the same directory as the docker-compose.yml file. Start the Kafka containers by running the following command:

Start containers

docker-compose up -d

This command will start the ZooKeeper and Kafka containers in detached mode, running in the background. To stop and remove the containers, as well as the network created, use the following command:

Stop containers

docker-compose down

3. Apache Kafka in Spring Boot

Let us dive into the practical part. To get started with Kafka using Spring Boot, follow these steps.

3.1 Creating a Spring Boot Project

Set up a new Spring Boot project or use an existing one. Include the necessary dependencies in your project’s pom.xml file.

pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
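If your project does not already include it, the Spring Boot web starter is also required for the REST controller built in Section 3.6 (the version is managed by the Spring Boot parent):

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>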

3.2 Configure Kafka properties

In your project’s application.properties file, configure the Kafka connection and topic properties. Note that spring.kafka.bootstrap-servers and spring.kafka.consumer.group-id are standard Spring Boot properties, whereas the spring.kafka.topic-* entries are custom keys that this tutorial reads via the @Value annotation.

  • spring.kafka.bootstrap-servers=localhost:9092 (Kafka connection details) – Specifies the bootstrap servers used for the initial connection to Kafka and for discovering the other brokers in the cluster.
  • spring.kafka.consumer.group-id=my-group (Kafka consumer group ID) – Defines the consumer group ID. Consumer groups allow parallel processing of messages by dividing the workload among multiple consumers.
  • spring.kafka.topic-name=my-topic (Kafka topic details) – Specifies the name of the Kafka topic to which the application publishes and from which it consumes messages.
  • spring.kafka.topic-partitions=3 (Kafka topic partitions) – Defines the number of partitions for the topic. Partitions enable parallel processing and scalability within a topic.
  • spring.kafka.topic-replication-factor=1 (Kafka topic replication factor) – Specifies the replication factor. Replication creates copies of each partition on multiple brokers for fault tolerance; here each partition has a single replica.

application.properties

# Kafka connection details
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group

# Kafka topic details
spring.kafka.topic-name=my-topic
spring.kafka.topic-partitions=3
spring.kafka.topic-replication-factor=1

3.3 Creating Kafka topic

To create Kafka topics programmatically in a Spring Boot application on startup, you can utilize the KafkaAdmin bean provided by Spring Kafka.

  • The KafkaTopicConfig class creates a KafkaAdmin bean for configuring the KafkaAdminClient, and a NewTopic bean using the provided topic name, number of partitions, and replication factor.
  • During application startup, the KafkaAdmin bean detects every NewTopic bean (such as the one returned by myTopic()) and creates the corresponding topic on the broker if it does not already exist.
  • In your producer and consumer code, you can refer to the topic name configured in application.properties using the @Value annotation.
  • You can then use the topicName variable to produce or consume messages from the dynamically created topic.

KafkaTopicConfig.java

package org.jcg.config;

import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    @Value("${spring.kafka.topic-partitions}")
    private int numPartitions;

    @Value("${spring.kafka.topic-replication-factor}")
    private short replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        // Spring Boot can also auto-configure a KafkaAdmin from spring.kafka.bootstrap-servers;
        // this explicit bean simply makes the broker address used for topic creation visible here.
        return new KafkaAdmin(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"
        ));
    }

    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name(topicName)
                .partitions(numPartitions)
                .replicas(replicationFactor)
                .build();
    }
}

With this setup, the Kafka topic will be created automatically when your Spring Boot application starts. Make sure you have a running Kafka cluster reachable at the configured bootstrap servers.
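If you also want to set topic-level options, such as the retention period discussed in Section 1.1.8, TopicBuilder accepts arbitrary topic configs. The bean below is a sketch that could be added to the KafkaTopicConfig class above; it additionally needs import org.apache.kafka.common.config.TopicConfig;, the "-retained" topic name and the seven-day value are only examples:

    @Bean
    public NewTopic myTopicWithRetention() {
        return TopicBuilder.name(topicName + "-retained")
                .partitions(numPartitions)
                .replicas(replicationFactor)
                // Keep records for 7 days before they become eligible for deletion
                .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 60 * 60 * 1000L))
                .build();
    }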

3.4 Producing Messages

In this example, the KafkaProducer class is annotated with @Component to be automatically managed by Spring. It injects the KafkaTemplate bean for interacting with Kafka and uses the @Value annotation to retrieve the topic name from application.properties.

KafkaProducer.java

package org.jcg.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    public void sendMessage(String message) {
        kafkaTemplate.send(topicName, message);
        System.out.println("Message sent: " + message);
    }
}

To send a message using the Kafka producer, you can call the sendMessage method, passing the message content as a parameter. The producer will use the configured topic name to send the message to the Kafka topic.

Remember to configure the Kafka connection properties and topic name correctly in your application.properties file for this example to work properly.

3.4.1 Handling Error and Recovery

To add error handling and recovery with CompletableFuture, you can modify the sendMessage() method to return a CompletableFuture that represents the asynchronous result of the message-sending operation. You can then use the handle() method of CompletableFuture to deal with any exceptions that occur and perform the necessary recovery actions, flattening the nested future with thenCompose(). Here’s the updated code with error handling and recovery using CompletableFuture:

KafkaProducerV1.java

package org.jcg.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Component
public class KafkaProducerV1 {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    private static final int MAX_RETRIES = 3;

    public CompletableFuture<Void> sendMessage(String message) {
        return sendWithRetryAndRecovery(message, 1);
    }

    private CompletableFuture<Void> sendWithRetryAndRecovery(String message, int retryCount) {
        return CompletableFuture.runAsync(() -> {
            // Note: kafkaTemplate.send() is itself asynchronous; this block only surfaces
            // exceptions thrown while handing the record over to the producer.
            kafkaTemplate.send(topicName, message);
            System.out.println("Message sent: " + message);
        }).handle((result, ex) -> {
            if (ex != null) {
                System.err.println("Error sending message: " + ex.getMessage());
                if (retryCount < MAX_RETRIES) {
                    System.out.println("Retrying message sending. Retry count: " + retryCount);
                    return sendWithRetryAndRecovery(message, retryCount + 1);
                }
                System.err.println("Max retry attempts reached. Failed to send message.");
                performRecoveryActions();
            }
            // Nothing to retry: complete the chain immediately
            return CompletableFuture.<Void>completedFuture(null);
        }).thenCompose(future -> future); // Flatten the nested future into a CompletableFuture<Void>
    }

    private void performRecoveryActions() {
        // Implement your recovery logic here
        System.out.println("Performing recovery actions...");
        // Perform additional actions such as logging, fallback mechanisms, or notifying administrators
    }
}

In the updated code, a new method called performRecoveryActions() is added to encapsulate the recovery logic. This method is invoked when the maximum retry count is reached, indicating a failure to send the message. You can customize the performRecoveryActions() method according to your specific requirements, such as logging the failure, executing fallback mechanisms, or notifying administrators about the issue.

With this implementation, the sendMessage() method returns a CompletableFuture representing the overall result of the message-sending operation, including both retries and recovery. You can handle the completion of this CompletableFuture in the calling code to perform further actions or error handling as needed. Feel free to adjust the MAX_RETRIES constant and add more sophisticated recovery actions based on your use case.
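For completeness, a caller can react to the returned future. For example, the REST controller from Section 3.6 could expose a second, purely illustrative endpoint for the V1 producer (endpoint name and log output are only suggestions):

    @Autowired
    private KafkaProducerV1 kafkaProducerV1;

    @GetMapping("/publish-async")
    public ResponseEntity<String> publishAsync(@RequestParam("message") String message) {
        kafkaProducerV1.sendMessage(message)
                // The future completes once the send attempt, including any retries
                // and recovery actions, has finished.
                .whenComplete((result, ex) -> System.out.println("Send attempt finished for: " + message));
        return ResponseEntity.ok("Message accepted for publishing");
    }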

3.5 Consuming Messages

In this example, the KafkaConsumer class is annotated with @Component to be automatically managed by Spring. It uses the @Value annotation to retrieve the topic name from application.properties.

KafkaConsumer.java

package org.jcg.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    @KafkaListener(topics = "${spring.kafka.topic-name}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        // Perform further processing of the received message
    }
}

The receiveMessage method is annotated with @KafkaListener, with the topic name supplied through the topics attribute (resolved from application.properties via the property placeholder). When a new message arrives on the Kafka topic, the receiveMessage method is invoked and the message content is printed to the console. You can perform further processing of the received message as per your requirements.

Ensure that the Kafka connection properties and topic name are correctly configured in your application.properties file for this example to work correctly.

3.5.1 Sending Acknowledgment

To send an acknowledgment back to the Kafka topic once the message is accepted by the consumer, you can modify the code to use a KafkaTemplate to send the acknowledgment message. Here’s the updated code:

KafkaConsumerV1.java

package org.jcg.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerV1 {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    @KafkaListener(topics = "${spring.kafka.topic-name}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        // Perform further processing of the received message

        // Send acknowledgment back to Kafka topic
        sendAcknowledgment(message);
    }

    private void sendAcknowledgment(String message) {
        kafkaTemplate.send(topicName, "Acknowledgement: " + message);
        System.out.println("Acknowledgement sent for message: " + message);
    }
}

In this updated code, a KafkaTemplate is autowired into the KafkaConsumerV1 class, allowing us to send messages back to the Kafka topic. The receiveMessage() method is modified to call the sendAcknowledgment() method after processing the received message.

The sendAcknowledgment() method uses the kafkaTemplate to send the acknowledgment message to the same Kafka topic. It appends the “Acknowledgement: ” prefix to the original message to mark it as an acknowledgment. You can customize the acknowledgment message format according to your requirements.

Now, when a message is received and processed by the consumer, an acknowledgment message will be sent back to the Kafka topic using the KafkaTemplate. Be aware that, because the acknowledgment is published to the same topic the listener consumes from, the listener will also receive its own acknowledgment messages and acknowledge them again, creating an endless loop; in a real application you would publish acknowledgments to a separate topic.
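A minimal sketch of that variant is shown below; the ".ack" suffix for the acknowledgment topic name is a hypothetical convention, not something defined elsewhere in this tutorial:

    private void sendAcknowledgment(String message) {
        // Publish to a dedicated acknowledgment topic so the listener on the main
        // topic does not consume (and re-acknowledge) its own acknowledgments.
        kafkaTemplate.send(topicName + ".ack", "Acknowledgement: " + message);
        System.out.println("Acknowledgement sent for message: " + message);
    }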

3.6 Create REST API to Send message

Create a controller package and, inside it, create the KafkaProducerController class with the following content:

KafkaProducerController.java

package org.jcg.controller;

import org.jcg.producer.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to Kafka topic");
    }
}

3.7 Create Main class

In this main class, the @SpringBootApplication annotation is used to indicate that this is the main entry point for a Spring Boot application. It enables auto-configuration and component scanning.

The main() method starts the Spring Boot application by calling SpringApplication.run() and passing the main class (Application.class) along with any command-line arguments (args).

By running this main class, the Spring Boot application will start, and the controller class (KafkaProducerController) will be available to handle requests on the specified endpoints.

Application.java

package org.jcg;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

4. Output

Here’s a sample output demonstrating how the KafkaProducer and KafkaConsumer classes work together. Open a browser and call the REST API via the link below (note the required message request parameter):

http://localhost:8080/api/v1/kafka/publish?message=Hello,%20Kafka!

The KafkaProducer class sends a message “Hello, Kafka!” to the Kafka topic.

Sample Output – KafkaProducer

Message sent: Hello, Kafka!

The KafkaConsumer class listens to the same Kafka topic and receives the message.

Sample Output – KafkaConsumer

Received message: Hello, Kafka!

In the sample output, you can see that the producer successfully sends the message, and the consumer receives and prints it to the console.

5. Conclusion

In conclusion, Apache Kafka is a powerful messaging system that provides a scalable, fault-tolerant, and high-performance solution for handling real-time data streams. When combined with Spring Boot, it offers seamless integration and enables developers to build robust and efficient distributed applications.

Using Apache Kafka in a Spring Boot application brings several benefits. It provides reliable message queuing and delivery guarantees, ensuring that messages are processed in a distributed and fault-tolerant manner. Kafka’s distributed architecture allows for high throughput and low-latency data processing, making it suitable for use cases that require real-time data streaming and event-driven architectures.

With Spring Boot, developers can leverage the simplicity and productivity of the Spring framework, along with the extensive ecosystem of Spring projects, to easily integrate Kafka into their applications. Spring Boot provides convenient auto-configuration and pre-defined templates, simplifying the setup and configuration of Kafka producers and consumers.

By utilizing Spring Boot’s annotation-based programming model and dependency injection capabilities, developers can write clean and concise code for interacting with Kafka topics. Spring Boot’s integration with Apache Kafka enables seamless error handling, message serialization/deserialization, and concurrency management, making it easier to focus on business logic rather than low-level Kafka details.

Furthermore, Spring Boot’s support for externalized configuration allows for easy customization of Kafka-specific properties, such as brokers, topics, serialization/deserialization formats, and consumer group settings. This flexibility enables developers to adapt their applications to different Kafka environments without requiring code changes.

That is all for this tutorial, and I hope the article gave you what you were looking for. Happy learning and do not forget to share! You can download the source code from the Downloads section.

6. Download the Project

In summary, developers can integrate Apache Kafka with Spring Boot to obtain a scalable and fault-tolerant solution for handling real-time data streams. Integrating Kafka and Spring Boot offers numerous benefits, including providing developers with reliable message queuing, achieving high throughput, and enabling low-latency data processing. The simplicity and productivity of Spring Boot, combined with its integration with Kafka, allow developers to easily configure and interact with Kafka topics. Developers can focus on their business logic while enjoying the advantages of seamless error handling, efficient message serialization/deserialization, and effective concurrency management. Additionally, Spring Boot’s support for externalized configuration enables developers to easily customize Kafka properties, ensuring adaptability to different Kafka environments without the need for code changes. In conclusion, the integration of Apache Kafka and Spring Boot empowers developers to construct robust and efficient distributed applications that effectively handle real-time data streams.

This was a tutorial on using Apache Kafka in Spring Boot.

Download
You can download the full source code of this example here: Apache Kafka using Spring Boot
