
Spring Boot and Apache Pulsar Integration

Apache Pulsar is a powerful distributed messaging system designed for high-throughput and low-latency data streaming. It’s used by organizations worldwide to build real-time, event-driven applications and microservices. Spring Boot, on the other hand, is a popular framework for building Java applications with minimal configuration. In this article, we’ll explore how to get started with Apache Pulsar and Spring Boot, leveraging the power of these two technologies for building scalable, real-time applications.

1. Intro

Apache Pulsar is an open-source, cloud-native messaging system that combines the message-queue and publish-subscribe models. It is designed for durability, scalability, and high performance, making it an excellent choice for building event-driven, real-time applications. Spring Boot, which builds on the Spring Framework, simplifies the development of Java applications through auto-configuration and opinionated defaults, enabling rapid development.

2. Maven Dependency

To get started with Apache Pulsar and Spring Boot, you need to add the Pulsar Spring Boot Starter dependency to your project. Open your pom.xml file and add the following dependency:

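<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <!-- Available from Spring Boot 3.2.0. When the project inherits from
             spring-boot-starter-parent, the version is managed and can be omitted. -->
        <version>3.2.0</version>
    </dependency>
</dependencies>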

With the standalone Spring for Apache Pulsar 0.2.x project (which predates the Spring Boot 3.2 integration), the coordinates are:

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>

Either starter brings in Spring for Apache Pulsar together with the Pulsar Java client and enables the auto-configuration that the rest of this article relies on.

3. Pulsar Cluster

For local development and testing, we run Pulsar in standalone mode within a Docker container. The following Docker Compose file runs a standalone Pulsar instance together with Pulsar Manager, a web UI for managing Pulsar.

version: '3.5'

services:
  pulsar:
    image: "apachepulsar/pulsar:2.10.1"
    command: bin/pulsar standalone
    environment:
      PULSAR_MEM: "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
    volumes:
      - ./pulsar/data:/pulsar/data
    ports:
      - "6650:6650"
      - "8080:8080"
    restart: unless-stopped
    networks:
      - pulsar_network
      
  pulsar-manager:
    image: "apachepulsar/pulsar-manager:v0.2.0"
    ports:
      - "9527:9527"
      - "7750:7750"
    depends_on:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
    networks:
      - pulsar_network   
  
networks:
  pulsar_network:
    name: pulsar_network
    driver: bridge    

This is a Docker Compose file that defines two services: pulsar and pulsar-manager.

The pulsar service uses the apachepulsar/pulsar:2.10.1 image and starts a standalone instance with the command bin/pulsar standalone. The PULSAR_MEM environment variable sets the JVM heap (512 MB) and the maximum direct memory (1 GB). The ./pulsar/data directory on the host is mounted into the container, so data survives container restarts. Ports 6650 and 8080 expose Pulsar's binary messaging protocol and its HTTP administrative interface, respectively. The service restarts automatically unless explicitly stopped, and it is attached to the pulsar_network network.

The pulsar-manager service uses the apachepulsar/pulsar-manager:v0.2.0 image and exposes ports 9527 and 7750 for Pulsar Manager's UI and backend API, respectively. Its depends_on entry makes Compose start the pulsar container first; note that this only controls start order and does not wait until the broker is ready to accept connections. The SPRING_CONFIGURATION_FILE variable points to the application properties file used by Pulsar Manager's backend. This service is also attached to pulsar_network.

The file also defines a network called pulsar_network with the bridge driver. This network is used to connect the pulsar and pulsar-manager services together.

Start the local standalone cluster:

docker-compose up -d

4. Application Code

The following listing shows the Spring Boot application class for the example:

package org.example;

import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    // Producers: each ApplicationRunner publishes one message at startup.

    @Bean
    ApplicationRunner runner(PulsarTemplate<User> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-user-topic", new User("Hello User", 1));
    }

    @Bean
    ApplicationRunner runner1(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    // Feeds the dead-letter topic example (see DeadLetterPolicyConfig).
    @Bean
    ApplicationRunner runner2(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("topic-with-dlp", "Hello Pulsar World!");
    }

    // Consumers.

    // Complex type: the JSON schema must be declared explicitly.
    @PulsarListener(subscriptionName = "hello-user-subscription", topics = "hello-user-topic",
            schemaType = SchemaType.JSON)
    void listen(User message) {
        System.out.println("Message Received: " + message);
    }

    // String payload: the schema is inferred from the parameter type.
    @PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar-topic")
    public void listen(String message) {
        System.out.println("Message Received: " + message);
    }

    // Raw Pulsar Message: gives access to the payload and its metadata.
    @PulsarListener(subscriptionName = "my-subscription", topics = "hello-pulsar-topic")
    public void listen(org.apache.pulsar.client.api.Message<String> message) {
        System.out.println(message.getValue());
    }
}

Let us quickly go through the high-level details of this application. Later in the article we look at these components in more detail.

In the preceding sample, we rely heavily on Spring Boot auto-configuration. Among other components, Spring Boot automatically provides a PulsarClient, which both the producers and the consumers in the application use.

Spring Boot also auto-configures a PulsarTemplate, which we inject into the ApplicationRunner beans to send records to Pulsar topics. The application sends a String to hello-pulsar-topic and a User record to hello-user-topic. Note that the send calls do not specify any schema information, because the Spring for Apache Pulsar library infers the schema type from the type of the data being sent.

We use the PulsarListener annotation to consume from hello-pulsar-topic, where we publish the String data. PulsarListener is a convenience annotation that wraps the message listener container infrastructure in Spring for Apache Pulsar. Behind the scenes, it creates a message listener container that creates and manages the Pulsar consumer. As with a regular Pulsar consumer, the default subscription type when using PulsarListener is Exclusive. As records are published to hello-pulsar-topic, the PulsarListener consumes them and prints them to the console. The framework also infers the schema type from the data type of the PulsarListener method's payload parameter (String, in this case).

Also, we have created a record named User, for demonstration purposes:

package org.example;

public record User(String name, int id) {
}

5. PulsarClient

When you use the Pulsar Spring Boot Starter, you get the PulsarClient auto-configured.

By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. This can be adjusted by setting the spring.pulsar.client.service-url property to a different value.

You can further configure the client through any of the spring.pulsar.client.* application properties in the application.yml file.

The application.yml file is located in the src/main/resources directory of the project. There we can specify all kinds of settings for our application.

Fig. 1: application.yml location.

If you are not using the starter, you will need to configure and register the PulsarClient yourself. There is a DefaultPulsarClientFactory that accepts a builder customizer that can be used to help with this.
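For projects without the starter, a minimal sketch might register the client as a bean built directly with the Pulsar Java client's `PulsarClient.builder()` API, rather than through DefaultPulsarClientFactory. The class name `ManualPulsarClientConfig` and the timeout value are illustrative assumptions; the service URL matches the Docker Compose setup above.

```java
package org.example;

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class for projects that do NOT use the starter.
@Configuration
public class ManualPulsarClientConfig {

    // Spring calls close() on the client when the application context shuts down.
    @Bean(destroyMethod = "close")
    PulsarClient pulsarClient() throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // broker port exposed by docker-compose
                .operationTimeout(30, TimeUnit.SECONDS)  // illustrative value, not a recommendation
                .build();
    }
}
```

This covers only the client. Without the starter, the template and listener container infrastructure would have to be defined as beans as well.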

6. Message Production

6.1 Pulsar Template

On the Pulsar producer side, Spring Boot auto-configuration provides a PulsarTemplate for publishing records. The template implements an interface called PulsarOperations and provides methods to publish records through its contract.

There are two categories of send API methods: send and sendAsync. The send methods are blocking: they use the Pulsar producer's synchronous send and return the MessageId of the published message once the broker has persisted it. The sendAsync methods are non-blocking: they return immediately with a CompletableFuture, which completes with the message ID once the message is published.
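The difference is easiest to see side by side. The sketch below assumes the auto-configured `PulsarTemplate<String>` and the hello-pulsar-topic from the application above; the class and bean names are hypothetical and the error handling is only illustrative.

```java
package org.example;

import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarTemplate;

// Hypothetical configuration contrasting blocking and non-blocking sends.
@Configuration
class AsyncSendExample {

    @Bean
    ApplicationRunner asyncRunner(PulsarTemplate<String> pulsarTemplate) {
        return args -> {
            // Blocking: returns only after the broker has persisted the message.
            MessageId syncId = pulsarTemplate.send("hello-pulsar-topic", "sent synchronously");
            System.out.println("Sync send completed: " + syncId);

            // Non-blocking: returns at once; the future completes when the message is published.
            CompletableFuture<MessageId> future =
                    pulsarTemplate.sendAsync("hello-pulsar-topic", "sent asynchronously");
            future.whenComplete((msgId, ex) -> {
                if (ex != null) {
                    System.err.println("Async send failed: " + ex.getMessage());
                } else {
                    System.out.println("Async send completed: " + msgId);
                }
            });
        };
    }
}
```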

As mentioned above, in this example we are using the send() method:

    @Bean
    ApplicationRunner runner(PulsarTemplate<User> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-user-topic", new User("Hello User", 1));
    }

This code snippet uses Spring's @Bean annotation to define a bean of type ApplicationRunner. Spring Boot calls every ApplicationRunner bean once the application has started, which makes it a convenient place for startup logic. Here, that logic sends a message to a Pulsar topic using the injected pulsarTemplate. The message is an instance of the User record with the values “Hello User” and 1.

(args) -> ... is a lambda expression. It implements the single abstract method of ApplicationRunner, run(ApplicationArguments args), which Spring Boot invokes once the application has started. Because run returns void, the MessageId returned by send is simply discarded. The args parameter is an ApplicationArguments object that gives access to the command-line arguments passed when the application was started.
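Written without the lambda, the same bean becomes an anonymous class. As a sketch, this version also reads a hypothetical `--name` command-line option from ApplicationArguments to show what args carries; the class name, bean name, and option are assumptions for illustration.

```java
package org.example;

import java.util.List;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarTemplate;

// Hypothetical configuration showing the anonymous-class form of an ApplicationRunner.
@Configuration
class RunnerWithoutLambda {

    @Bean
    ApplicationRunner userRunner(PulsarTemplate<User> pulsarTemplate) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                // getOptionValues returns null when the option is absent, e.g. no --name=Alice.
                List<String> names = args.getOptionValues("name");
                String name = (names == null || names.isEmpty()) ? "Hello User" : names.get(0);
                pulsarTemplate.send("hello-user-topic", new User(name, 1));
            }
        };
    }
}
```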

7. Specifying Schema Information

If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the PulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.
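When you prefer to be explicit, the schema can be passed on the send call itself. A sketch, assuming Spring for Apache Pulsar's `send(topic, message, schema)` overload of PulsarTemplate and Pulsar's `Schema.JSON` factory; the class and bean names are hypothetical.

```java
package org.example;

import org.apache.pulsar.client.api.Schema;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarTemplate;

// Hypothetical configuration that states the schema instead of relying on inference.
@Configuration
class ExplicitSchemaExample {

    @Bean
    ApplicationRunner explicitSchemaRunner(PulsarTemplate<User> pulsarTemplate) {
        // The JSON schema is supplied explicitly rather than being built from the User type.
        return args -> pulsarTemplate.send("hello-user-topic",
                new User("Explicit Schema", 2), Schema.JSON(User.class));
    }
}
```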

7.1 Custom Schema Mapping

As an alternative to specifying the schema when invoking send operations on the PulsarTemplate for complex types, the schema resolver can be configured with mappings for the types. This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following application.yml example maps the User type to the JSON schema and also registers hello-user-topic as its default topic:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: org.example.User
          topic-name: hello-user-topic
          schema-info:
            schema-type: JSON

The message-type is the fully-qualified name of the message class.

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

7.2 Schema Resolver Customizer

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:

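@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		// Maps User to AVRO. The property example above and the User listener use JSON,
		// so pick one schema per type rather than combining both approaches.
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		// Address is not part of this example project; substitute your own type.
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	};
}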

With this configuration in place, there is no need to specify the schema on send operations.

8. Message Consumption

8.1 Pulsar Listener

When it comes to Pulsar consumers, we recommend that end-user applications use the PulsarListener annotation. Outside Spring Boot, PulsarListener requires the @EnablePulsar annotation. With Spring Boot, this annotation is enabled automatically, and all the components PulsarListener needs are configured for you, such as the message listener infrastructure (which is responsible for creating the Pulsar consumer).

Spring Boot auto-configuration also provides the ability to further configure the way Pulsar consumes the messages by specifying any of the spring.pulsar.consumer.* application properties inside the application.yml file.

Let us revisit the PulsarListener code snippet:

    @PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar-topic")
    public void listen(String message) {
        System.out.println("Message Received: " + message);
    }

In the PulsarListener method shown earlier, we receive the data as a String, but we do not specify any schema type. Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type: it detects that the method expects a String, infers the matching schema type, and provides that schema to the consumer. The framework performs this inference for Java primitive types (and their wrappers) as well as String. For complex types, such as a record serialized with a JSON or AVRO schema, it cannot infer the schema, so the user needs to provide the schema type on the annotation through the schemaType property.

Fig. 2: Simple Apache Pulsar Listener.
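A simplified model can make the inference rule concrete. The sketch below is hypothetical and deliberately small: it is not Spring for Apache Pulsar's resolver, it covers only String, a few boxed primitive wrappers, and byte[], and it returns the names of Pulsar's SchemaType constants as plain strings so it runs without the Pulsar client on the classpath.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of type-based schema inference.
// It is NOT the framework's implementation; it only mirrors the idea described above.
final class SchemaInferenceSketch {

    // Stand-in for a complex type such as User.
    record Sample(String name, int id) { }

    // A few Java types and the Pulsar SchemaType names they correspond to.
    private static final Map<Class<?>, String> SIMPLE_SCHEMAS = Map.of(
            String.class, "STRING",
            Byte.class, "INT8",
            Short.class, "INT16",
            Integer.class, "INT32",
            Long.class, "INT64",
            Float.class, "FLOAT",
            Double.class, "DOUBLE",
            Boolean.class, "BOOLEAN",
            byte[].class, "BYTES");

    private SchemaInferenceSketch() {
    }

    // Returns the inferred schema type name, or empty when the caller must supply one
    // (for example with schemaType = SchemaType.JSON on the listener).
    static Optional<String> infer(Class<?> payloadType) {
        return Optional.ofNullable(SIMPLE_SCHEMAS.get(payloadType));
    }

    public static void main(String[] args) {
        System.out.println(infer(String.class)); // Optional[STRING]
        System.out.println(infer(Sample.class)); // Optional.empty
    }
}
```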

The following PulsarListener method shows how we can consume complex types from a topic:

    @PulsarListener(subscriptionName = "hello-user-subscription", topics = "hello-user-topic",
            schemaType = SchemaType.JSON)
    void listen(User message) {
        System.out.println("Message Received: " + message);
    }

Note the addition of a schemaType property on PulsarListener. That is because the library is not capable of inferring the schema type from the provided type: User. We must tell the framework what schema to use.

Fig. 3: Complex Apache Pulsar Listener.

Let us look at one more way.

You can consume the Pulsar message directly:

    @PulsarListener(subscriptionName = "my-subscription", topics = "hello-pulsar-topic")
    public void listen(org.apache.pulsar.client.api.Message<String> message) {
        System.out.println(message.getValue());
    }

Here Spring for Apache Pulsar hands the listener the raw org.apache.pulsar.client.api.Message published by the producer, so we can read not only the payload through getValue() but also the message metadata.
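As a sketch of what that metadata access looks like, the listener below prints a few standard fields of the Pulsar client's Message interface. The class name and the subscription name `metadata-subscription` are hypothetical; using a separate subscription means this listener receives its own copy of each message.

```java
package org.example;

import org.apache.pulsar.client.api.Message;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

// Hypothetical listener that inspects message metadata as well as the payload.
@Component
class MessageMetadataListener {

    @PulsarListener(subscriptionName = "metadata-subscription", topics = "hello-pulsar-topic")
    void listen(Message<String> message) {
        System.out.println("Payload:      " + message.getValue());
        System.out.println("Message ID:   " + message.getMessageId());
        System.out.println("Key:          " + (message.hasKey() ? message.getKey() : "<none>"));
        System.out.println("Publish time: " + message.getPublishTime()); // epoch millis
        System.out.println("Properties:   " + message.getProperties());
    }
}
```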

9. Using Dead-Letter Topic

Apache Pulsar lets consumers with a Shared (or Key_Shared) subscription use a dead letter topic; the feature is not available for the Exclusive and Failover subscription types. The basic idea is that if a message is redelivered a certain number of times (for example, because of an ack timeout or a negative acknowledgment), then once the retries are exhausted, the message is sent to a special topic called the dead letter topic, also known as the dead letter queue (DLQ). Let us see this feature in action by inspecting some code:

package org.example;

import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;

// @EnablePulsar is already applied by Spring Boot auto-configuration; it is harmless here.
@EnablePulsar
@Configuration
public class DeadLetterPolicyConfig {

    // Always fails, so the message is never acknowledged and keeps being redelivered.
    // ackTimeoutMillis is the Pulsar consumer configuration key (1000 ms = 1 second).
    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = {"ackTimeoutMillis=1000"})
    void listen1(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    // Receives the messages that exhausted their redeliveries.
    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    // Referenced by bean name from the deadLetterPolicy attribute above.
    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(10)
                .deadLetterTopic("my-dlq-topic")
                .build();
    }
}

First, we define a DeadLetterPolicy bean named deadLetterPolicy (any bean name works). It sets the maximum redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic). If you do not specify a DLQ topic name, Pulsar defaults to <topicname>-<subscriptionname>-DLQ. Next, we pass this bean name to PulsarListener through its deadLetterPolicy attribute. Note that the listener uses the Shared subscription type, because the DLQ feature does not work with Exclusive or Failover subscriptions.

This code is primarily for demonstration purposes, so we set a 1-second ack timeout through the listener's consumer properties. The listener always throws an exception, so the message is never acknowledged and Pulsar redelivers it. Once it has been redelivered ten times (the maxRedeliverCount in the DeadLetterPolicy), the Pulsar consumer publishes it to the DLQ topic. A second PulsarListener listens on the DLQ topic and prints the data as it arrives there.
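The default naming rule is plain string concatenation. As a small illustration, the hypothetical helper below (not part of Pulsar or Spring) applies the <topicname>-<subscriptionname>-DLQ convention to the names used in DeadLetterPolicyConfig. Depending on the client version, the actual topic may also carry a fully qualified prefix such as persistent://public/default/, which this sketch ignores.

```java
// Hypothetical helper reproducing Pulsar's documented default dead letter topic naming
// rule: <topicname>-<subscriptionname>-DLQ. Not part of any Pulsar or Spring API.
final class DeadLetterTopicNames {

    private DeadLetterTopicNames() {
    }

    // Name used when the DeadLetterPolicy does not set deadLetterTopic(...).
    static String defaultDeadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    public static void main(String[] args) {
        // Without deadLetterTopic("my-dlq-topic"), the DLQ name would be derived like this.
        System.out.println(defaultDeadLetterTopic("topic-with-dlp", "deadLetterPolicySubscription"));
        // prints: topic-with-dlp-deadLetterPolicySubscription-DLQ
    }
}
```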

10. Conclusion

In this article, we’ve explored the integration of Apache Pulsar with Spring Boot. By adding the Pulsar Spring Boot Starter, configuring the Pulsar client, specifying custom schemas, and creating publishers and consumers, you can build real-time, event-driven applications with ease. Additionally, we briefly discussed the concept of a Dead Letter Topic for handling failed messages.

As you continue to work with Apache Pulsar and Spring Boot, you can explore more advanced features and configurations to meet the specific needs of your real-time application. This combination of technologies offers a robust and scalable solution for handling high-throughput, low-latency data streams in your projects.

11. Download the Source Code

This was an example of how to integrate Apache Pulsar into your Spring Boot Application.

You can download the full source code of this example here: Spring Boot and Apache Pulsar Integration

Odysseas Mourtzoukos

Mourtzoukos Odysseas is studying to become a software engineer, at Harokopio University of Athens. Along with his studies, he is getting involved with different projects on gaming development and web applications. He is looking forward to sharing his knowledge and experience with the world.