Spring Boot and Apache Pulsar Integration
Apache Pulsar is a powerful distributed messaging system designed for high-throughput and low-latency data streaming. It’s used by organizations worldwide to build real-time, event-driven applications and microservices. Spring Boot, on the other hand, is a popular framework for building Java applications with minimal configuration. In this article, we’ll explore how to get started with Apache Pulsar and Spring Boot, leveraging the power of these two technologies for building scalable, real-time applications.
1. Introduction
Apache Pulsar is an open-source, cloud-native messaging system that combines the message queue and publish-subscribe models. It is designed for durability, scalability, and high performance, which makes it a good fit for event-driven, real-time applications. Spring Boot, a project in the Spring ecosystem built on top of the Spring Framework, simplifies Java application development with opinionated defaults and auto-configuration.
2. Maven Dependency
To get started with Apache Pulsar and Spring Boot, you need to add the Pulsar Spring Boot Starter dependency to your project. Open your pom.xml file and add the following dependency:
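<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <!-- Can be omitted when the project inherits from spring-boot-starter-parent, which manages the version -->
        <version>3.2.0</version>
    </dependency>
</dependencies>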
If you are using the earlier Spring for Apache Pulsar 0.2.x line, which shipped its own Boot starter, the coordinates change as follows:
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
Either starter pulls in Spring for Apache Pulsar and the Pulsar Java client and enables the auto-configuration used throughout this article.
3. Pulsar Cluster
For local development and testing, we run Pulsar in standalone mode inside a Docker container. The following Docker Compose file starts a standalone Pulsar instance together with Pulsar Manager, a web-based administration UI for Pulsar.
version: '3.5'
services:
  pulsar:
    image: "apachepulsar/pulsar:2.10.1"
    command: bin/pulsar standalone
    environment:
      PULSAR_MEM: "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
    volumes:
      - ./pulsar/data:/pulsar/data
    ports:
      - "6650:6650"
      - "8080:8080"
    restart: unless-stopped
    networks:
      - pulsar_network
  pulsar-manager:
    image: "apachepulsar/pulsar-manager:v0.2.0"
    ports:
      - "9527:9527"
      - "7750:7750"
    depends_on:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
    networks:
      - pulsar_network
networks:
  pulsar_network:
    name: pulsar_network
    driver: bridge
This Docker Compose file defines two services: pulsar and pulsar-manager.
The pulsar service uses the apachepulsar/pulsar:2.10.1 Docker image and runs a standalone Pulsar instance with the command bin/pulsar standalone. The PULSAR_MEM environment variable sets the JVM memory limits (a 512 MB heap and up to 1 GB of direct memory). The host directory ./pulsar/data is mounted at /pulsar/data inside the container, so data survives container restarts. Ports 6650 and 8080 are exposed for Pulsar's binary messaging protocol and its HTTP administrative interface, respectively. The service restarts automatically unless it is explicitly stopped, and it is connected to a network named pulsar_network.
The pulsar-manager service uses the apachepulsar/pulsar-manager:v0.2.0 Docker image and exposes ports 9527 and 7750 for the Pulsar Manager UI and its API, respectively. Its depends_on entry makes Docker Compose start the pulsar service before pulsar-manager; note that this only controls start order and does not wait until the broker is ready to accept connections. The SPRING_CONFIGURATION_FILE environment variable specifies the location of Pulsar Manager's application properties file. This service is also connected to pulsar_network.
Finally, the file defines a network named pulsar_network with the bridge driver, which connects the pulsar and pulsar-manager services.
Start the local standalone cluster:
docker-compose up -d
4. Application Code
The following listing shows the Spring Boot application used in this example:
package org.example;

import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;

@SpringBootApplication
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    // Sends a User record on startup; its schema is resolved from the type (JSON by default).
    @Bean
    ApplicationRunner runner(PulsarTemplate<User> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-user-topic", new User("Hello User", 1));
    }

    // Sends a String on startup; the STRING schema is inferred from the payload type.
    @Bean
    ApplicationRunner runner1(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    // Consumes User records; complex types need an explicit schema type.
    @PulsarListener(schemaType = SchemaType.JSON)
    void listen(User message) {
        System.out.println("Message Received: " + message);
    }

    // Consumes String payloads from hello-pulsar-topic.
    @PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar-topic")
    public void listen(String message) {
        System.out.println("Message Received: " + message);
    }

    // A second subscription on the same topic that receives the full Pulsar Message.
    @PulsarListener(subscriptionName = "my-subscription", topics = "hello-pulsar-topic")
    public void listen(org.apache.pulsar.client.api.Message<String> message) {
        System.out.println(message.getValue());
    }

    // Sends a message to the topic used by the dead letter example in section 9.
    @Bean
    ApplicationRunner runner2(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("topic-with-dlp", "Hello Pulsar World!");
    }
}
Let us quickly go through this application at a high level; later sections look at each component in more detail.
The preceding sample relies heavily on Spring Boot auto-configuration. Among other components, Spring Boot automatically provides a PulsarClient that is shared by the producers and consumers in the application.
Spring Boot also auto-configures a PulsarTemplate, which we inject into the application to send records to Pulsar topics. For example, runner1 sends a String to the topic named hello-pulsar-topic. Note that the application does not pass any schema information to the send calls, because the Spring for Apache Pulsar library infers the schema type from the type of the data being sent.
We use the PulsarListener annotation to consume from the hello-pulsar-topic topic to which we publish. PulsarListener is a convenience annotation that wraps the message listener container infrastructure in Spring for Apache Pulsar. Behind the scenes, it creates a message listener container that creates and manages the Pulsar consumer. As with a regular Pulsar consumer, the default subscription type when using PulsarListener is Exclusive. As records are published to hello-pulsar-topic, the PulsarListener consumes them and prints them to the console. Because the two listeners on that topic use different subscriptions (hello-pulsar-subscription and my-subscription), each of them receives every message. The framework also infers the schema type from the data type that the PulsarListener method uses as its payload (String, in this case).
For demonstration purposes, we also define a record named User:
package org.example;

public record User(String name, int id) {
}
5. PulsarClient
When you use the Pulsar Spring Boot Starter, the PulsarClient is auto-configured for you.
By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. You can change this by setting the spring.pulsar.client.service-url property to a different value.
You can further configure the client by specifying any of the spring.pulsar.client.* application properties in the application.yml file.
The application.yml file is located in the project's resources directory (src/main/resources in a standard Maven layout). This is where the application's settings are specified.
If you are not using the starter, you need to configure and register the PulsarClient yourself. Spring for Apache Pulsar provides a DefaultPulsarClientFactory that accepts a builder customizer to help with this.
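For the non-starter case, a minimal sketch of registering the client yourself might look like the following. It assumes Spring for Apache Pulsar 1.x on the classpath, where DefaultPulsarClientFactory accepts a PulsarClientBuilderCustomizer and exposes createClient(); the class name ManualPulsarClientConfig and the 30-second operation timeout are illustrative choices, and the service URL assumes the local standalone broker from section 3.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.DefaultPulsarClientFactory;

@Configuration
public class ManualPulsarClientConfig {

    // Assumption: the standalone broker started with Docker Compose in section 3.
    static final String SERVICE_URL = "pulsar://localhost:6650";

    // Registers the client as a bean; Spring calls close() when the context shuts down.
    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() throws PulsarClientException {
        // The customizer receives Pulsar's ClientBuilder before the client is built.
        DefaultPulsarClientFactory factory = new DefaultPulsarClientFactory(
                (builder) -> builder.serviceUrl(SERVICE_URL)
                        .operationTimeout(30, TimeUnit.SECONDS));
        // Creating the client does not contact the broker; connections are opened lazily.
        return factory.createClient();
    }
}
```

Any other ClientBuilder setting (authentication, TLS, timeouts) can be applied inside the same customizer lambda.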
6. Message Production
6.1 Pulsar Template
On the Pulsar producer side, Spring Boot auto-configuration provides a PulsarTemplate for publishing records. The template implements an interface called PulsarOperations, which defines the methods for publishing records.
There are two categories of send methods: send and sendAsync. The send methods block, using the synchronous sending capability of the Pulsar producer, and return the MessageId of the published message once the broker has persisted it. The sendAsync methods are non-blocking: they return a CompletableFuture that completes with the message ID once the message is published.
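To contrast with the blocking call used in this example, here is a sketch of the non-blocking variant. It assumes the auto-configured PulsarTemplate<String> and the hello-pulsar-topic topic from the sample; the class AsyncSendConfig and its describe helper are hypothetical names introduced here so that the callback logic can be exercised without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.pulsar.client.api.MessageId;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarTemplate;

@Configuration
public class AsyncSendConfig {

    // sendAsync returns a future; the callback runs once the broker has persisted the message.
    @Bean
    ApplicationRunner asyncRunner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> describe(pulsarTemplate.sendAsync("hello-pulsar-topic", "Hello async Pulsar World!"))
                .thenAccept(System.out::println);
    }

    // Hypothetical helper: turns the outcome of an asynchronous send into a log line.
    static CompletableFuture<String> describe(CompletableFuture<MessageId> sendFuture) {
        return sendFuture
                .thenApply(id -> "Published with id " + id)
                .exceptionally(ex -> {
                    // Dependent stages wrap the original failure in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return "Publish failed: " + cause.getMessage();
                });
    }
}
```

With this approach, the outcome of the publish is handled in the callback instead of by waiting on the returned MessageId.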
In this example, we use the blocking send() method:
@Bean
ApplicationRunner runner(PulsarTemplate<User> pulsarTemplate) {
    return (args) -> pulsarTemplate.send("hello-user-topic", new User("Hello User", 1));
}
This code snippet uses Spring's @Bean annotation to define a bean of type ApplicationRunner. Spring Boot calls every ApplicationRunner bean once the application context has started, which makes it a convenient place for startup logic. In this case, the logic sends a message to a Pulsar topic using the injected pulsarTemplate. The message is an instance of the User record with the values “Hello User” and 1.
The (args) -> ... part is a lambda expression, a concise way to implement an interface that has a single abstract method. Here it implements the run method of the ApplicationRunner interface; run returns nothing, so the result of send is simply discarded. Spring Boot invokes run when the application starts and passes it an ApplicationArguments object as args, which gives access to the command-line arguments the application was started with.
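To make the lambda/interface relationship concrete, the sketch below writes the same kind of ApplicationRunner twice: once as a lambda and once as an anonymous class that spells out run(ApplicationArguments). To keep it runnable without a broker, the runners record the option names they receive instead of calling pulsarTemplate.send; the class name RunnerForms and the --topic option are illustrative, and DefaultApplicationArguments is used only to build arguments by hand.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.DefaultApplicationArguments;

public class RunnerForms {

    // Lambda form: the body becomes the implementation of ApplicationRunner.run(ApplicationArguments).
    static ApplicationRunner lambdaRunner(List<String> log) {
        return (args) -> log.add("lambda saw options " + args.getOptionNames());
    }

    // Equivalent anonymous-class form, naming the method the lambda implements.
    static ApplicationRunner anonymousRunner(List<String> log) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) {
                log.add("anonymous saw options " + args.getOptionNames());
            }
        };
    }

    public static void main(String[] argv) throws Exception {
        List<String> log = new ArrayList<>();
        // Spring Boot normally builds ApplicationArguments from the real command line;
        // here one is created by hand so the runners can be invoked directly.
        ApplicationArguments args = new DefaultApplicationArguments("--topic=hello-pulsar-topic");
        lambdaRunner(log).run(args);
        anonymousRunner(log).run(args);
        log.forEach(System.out::println);
    }
}
```

Both forms behave identically; the lambda is simply shorter because ApplicationRunner has exactly one abstract method.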
7. Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you do not need to specify any schema type when publishing the data. For non-primitive types, if the schema is not explicitly specified when invoking send operations on the PulsarTemplate, Spring for Apache Pulsar tries to build a Schema.JSON from the type.
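To see what that fallback amounts to, the sketch below builds Schema.JSON for a record shaped like the article's User and round-trips an instance through encode and decode, the conversions a producer and a consumer apply to the payload. It assumes only the Pulsar Java client on the classpath (no broker is needed); the class name JsonSchemaDemo is hypothetical.

```java
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Schema;

// Same shape as the article's org.example.User record.
record User(String name, int id) { }

public class JsonSchemaDemo {

    // The JSON schema Pulsar derives from the class.
    static final Schema<User> USER_SCHEMA = Schema.JSON(User.class);

    // encode() is applied on the producer side, decode() on the consumer side.
    static User roundTrip(User user) {
        byte[] payload = USER_SCHEMA.encode(user);
        return USER_SCHEMA.decode(payload);
    }

    public static void main(String[] args) {
        User user = new User("Hello User", 1);
        // Prints the JSON payload that would be sent to the broker.
        System.out.println(new String(USER_SCHEMA.encode(user), StandardCharsets.UTF_8));
        System.out.println(roundTrip(user));
    }
}
```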
7.1 Custom Schema Mapping
As an alternative to specifying the schema for complex types every time you invoke a send operation on the PulsarTemplate, you can configure the schema resolver with mappings for those types. This removes the need to specify the schema, because the framework consults the resolver using the outgoing message type.
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following application.yml example adds a mapping for the User complex object using the JSON schema type; the topic-name entry also associates the User type with the hello-user-topic topic.
spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: org.example.User
          topic-name: hello-user-topic
          schema-info:
            schema-type: JSON
The message-type value is the fully qualified name of the message class.
The preferred way to add mappings is the property shown above. If you need more control, you can instead provide a schema resolver customizer to add the mappings.
7.2 Schema Resolver Customizer
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using the AVRO and JSON schemas, respectively (Address stands for any other domain class in your application):
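import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        // Register a schema per message type so send operations can omit it.
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}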
With this configuration in place, there is no need to specify the schema on send operations.
8. Message Consumption
8.1 Pulsar Listener
For Pulsar consumers, the recommended approach for end-user applications is the PulsarListener annotation. Using PulsarListener requires the @EnablePulsar annotation. With Spring Boot, however, this annotation is enabled automatically, along with all the components PulsarListener needs, such as the message listener infrastructure that is responsible for creating the Pulsar consumer.
Spring Boot auto-configuration also lets you further configure how Pulsar consumes messages by specifying any of the spring.pulsar.consumer.* application properties in the application.yml file.
Let us revisit the PulsarListener code snippet:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar-topic")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}
In this PulsarListener method, we receive the data as a String without specifying any schema type. Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type: it detects that the method expects a String, infers the corresponding schema type, and provides that schema to the consumer. The framework performs this inference for Java primitive types (including String). For complex types (such as JSON, AVRO, and others), it cannot infer the schema, so you need to provide the schema type on the annotation through the schemaType property.
The following PulsarListener method shows how to consume complex types from a topic:
@PulsarListener(schemaType = SchemaType.JSON)
void listen(User message) {
    System.out.println("Message Received: " + message);
}
Note the schemaType property on PulsarListener. It is needed because the library cannot infer the schema type from the provided type, User, so we must tell the framework which schema to use.
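If you prefer to name the topic and subscription on the annotation itself, the JSON listener can be written as below. The subscription name hello-user-subscription is a hypothetical choice, the topic is the one the sample's runner publishes User records to, and the nested User record is a stand-in for org.example.User so the sketch compiles on its own. The schemaType attribute is still required.

```java
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class UserListener {

    // Stand-in for org.example.User so this sketch is self-contained.
    public record User(String name, int id) { }

    // Topic and subscription are explicit; the JSON schema must still be declared.
    @PulsarListener(subscriptionName = "hello-user-subscription", topics = "hello-user-topic",
            schemaType = SchemaType.JSON)
    void listen(User message) {
        System.out.println("Message Received: " + message);
    }
}
```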
Let us look at one more option. You can also consume the Pulsar Message object directly:
@PulsarListener(subscriptionName = "my-subscription", topics = "hello-pulsar-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}
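Here, Spring for Apache Pulsar hands the listener the full Pulsar Message rather than just its payload. Besides the value (getValue()), this gives access to metadata such as the message ID (getMessageId()), the key (getKey()), the publish time (getPublishTime()), and the message properties (getProperties()).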
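As a sketch of what that access looks like, the listener below prints several of the metadata accessors available on org.apache.pulsar.client.api.Message alongside the payload. The class name MetadataListener and the subscription name metadata-subscription are illustrative; because it uses its own subscription, it receives every message on hello-pulsar-topic independently of the other listeners.

```java
import org.apache.pulsar.client.api.Message;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MetadataListener {

    // Receiving Message<String> instead of String exposes metadata as well as the payload.
    @PulsarListener(subscriptionName = "metadata-subscription", topics = "hello-pulsar-topic")
    public void listen(Message<String> message) {
        System.out.printf("id=%s key=%s publishTime=%d properties=%s value=%s%n",
                message.getMessageId(),    // position of the message in the topic
                message.getKey(),          // null when the producer set no key
                message.getPublishTime(),  // publish timestamp in milliseconds
                message.getProperties(),   // user-defined key/value properties
                message.getValue());       // payload decoded with the STRING schema
    }
}
```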
9. Using a Dead Letter Topic
Apache Pulsar lets applications use a dead letter topic on consumers with a Shared (or Key_Shared) subscription type; the feature is not available for the Exclusive and Failover subscription types. The basic idea is that if a message is redelivered a certain number of times (for example, because of an ack timeout or a negative acknowledgement), then once the retries are exhausted, the message is sent to a special topic called the dead letter topic, often referred to as a dead letter queue (DLQ). Let us see this feature in action by inspecting some code:
package org.example;

import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;

// @EnablePulsar is redundant under Spring Boot (auto-configuration enables it) but harmless.
@EnablePulsar
@Configuration
public class DeadLetterPolicyConfig {

    // Always fails, so the message is redelivered until the policy moves it to the DLQ.
    // ackTimeoutMillis is the Pulsar consumer configuration key for the ack timeout.
    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = {"ackTimeoutMillis=1000"})
    void listen1(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    // Receives messages once they have been moved to the dead letter topic.
    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    // Referenced by bean name from the deadLetterPolicy attribute above.
    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(10)
                .deadLetterTopic("my-dlq-topic")
                .build();
    }
}
First, we define a DeadLetterPolicy bean named deadLetterPolicy (any name works). This bean specifies the maximum redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic, in this case). If you do not specify a dead letter topic name, Pulsar defaults to <topicname>-<subscriptionname>-DLQ. Next, we pass this bean name to PulsarListener through its deadLetterPolicy property. Note that this PulsarListener uses the Shared subscription type, because the DLQ feature requires a shared subscription. Since this code is primarily for demonstration purposes, we configure a short ack timeout of one second. The listener always throws an exception, so the message is never acknowledged; if Pulsar does not receive an acknowledgement within one second, it redelivers the message. Once that cycle has repeated ten times (the maximum redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the message to the DLQ topic. A second PulsarListener listens on the DLQ topic and prints the data as it arrives there.
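For comparison, the same consumer settings can be expressed directly on the plain Pulsar client's ConsumerBuilder, which is roughly what the annotation attributes configure for you. This is a sketch assuming the standalone broker from section 3; the class PlainDlqConsumer and its methods are illustrative names, and main deliberately never acknowledges the message so that the ack timeout drives redelivery.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class PlainDlqConsumer {

    // Mirrors the annotated listener's settings on a ConsumerBuilder (nothing is contacted yet).
    static ConsumerBuilder<String> dlqConsumerBuilder(PulsarClient client) {
        return client.newConsumer(Schema.STRING)
                .topic("topic-with-dlp")
                .subscriptionName("deadLetterPolicySubscription")
                .subscriptionType(SubscriptionType.Shared)   // DLQ requires a shared subscription
                .ackTimeout(1, TimeUnit.SECONDS)             // unacknowledged messages are redelivered
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(10)
                        .deadLetterTopic("my-dlq-topic")
                        .build());
    }

    public static void main(String[] args) throws PulsarClientException {
        // Assumption: the standalone broker from the Docker Compose setup is running.
        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
             Consumer<String> consumer = dlqConsumerBuilder(client).subscribe()) {
            Message<String> message = consumer.receive(5, TimeUnit.SECONDS); // null on timeout
            if (message != null) {
                // Deliberately not acknowledged, so the ack timeout triggers another redelivery.
                System.out.println("Received " + message.getValue()
                        + " (redelivery count " + message.getRedeliveryCount() + ")");
            }
        }
    }
}
```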
10. Conclusion
In this article, we’ve explored the integration of Apache Pulsar with Spring Boot. By adding the Pulsar Spring Boot Starter, configuring the Pulsar client, specifying custom schemas, and creating publishers and consumers, you can build real-time, event-driven applications with ease. Additionally, we briefly discussed the concept of a Dead Letter Topic for handling failed messages.
As you continue to work with Apache Pulsar and Spring Boot, you can explore more advanced features and configurations to meet the specific needs of your real-time application. This combination of technologies offers a robust and scalable solution for handling high-throughput, low-latency data streams in your projects.
11. Download the Source Code
This was an example of how to integrate Apache Pulsar into your Spring Boot Application.
You can download the full source code of this example here: Spring Boot and Apache Pulsar Integration