How to Subscribe a Kafka Consumer to Multiple Topics
Apache Kafka is a leading distributed streaming platform that enables the building of real-time data pipelines and streaming applications. One of Kafka’s core components is its consumer API, which allows applications to subscribe to and process streams of records in real time. Handling data from multiple topics concurrently is a common requirement in many Kafka-based systems. This article will guide us through the process of subscribing a Kafka consumer to multiple topics.
1. Setting Up Kafka
There are several ways to run Apache Kafka. This article uses the manual installation. Docker also provides a convenient option for local development. Here’s how to get started:
1.1 Setting Up Kafka Manually
Download Apache Kafka from the official website and extract the downloaded archive. Next, we must start the ZooKeeper and Kafka servers manually from the command line.
1.1.1 Start the ZooKeeper Service
In this setup, Kafka depends on ZooKeeper for coordination, so we must start ZooKeeper first. Navigate into the extracted Kafka directory and enter the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
This command starts the ZooKeeper server (default port is 2181).
1.1.2 Start Kafka Server
Once ZooKeeper is running, we can start the Kafka server (default port is 9092). Open a new terminal window and execute the following command:
bin/kafka-server-start.sh config/server.properties
This leaves a basic Kafka environment running and ready to use.
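Before moving on, it can help to confirm that both servers accept connections on their default ports. The following is a minimal sketch using only the Java standard library; the class name PortCheck and the method isListening are hypothetical names chosen for illustration, and an open port only shows that something is listening, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports mentioned above: ZooKeeper 2181, Kafka 9092
        System.out.println("ZooKeeper reachable: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka reachable: " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints false, the corresponding server is probably not running yet or is bound to a different port than the default.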
2. Subscribing to Multiple Topics Using Kafka Consumer API
The pom.xml below declares the dependency on the Kafka client library, which is the only dependency this plain-Java example needs.
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
</dependencies>
2.1 Initialize Kafka Consumer
First, we need to initialize a Kafka consumer instance. This involves configuring properties such as the bootstrap servers, the group ID, and the key and value deserializers. Here’s a Java example using the KafkaConsumer class from the Kafka client library:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
2.2 Subscribe to Topics
Once the consumer is initialized, we can subscribe it to one or more topics using the subscribe() method:
consumer.subscribe(Arrays.asList("transactions", "financial-events")); // Use your topic names
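Besides an explicit list of names, the consumer API also offers a subscribe(Pattern) overload that subscribes to every topic whose name matches a regular expression. As a hedged sketch, the helper below uses only java.util.regex to preview which topic names a pattern would select, assuming the pattern must match the whole topic name. The class name TopicPatternPreview, the pattern, and the extra audit-log topic are illustrative assumptions, not part of the original example.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternPreview {

    // Returns the topic names that the pattern matches as a whole string,
    // which is how a pattern subscription is assumed to select topics by name.
    static List<String> matchingTopics(Pattern pattern, List<String> topicNames) {
        return topicNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("transactions|financial-.*");
        List<String> topics = List.of("transactions", "financial-events", "audit-log");
        System.out.println(matchingTopics(pattern, topics));
        // With a real consumer, the same pattern would be passed as:
        // consumer.subscribe(pattern);
    }
}
```

A pattern subscription can be convenient when topics share a naming prefix, while the explicit list shown above keeps the set of topics fixed and easy to read.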
2.3 Poll for Records
The consumer needs to poll Kafka continuously for new records. This is done by calling the poll() method in a loop like this:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        String message = record.value();
        System.out.printf("Received message: key = %s, value = %s, topic = %s%n",
                record.key(), message, record.topic());
    }
}
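Because a single poll() call can return records from any of the subscribed topics, processing code usually branches on record.topic(). One way to keep that branching tidy is a small dispatch table keyed by topic name. The sketch below is deliberately free of Kafka types so that it runs on its own; TopicRouter and its methods on and dispatch are hypothetical names, not part of the Kafka client library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

final class TopicRouter {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Registers the handler that should receive values from the given topic.
    TopicRouter on(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    // Passes the value to the topic's handler; returns false when none is registered.
    boolean dispatch(String topic, String value) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(value);
        return true;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter()
                .on("transactions", v -> System.out.println("transaction: " + v))
                .on("financial-events", v -> System.out.println("event: " + v));
        // Inside the poll loop this would be:
        // router.dispatch(record.topic(), record.value());
        router.dispatch("transactions", "100");
        router.dispatch("financial-events", "200");
    }
}
```

Returning false for an unregistered topic lets the caller decide whether to log, skip, or fail on records from topics it did not expect.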
The code snippet below is a full Java class with a main method that tests the Kafka consumer configuration by publishing one message to each of the topics and then consuming them. The class also uses the Kafka producer API to send the messages to the Kafka topics.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaConsumerExample {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Test Kafka consumer configuration and publish messages
        testKafkaConsumer();
    }

    private static void testKafkaConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of each topic when the group has no committed offsets,
        // so the messages published below are not skipped before the consumer is assigned
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("transactions", "financial-events")); // Use your topic names

        // Create Kafka producer configuration
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Publish one message on each topic
        sendMessage(producer, "transactions", "100");
        sendMessage(producer, "financial-events", "200");

        // Consume messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.printf("Received message: key = %s, value = %s, topic = %s%n",
                            record.key(), message, record.topic());
                }
            }
        } finally {
            consumer.close();
            producer.close();
        }
    }

    private static void sendMessage(KafkaProducer<String, String> producer, String topic, String message) {
        // Send the message to Kafka topic
        producer.send(new ProducerRecord<>(topic, message));
        System.out.printf("Published message '%s' to topic '%s'%n", message, topic);
    }
}
Output is:
3. Subscribing to Multiple Topics Using Spring Kafka
Using Spring Kafka simplifies the process of working with Kafka consumers in Spring-based applications. Below are the steps to subscribe to multiple topics using Spring Kafka:
Dependencies in pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
3.1 Create a Model Class
Let’s define a model class, Transaction, to represent the messages. Each instance of this class corresponds to one message in a Kafka topic.
Transaction class:
public class Transaction {

    private String id;
    private String type;
    private double amount;

    // No-argument constructor, needed by Jackson when deserializing JSON messages
    public Transaction() {
    }

    public Transaction(String id, String type, double amount) {
        this.id = id;
        this.type = type;
        this.amount = amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Transaction{" + "id=" + id + ", type=" + type + ", amount=" + amount + '}';
    }
}
application.properties configuration
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
This configuration sets up Kafka consumers and producers to communicate with a Kafka broker running on localhost, with specific serialization and deserialization settings for keys and values. It also sets up JSON deserialization for consumers and JSON serialization for producers.
For the Kafka Consumer:
- spring.kafka.consumer.bootstrap-servers: Specifies the Kafka server to connect to, in this case localhost:9092.
- spring.kafka.consumer.group-id: Defines the consumer group ID, which is set to my-group.
- spring.kafka.consumer.value-deserializer: Specifies the deserializer class for values, which is org.springframework.kafka.support.serializer.JsonDeserializer.
- spring.kafka.consumer.properties.spring.json.trusted.packages=*: Configures trusted packages for the JSON deserializer. The * wildcard trusts every package, which is convenient for a demo but broader than most production setups need.
For the Kafka Producer:
- spring.kafka.producer.bootstrap-servers: Specifies the Kafka server to connect to, in this case localhost:9092.
- spring.kafka.producer.value-serializer: Specifies the serializer class for values, which is org.springframework.kafka.support.serializer.JsonSerializer.
3.2 Configure Spring Kafka
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.Map;
import java.util.HashMap;

@SpringBootApplication
@EnableKafka
public class KafkaconfigApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaconfigApplication.class, args);
    }

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    // Consumer factory with String keys and Transaction values deserialized from JSON
    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Transaction.class));
    }

    // Container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
3.3 Create Kafka Consumer Listener
Here, we create a listener method in the application to consume messages from the Kafka topics.
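import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"transaction", "financial-events"}, groupId = "my-group")
    public void listen(Transaction message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // Process the message
        System.out.printf("Received message: topic = %s, id = %s, type = %s, amount = %s%n",
                topic, message.getId(), message.getType(), message.getAmount());
        log.info("Event on topic={}, payload={}", topic, message);
    }
}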
- Annotated method @KafkaListener: This annotation marks the method as a listener for Kafka messages.
- Specify Topic: We use the topics attribute of the @KafkaListener annotation to specify the Kafka topic(s) from which the method should consume messages.
3.4 Create Kafka Producer
Here, our Kafka producer (a REST controller) receives HTTP POST requests to create transactions, constructs Transaction objects from the request parameters, and delegates the saving of transactions to the ProducerService, which then sends the transactions to Kafka topics using a KafkaTemplate.
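import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class KafkaProducer {

    @Autowired
    ProducerService producerService;

    @PostMapping(value = "/createTransaction")
    public void sendMessageTopic(
            @RequestParam("id") String id,
            @RequestParam("type") String type,
            @RequestParam("amount") Double amount) {
        // Build the Transaction from the request parameters
        Transaction message = new Transaction(id, type, amount);
        this.producerService.saveTransaction(message);
    }
}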
ProducerService.java class
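import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void saveTransaction(Transaction message) {
        System.out.println("Transaction created " + message);
        // Send the same transaction to both topics
        this.kafkaTemplate.send("transaction", message);
        this.kafkaTemplate.send("financial-events", message);
    }
}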
- @Autowired private KafkaTemplate<String, Object> kafkaTemplate: Autowires an instance of KafkaTemplate to produce messages to Kafka topics.
- The saveTransaction method is responsible for saving a transaction. It receives a Transaction object and then sends it to two Kafka topics, "transaction" and "financial-events", using the injected KafkaTemplate.
Let’s post a new message to the API endpoint http://localhost:8080/api/createTransaction using curl with the following command:
curl -X POST 'http://localhost:8080/api/createTransaction?id=txn-123&type=purchase&amount=300.0'
Output on a Terminal console log is:
4. Subscribing to Multiple Topics Using Kafka CLI
Kafka also provides a command-line interface (CLI) for interacting with topics and consuming messages. To publish test messages, navigate to the Kafka installation directory and run the kafka-console-producer.sh script like this:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic transactions
>{"id": "txn-123", "type": "purchase", "amount": 100.50}
This command starts the kafka-console-producer.sh tool to publish messages to the transactions topic. We can then type messages directly into the terminal window and press Enter to publish them.
4.1 Subscribe to Topics
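Below is an example of subscribing to multiple topics using the Kafka CLI. The --topic option of kafka-console-consumer.sh takes a single topic name, so multiple topics are selected with a regular expression passed to --include (named --whitelist in releases before Kafka 3.0):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --include "transactions|financial-events" --from-beginning
In this command:
- --bootstrap-server: Specifies the Kafka broker(s) to connect to.
- --include: Specifies a regular expression that selects the topic(s) to subscribe to; here it matches either transactions or financial-events.
- --from-beginning: Starts consuming from the beginning of the topic(s).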
5. Conclusion
This article has explored different approaches for configuring a Kafka consumer to subscribe to multiple topics. Whether through the Kafka Consumer API, Spring Kafka, or Kafka CLI, these methods offer flexibility and scalability for efficiently managing Kafka topics in diverse applications.