spring

Spring Integration Poller Example

1. Introduction

As we already had an introduction to Spring Integration, it has an event-driven messaging architecture.

To take a quick recap, for every Message communication to a JMS destination involves two endpoints, which is the Producer Endpoint and Consumer Endpoint. The endpoints demarcate the ends of a particular Channel, such that the Producer Endpoint produces the Message and drops it to a designated Channel. On the other end of the Channel is the Consumer Endpoint, which is ready to consume the Message on the same channel. The Producer Endpoint is the inbound-channel-adapter, while Consumer Endpoint is the service-activator. Service Activator (Consumer Endpoint) polls for Messages on the channel, and once it finds any, it activates the service method, which further processes the Message accordingly and passes on to the Outbound Channel Adapter. The Outbound Channel Adapter uses the jmsTemplate to finally drop the Message to the destination queue.

There are two JMS-based Inbound Adapters that Spring Integration provides, which are message-driven-channel-adapter and inbound-channel-adapter. inbound-channel-adapter receives the message based on the polling parameters that we specify, so it keeps polling the configured destination queue to check for the Messages on the destination queue. The message-driven-channel-adapter relies upon Spring MessageListener container.

2. Maven dependencies

Let’s check out an example where we explicitly apply polling for Messages on the Producer and Consumer Endpoints.

We must make sure we have proper dependencies in pom.xml file.

pom.xml

<properties>
	<spring.version>4.0.0.RELEASE</spring.version>
	<jms.version>1.1-rev-1</jms.version>
	<activemq.version>5.2.0</activemq.version>
	<spring.integration.version>4.1.2.RELEASE</spring.integration.version>
</properties>

<!-- JMS -->
<dependency>
	<groupId>javax.jms</groupId>
	<artifactId>jms-api</artifactId>
	<version>${jms.version}</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>${spring.version}</version>
</dependency>

<!-- Include ActiveMQ -->
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-core</artifactId>
	<version>${activemq.version}</version>
</dependency>
		
<!-- Spring Integration -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-core</artifactId>
	<version>${spring.integration.version}</version>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-jms</artifactId>
	<version>${spring.integration.version}</version>
</dependency>

3. Spring Integration Configuration Setup

The Spring Integration configuration can be done in one of the context files, which can be included within the root application context.

jmsContext.xml

<!-- ActiveMQ connection factory bean -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
	<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
	
<!-- Define a channel to communicate out to a JMS Destination -->
<int:channel id="inbound" />
<int:channel id="outbound" />
	
<int:service-activator input-channel="inbound"
	output-channel="outbound" ref="simpleMessageListener" method="onMessage" />

<int-jms:outbound-channel-adapter id="jmsOut"
	channel="outbound" connection-factory="connectionFactory"
	destination-name="sampleQueue" />

The component that is missing in the above configuration is the inbound-channel-adapter, which we will add one after the other for different poller scenarios, as we proceed with the example.

4. Applying Poller

Below are all the possible configuration options that we have, to define a poller:

<int:poller cron=""
	default="false"
	error-channel=""
	fixed-delay=""
	fixed-rate=""
	id=""
	max-messages-per-poll=""
	receive-timeout=""
	ref=""
	task-executor=""
	time-unit="MILLISECONDS"
	trigger="">
	<int:advice-chain />
	<int:transactional />
</int:poller>

4.1 Simple Poller Configuration

So let’s apply a very simple poller to our inbound-channel-adaptor as:

<int-jms:inbound-channel-adapter
	channel="inbound" connection-factory="connectionFactory"
	destination-name="testQueue" acknowledge="auto">
	<int:poller fixed-delay="1000" />
</int-jms:inbound-channel-adapter>

The fixed-delay of poller is specified in milliseconds to create a Periodic Trigger.

4.2 Poller fixed-rate vs fixed-delay

There is a significant difference between fixed-rate and fixed-delay. With fixed-rate, it doesn’t matter how long the previous execution took, the next execution will happen when it was scheduled. With fixed-delay, the next execution will always happen x time after the previous finished, even if it was delayed.

4.3 Poller configuration with cron-scheduler

We can also apply a cron expression instead of fixed-delay to poll for the new Message on the destination queue.

<int-jms:inbound-channel-adapter
	channel="inbound" connection-factory="connectionFactory"
	destination-name="testQueue" acknowledge="auto">
	<!-- cron expression below to poll every second -->
	<int:poller cron="* * * * * ?" />
</int-jms:inbound-channel-adapter>

4.4 Global default Poller

We can specify a global default poller as well, making sure that there is occurs only once in the application context. We can add a global default poller to our context file as:

<int:poller default="true" fixed-delay="50"/>

4.5 Asynchronous Polling

There is asynchronous polling that can be achieved in the same way as the standard poller configuration, where we need to add a TaskExecutor to the configuration. TaskExecutors can also be configured through the Spring task namespace. A simple asynchronous poller configuration would like similar to the following:

<int-jms:inbound-channel-adapter
	channel="inbound" connection-factory="connectionFactory"
	destination-name="testQueue" acknowledge="auto">
	<int:poller task-executor="taskExecutor" fixed-delay="1000" />
</int-jms:inbound-channel-adapter>

<task:executor id="taskExecutor" pool-size="20" queue-capacity="20"/>

pool-size=20 indicates that at the max 20 tasks can be executed separately on different threads concurrently. Before a new item is pushed into the task-executor pool, the pool availability is checked. If all the 20 tasks are busy, the new item is pushed to the queue until any of the ongoing tasks finish its execution on the task-executor pool. Queue limit is specified through a separate poller attribute, queue-capacity. As per our configuration, at the max 20 items can be present in the queue before claiming their place on the pool.

5. Listener Setup

There is a listener that we have in our sample application, whose method gets invoked by service-activator as soon as the Message arrives at the Consumer Endpoint.

SimpleMessageListener.java

package com.jcombat.listener;

public class SimpleMessageListener {

	public String onMessage(String message) {
		System.out.println(message);
		return message;
	}
}

6. Verify message

To successfully verify the working of our sample application, we just need to start our ActiveMQ server, and login to the Hawtio console and from the queues listed on the left, click on the testQueue link. We see a Send button through which we can pass any text to this queue, same as can be seen in the below snapshot.

Send message through Hawtio console
Send message through Hawtio console

Once the message on the testQueue is sent, watch the server console for the same Message getting logged. This is because, as per our configuration, the service-activator component invokes our listener class onMessage() method, that actually processes the Message (in our case we are just displaying it), which appears on the server console as:

Queue Message gets logged to server console
Queue Message gets logged to server console

Based on the outbound-channel-adapter configuration in our context file, the above processed Message is dispatched to the outbound adapter, which finally drops the Message to the outbound destination queue, sampleQueue. We can similarly verify the message that finally arrives on the sampleQueue as can be seen below:

Message arrives on destination queue
Message arrives on destination queue

7. Download the Maven project

Download
You can download the full source code of this example here: Spring Integration Poller Demo

Abhimanyu Prasad

Abhimanyu is a passionate tech blogger and senior programmer, who has an extensive end-to-end development experience with wide range of technologies. He is the founder and administrator at jCombat.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button