Spring Integration Aggregator Example
1. Introduction
In the last article Spring Boot Integration With Active MQ we discussed on how straightforward and efficient it was to use Spring Integration framework to build messaging applications with JMS. In this article we will be discussing about a useful Spring Integration component called Aggregator.
Enterprise applications with large datasets may need to process lot of information and sending an entire dataset to one location for processing, would be inefficient. Thus we need to partition data by splitting into smaller pieces. This is achieved by a Spring Integration component called Splitter. After splitting and processing the messages, the dataset need to be grouped. This is done by a Spring Integration component called Aggregator. Aggregator groups, messages and processes them, based on certain strategies as discussed below.
Let’s look at some basic definitions of different components that are used in message aggregation as below.
1.1 Splitter
The main role of Splitter is to partition message into smaller parts and the resulting messages can be sent independently of each other. A good example would be an order processing system where an order can multiple line items that contain products from different companies. By identifying the line item corresponding to each vendor, the splitter will create individual message for each company.
1.2 Aggregator
Aggregator is a component that combines a group of related messages based on a correlation strategy and processes them. In our example above, company Id or Order Id can be a correlation strategy for grouping related items and create a single message by processing a whole group. The main idea behind defining and implementing aggregator is to provide a logic, that has to be executed when aggregating (i.e creating single messages from group of messages) takes place. There are two main strategies for aggregator as below:
1.2.1 Correlation Strategy
Grouping of messages is mainly done based on the CORRELATION_ID
message header (i.e messages having same CORRELATION_ID
are group together). But users can also choose to implement a custom CorrelationStrategy
that defines how messages can be grouped. Spring Integration framework provides HeaderAttributeCorrelationStrategy
out of box. We can implement custom correlation strategy as a separate class or configuring “correlation-strategy-expression” as part of the spring configuration xml.
public class MyCorrelationStrategy implements CorrelationStrategy { public Object getCorrelationKey(Message<?> message) { //return the CORRELATION_ID here } }
1.2.2 Release Strategy
For a group of related messages ReleaseStrategy
indicates when the set of collected messages should be sent or released for aggregation. The default implementation provided by Spring Integration framework is SequenceSizeReleaseStrategy
that basically checks presence of messages grouped by SEQUENCE_SIZE
.
For example if the SEQUENCE_SIZE
is 5 ,then the SequenceSizeReleaseStrategy
will trigger a signal to the aggregator to begin processing only after it receives 5 messages with sequence numbers 1 to 5. Similar to custom CorrelationStrategy
above Spring Integration also allows us to specify a custom ReleaseStrategy
as below.
public class MyReleaseStrategy implements ReleaseStrategy { public boolean canRelease(MessageGroup group) { //implement your strategy to return true or false } }
2. Maven Dependencies
Following set of dependencies are required to be set up as part of our pom.xml in order for us to run our Order Processing example for aggregator
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.springinteg.aggregator.example</groupId> <artifactId>spring-integration-aggregator-example</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <springframework.version>4.2.0.RELEASE</springframework.version> <spring.integration.version>4.2.0.RELEASE</spring.integration.version> </properties> <dependencies> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>${spring.integration.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.integration.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${springframework.version}</version> </dependency> </dependencies> </project>
3. Spring Integration Configuration
We have to define basic components for defining aggregator as part of spring context configuration i.e queue channel (for input and output),a gateway bean and aggregator bean with correlation and release strategy. CorrelationStrategy
is defined as “payload.type” (Single or Recurring). ReleaseStrategy
is define as payload with size as 2 . After receiving two messages the aggregator will get triggered for grouping messages based on correlation strategy.
Spring-integ-context.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:file="http://www.springframework.org/schema/integration/file" xmlns:sftp="http://www.springframework.org/schema/integration/sftp" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/beans/spring-integration-stream.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/beans/spring-task.xsd http://www.springframework.org/schema/integration-sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/beans/spring-util.xsd"> <bean id="reader" class="com.springinteg.aggregator.Reader" /> <bean id="order" class="com.springinteg.aggregator.model.Order" /> <int:channel id="orderInChannel" > <int:queue /> </int:channel> <int:channel id="orderProcChannel" /> <int:channel id="aggrOutChannel"> <int:queue /> </int:channel> <int:channel id="orderOutChannel"> <int:queue /> </int:channel> <int:gateway id="ordergatewaybean" service-interface="com.springinteg.aggregator.gateway.OrderGateway" default-request-channel="orderInChannel" /> <int:aggregator id="ordersAggregator" input-channel="orderInChannel" output-channel="aggrOutChannel" correlation-strategy-expression="payload.type" release-strategy-expression="size() ==2" /> <int:service-activator input-channel="aggrOutChannel" ref="reader" method="processOrder" output-channel="orderOutChannel" /> <int:poller id="poller" default="true" fixed-delay="1000" /> </beans>
4. Application Configuration
Above we have seen how to configure the basic aggregator with different components. Now we will define each of these components as java classes below
4.1 Gateway Component
Gateway interface class with processing method for receiving and processing input messages before routing them to aggregator.
OrderGateway.java
package com.springinteg.aggregator.gateway; import org.springframework.messaging.Message; import com.springinteg.aggregator.model.Order; public interface OrderGateway { public void process(Message message) ; }
4.2 Payload Component
Payload defining the Order message as below. The type refers to whether its a “SINGLE” or “RECURRING” order.
Order.java
package com.springinteg.aggregator.model; public class Order { private String type; private int amount; public Order(){} public Order(String type, int amount){ this.type = type; this.amount = amount; } public String getType(){ return this.type; } @Override public String toString(){ return "Order[ type=" + this.type + " ,amount= " + this.amount + "]" ; } }
4.1 Service Activator Component
We use a service activator to process messages from aggregate channel and print the message to console.
Reader.java
package com.springinteg.aggregator; import java.util.ArrayList; import java.util.List; import org.springframework.messaging.Message; import com.springinteg.aggregator.model.Order; public class Reader { public void processOrder(Message<List> ordermessage){ System.out.println("Orders"); List payments = new ArrayList(); payments = ordermessage.getPayload(); for(Order pay: payments){ System.out.println(pay.toString()); } } }
5. Verification Test For Aggregator
We can verify how messages are aggregated using a simple unit test that will demonstrate how we can invoke gateway to send messages, verify if the aggregated messages are grouped by correlation strategy.
OrderAggregatorTest.java
package com.springinteg.aggregator; import static org.junit.Assert.*; import java.util.List; import org.junit.Before; import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import com.springinteg.aggregator.gateway.OrderGateway; import com.springinteg.aggregator.model.Order; public class OrderAggregatorTest { ApplicationContext context = null; @Before public void setUp() { context = new ClassPathXmlApplicationContext("spring-integ-context.xml"); } @Before public void destroy() { context = null; } @Test public void testOrderAggregator() { OrderGateway paygate = context.getBean("ordergatewaybean", OrderGateway.class); Order p1 = new Order("SINGLE", 11); Order p2 = new Order("RECURRING", 25); Order p3 = new Order("SINGLE", 32); Order p4 = new Order("RECURRING", 15); QueueChannel orderChannel = context.getBean("orderOutChannel", QueueChannel.class); Message mp1 = new GenericMessage(p1); paygate.process(mp1); Message<List> processedOrders = (Message<List>) orderChannel.receive(10000); assertNull(processedOrders); Message mp2 = new GenericMessage(p2); Message mp3 = new GenericMessage(p3); Message mp4 = new GenericMessage(p4); paygate.process(mp2); paygate.process(mp3); paygate.process(mp4); processedOrders = (Message<List>) orderChannel.receive(60000); verifyAggregOrderTypes(processedOrders); } private void verifyAggregOrderTypes(Message<List> reply) { // TODO Auto-generated method stub List orders = reply.getPayload(); Order firstOrder = orders.get(0); for (int i = 1; i < orders.size(); i++) { assertEquals(firstOrder.getType(), orders.get(i).getType()); } } }
5.1 Screenshot showing grouping of messages
6. Conclusion
In the above article we have seen how aggregators can be used to assemble multiple messages, group them based on correlation strategy and process them based on release strategy. Aggregation in general, is a complex task since all the set of messages belonging to a particular set have to arrive before aggregator can start processing.
7. Download the Source Code
This is a Spring Integration Aggregator Example
You can download the full source code of this example here: spring-integration-aggregator-master
ordergatewaybean don’t have any implementation class so how it is working?