
How to Subscribe a Kafka Consumer to Multiple Topics

Apache Kafka is a leading distributed streaming platform for building real-time data pipelines and streaming applications. One of Kafka’s core components is its consumer API, which allows applications to subscribe to and process streams of records in real time. Handling data from multiple topics concurrently is a common requirement in many Kafka-based systems. This article walks through the process of subscribing a Kafka consumer to multiple topics.

1. Setting Up Kafka

There are several ways to run Apache Kafka. This article uses a manual installation, although Docker also provides a convenient option for local development.
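
A minimal sketch of the Docker route, assuming the official apache/kafka image (available for Kafka 3.7 and later):

docker run -d -p 9092:9092 apache/kafka:latest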

1.1 Setting Up Kafka Manually

Download Apache Kafka from the official website and extract the downloaded archive. Next, we must start the ZooKeeper and Kafka servers manually using the command line.

1.1.1 Start the ZooKeeper Service

Kafka depends on ZooKeeper for coordination, so we must start ZooKeeper first. Navigate into the extracted Kafka directory and enter the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

This command starts the ZooKeeper server (default port is 2181).

1.1.2 Start Kafka Server

Once ZooKeeper is running, we can start the Kafka server (default port is 9092). Open a new terminal window and execute the following command:

bin/kafka-server-start.sh config/server.properties

This leaves a basic Kafka environment running and ready to use.
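
The examples in this article use two topics named transactions and financial-events. Kafka creates topics on first use by default (auto.create.topics.enable=true), but if that setting is disabled on the broker, the topics can be created up front with the kafka-topics.sh tool:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic transactions
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic financial-events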

2. Subscribing to Multiple Topics Using Kafka Consumer API

The pom.xml below declares the dependency for the Kafka client library.

pom.xml:

<dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.6.1</version>
    </dependency>
</dependencies>

2.1 Initialize Kafka Consumer

First, we need to initialize a Kafka consumer instance. This involves configuring properties such as bootstrap servers, group ID, key and value deserializers, etc. Here’s a Java example using the KafkaConsumer class from the Kafka client library:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

2.2 Subscribe to Topics

Once the consumer is initialized, we can subscribe it to one or more topics using the subscribe() method:

consumer.subscribe(Arrays.asList("transactions", "financial-events")); // Use your topic names
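
The subscribe() method also has an overload that takes a java.util.regex.Pattern, which is useful when topic names share a common prefix. A minimal sketch, assuming the hypothetical prefix financial-:

import java.util.regex.Pattern;

// Subscribes to every existing topic whose name matches the pattern;
// matching topics created later are picked up on a metadata refresh
consumer.subscribe(Pattern.compile("financial-.*"));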

2.3 Poll for Records

The consumer needs to poll Kafka for new records continuously. This is done using the poll() method in a loop like this:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        String message = record.value();
        System.out.printf("Received message: key = %s, value = %s, topic = %s%n",
                record.key(), message, record.topic());
    }
}
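
Since records from all subscribed topics arrive through the same poll() call, a common pattern is to branch on record.topic() when each topic needs different handling. A minimal sketch, using the two topic names from this example:

for (ConsumerRecord<String, String> record : records) {
    switch (record.topic()) {
        case "transactions":
            // handle payment transaction records
            break;
        case "financial-events":
            // handle financial event records
            break;
        default:
            // ignore or log records from unexpected topics
            break;
    }
}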

The code snippet below is a full Java class with a main method to test the Kafka consumer configuration and publish one message on each of the topics. This class also utilizes the Kafka producer API to send messages to the Kafka topics.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaConsumerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args)  throws ExecutionException, InterruptedException {

        // Test Kafka consumer configuration and publish messages
        testKafkaConsumer();

    }
    
    private static void testKafkaConsumer() {
        
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Read from the beginning of the topics so the test messages published
        // below are consumed even though the group has no committed offsets yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("transactions", "financial-events")); // Use your topic names  
        
        // Create Kafka producer configuration
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        
        // Publish one message on each topic
        sendMessage(producer, "transactions", "100");
        sendMessage(producer, "financial-events", "200");
        
        // Consume messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.printf("Received message: key = %s, value = %s, topic = %s%n",
                            record.key(), message, record.topic());
                }
            }
        } finally {
            consumer.close();
            producer.close();
        }
        
    }
    
    private static void sendMessage(KafkaProducer<String, String> producer, String topic, String message) {
        // Send the message to Kafka topic
        producer.send(new ProducerRecord<>(topic, message));
        System.out.printf("Published message '%s' to topic '%s'%n", message, topic);
    }
}

Output is:

Fig 1.0 Output – subscribe consumers to multiple topics using the Kafka Consumer API

3. Subscribing to Multiple Topics Using Spring Kafka

Using Spring Kafka simplifies the process of working with Kafka consumers in Spring-based applications. Below are the steps to subscribe to multiple topics using Spring Kafka:

Dependencies in pom.xml

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

3.1 Create a Model Class

Let’s define a model class Transaction to represent the messages we will send and consume. Each instance of this class will correspond to a message in a Kafka topic.

Transaction class:

public class Transaction {

    private String id;
    private String type;
    private double amount;

    // A no-argument constructor is required by Jackson to deserialize
    // JSON messages into Transaction objects
    public Transaction() {
    }

    public Transaction(String id, String type, double amount) {
        this.id = id;
        this.type = type;
        this.amount = amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Transaction{" + "id=" + id + ", type=" + type + ", amount=" + amount + '}';
    }
    
}

application.properties configuration

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

This configuration sets up Kafka consumers and producers to communicate with a Kafka broker running on localhost, with specific serialization and deserialization settings for keys and values. It also sets up JSON deserialization for consumers and serialization for producers.

For the Kafka Consumer:

  • spring.kafka.consumer.bootstrap-servers: Specifies the Kafka server to connect to; in this case, localhost:9092.
  • spring.kafka.consumer.group-id: Defines the consumer group ID, which is set to my-group.
  • spring.kafka.consumer.value-deserializer: Specifies the deserializer class for values, which is org.springframework.kafka.support.serializer.JsonDeserializer.
  • spring.kafka.consumer.properties.spring.json.trusted.packages=*: Configures trusted packages for the JSON deserializer.

For the Kafka Producer:

  • spring.kafka.producer.bootstrap-servers: Specifies the Kafka server to connect to; in this case, localhost:9092.
  • spring.kafka.producer.value-serializer: Specifies the serializer class for values, which is org.springframework.kafka.support.serializer.JsonSerializer.

3.2 Configure Spring Kafka

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.Map;
import java.util.HashMap;

@SpringBootApplication
@EnableKafka
public class KafkaconfigApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaconfigApplication.class, args);
    }

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The deserializer instances passed here take precedence over the entries above
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Transaction.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
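
Because one listener container will receive records from several topics (and therefore several partitions), the factory can optionally run more than one consumer thread. A minimal sketch of this optional tuning, added before returning the factory:

// Optional: run three consumer threads; partitions from all subscribed
// topics are distributed across them
factory.setConcurrency(3);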

3.3 Create Kafka Consumer Listener

Here, we create a listener method in the application to consume messages from a Kafka topic.

@Service
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"transactions", "financial-events"}, groupId = "my-group")
    public void listen(Transaction message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // Process the message
        System.out.printf("Received message: topic = %s, id = %s, type = %s, amount = %s%n",
                topic, message.getId(), message.getType(), message.getAmount());

        log.info("Event on topic={}, payload={}", topic, message);
    }
}

  • @KafkaListener: This annotation marks the method as a listener for Kafka messages.
  • Specify Topic: We use the topics attribute of the @KafkaListener annotation to specify the Kafka topic(s) from which the method should consume messages.
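
As an alternative to listing topic names, @KafkaListener also accepts a topicPattern attribute that subscribes by regular expression. A minimal sketch, assuming the hypothetical prefix financial-:

@KafkaListener(topicPattern = "financial-.*", groupId = "my-group")
public void listenByPattern(Transaction message) {
    System.out.println("Received: " + message);
}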

3.4 Create Kafka Producer

Here, our Kafka Producer (REST controller) receives HTTP POST requests to create transactions, constructs Transaction objects from the request parameters, and delegates the saving of transactions to the ProducerService, which then sends the transactions to Kafka topics using a KafkaTemplate.

@RestController
@RequestMapping("/api")
public class KafkaProducer {

    @Autowired
    ProducerService producerService;

    @PostMapping(value = "/createTransaction")
    public void sendMessageTopic(
            @RequestParam("id") String id,
            @RequestParam("type") String type,
            @RequestParam("amount") Double amount) {

        Transaction message = new Transaction();
        message.setId(id);
        message.setType(type);
        message.setAmount(amount);

        this.producerService.saveTransaction(message);
    }

}

ProducerService.java class

@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void saveTransaction(Transaction message) {

        System.out.println("Transaction created " + message);
        this.kafkaTemplate.send("transactions", message);
        this.kafkaTemplate.send("financial-events", message);
    }
}

  • @Autowired private KafkaTemplate<String, Object> kafkaTemplate: Autowires an instance of KafkaTemplate to produce messages to Kafka topics.
  • saveTransaction method is responsible for saving a transaction. It receives a Transaction object and sends it to two Kafka topics, "transactions" and "financial-events", using the injected KafkaTemplate.
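
KafkaTemplate.send() also has an overload that accepts a message key. Keying each record, for example by transaction id, guarantees that records with the same key always land on the same partition of a topic. A minimal sketch of this variant:

// Keyed send: records with the same key are routed to the same partition
this.kafkaTemplate.send("transactions", message.getId(), message);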

Let’s post a new transaction to the API endpoint http://localhost:8080/api/createTransaction using curl. Since the controller reads request parameters rather than a JSON body, the values are passed as query parameters:

curl -X POST 'http://localhost:8080/api/createTransaction?id=txn-123&type=purchase&amount=300.0'

Output on the terminal console log is:

Fig 2. Terminal console log output

4. Subscribing to Multiple Topics Using Kafka CLI

Kafka also provides a command-line interface (CLI) for interacting with topics and consuming messages. To publish test messages, navigate to the Kafka installation directory and run the kafka-console-producer.sh script like this:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic transactions
>{"id": "txn-123", "type": "purchase", "amount": 100.50}

This command starts the kafka-console-producer.sh tool to publish messages to the transactions topic. We can then type messages directly into the terminal window and press Enter to publish them.
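
A second terminal window can publish to the financial-events topic in the same way:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic financial-events
>{"id": "evt-456", "type": "fee", "amount": 25.00}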

4.1 Subscribe to Topics

Below is an example of subscribing to multiple topics using the Kafka CLI. Note that the --topic option accepts only a single topic; consuming from several topics at once is done with the --include option (called --whitelist in older Kafka versions), which takes a regular expression matching the topic names:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --include "transactions|financial-events" --from-beginning

In this command:

  • --bootstrap-server: Specifies the Kafka broker(s) to connect to.
  • --include: A regular expression matching the names of the topics to subscribe to.
  • --from-beginning: Starts consuming from the beginning of the topic(s).

5. Conclusion

This article has explored different approaches for configuring a Kafka consumer to subscribe to multiple topics. Whether through the Kafka Consumer API, Spring Kafka, or Kafka CLI, these methods offer flexibility and scalability for efficiently managing Kafka topics in diverse applications.

6. Download the Source Code

This was an example of how to subscribe a Kafka consumer to multiple topics.

Download
You can download the full source code of this example here: kafka subscribe consumer multiple topics
