Integration

Spring Integration Aggregator Example

1. Introduction

In the last article Spring Boot Integration With Active MQ we discussed on how straightforward and efficient it was to use Spring Integration framework to build messaging applications with JMS. In this article we will be discussing about a useful Spring Integration component called Aggregator.
 
 
 
 
 

 
Enterprise applications with large datasets may need to process lot of information and sending an entire dataset to one location for processing, would be inefficient. Thus we need to partition data by splitting into smaller pieces. This is achieved by a Spring Integration component called Splitter. After splitting and processing the messages, the dataset need to be grouped. This is done by a Spring Integration component called Aggregator. Aggregator groups, messages and processes them, based on certain strategies as discussed below.

Let’s look at some basic definitions of different components that are used in message aggregation as below.

1.1 Splitter

The main role of Splitter is to partition message into smaller parts and the resulting messages can be sent independently of each other. A good example would be an order processing system where an order can multiple line items that contain products from different companies. By identifying the line item corresponding to each vendor, the splitter will create individual message for each company.

1.2 Aggregator

Aggregator is a component that combines a group of related messages based on a correlation strategy and processes them. In our example above, company Id or Order Id can be a correlation strategy for grouping related items and create a single message by processing a whole group. The main idea behind defining and implementing aggregator is to provide a logic, that has to be executed when aggregating (i.e creating single messages from group of messages) takes place. There are two main strategies for aggregator as below:

1.2.1 Correlation Strategy

Grouping of messages is mainly done based on the CORRELATION_ID message header (i.e messages having same CORRELATION_ID are group together). But users can also choose to implement a custom CorrelationStrategy that defines how messages can be grouped. Spring Integration framework provides HeaderAttributeCorrelationStrategy out of box. We can implement custom correlation strategy as a separate class or configuring “correlation-strategy-expression” as part of the spring configuration xml.

public class MyCorrelationStrategy implements CorrelationStrategy {
 public Object getCorrelationKey(Message<?> message) {
   //return the CORRELATION_ID here
 } 
}

1.2.2 Release Strategy

For a group of related messages ReleaseStrategy indicates when the set of collected messages should be sent or released for aggregation. The default implementation provided by Spring Integration framework is SequenceSizeReleaseStrategy that basically checks presence of messages grouped by SEQUENCE_SIZE.

For example if the SEQUENCE_SIZE is 5 ,then the SequenceSizeReleaseStrategy will trigger a signal to the aggregator to begin processing only after it receives 5 messages with sequence numbers 1 to 5. Similar to custom CorrelationStrategy above Spring Integration also allows us to specify a custom ReleaseStrategy as below.

public class MyReleaseStrategy implements ReleaseStrategy { 
public boolean canRelease(MessageGroup group) { 
//implement your strategy to return true or false 
} 
}

2. Maven Dependencies

Following set of dependencies are required to be set up as part of our pom.xml in order for us to run our Order Processing example for aggregator

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.springinteg.aggregator.example</groupId>
	<artifactId>spring-integration-aggregator-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<springframework.version>4.2.0.RELEASE</springframework.version>
		<spring.integration.version>4.2.0.RELEASE</spring.integration.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>${spring.integration.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.integration.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${springframework.version}</version>
		</dependency>
	</dependencies>
</project>

3. Spring Integration Configuration

We have to define basic components for defining aggregator as part of spring context configuration i.e queue channel (for input and output),a gateway bean and aggregator bean with correlation and release strategy. CorrelationStrategy is defined as “payload.type” (Single or Recurring). ReleaseStrategy is define as payload with size as 2 . After receiving two messages the aggregator will get triggered for grouping messages based on correlation strategy.

Spring-integ-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
	xmlns:file="http://www.springframework.org/schema/integration/file"
	xmlns:sftp="http://www.springframework.org/schema/integration/sftp"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/stream 
	    http://www.springframework.org/schema/beans/spring-integration-stream.xsd
		http://www.springframework.org/schema/task 
		http://www.springframework.org/schema/beans/spring-task.xsd
		http://www.springframework.org/schema/integration-sftp 
		http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration 
		http://www.springframework.org/schema/integration/spring-integration-4.2.xsd
		http://www.springframework.org/schema/util 
		http://www.springframework.org/schema/beans/spring-util.xsd">


	<bean id="reader" class="com.springinteg.aggregator.Reader" />
	<bean id="order" class="com.springinteg.aggregator.model.Order" />

	<int:channel id="orderInChannel" >
          <int:queue />
    </int:channel>
	<int:channel id="orderProcChannel" />
	<int:channel id="aggrOutChannel">
		<int:queue />
	</int:channel>
	<int:channel id="orderOutChannel">
		<int:queue />
	</int:channel>

	<int:gateway id="ordergatewaybean"
		service-interface="com.springinteg.aggregator.gateway.OrderGateway"
		default-request-channel="orderInChannel" />

	<int:aggregator id="ordersAggregator" input-channel="orderInChannel"
		output-channel="aggrOutChannel" correlation-strategy-expression="payload.type"
		release-strategy-expression="size() ==2" />

	<int:service-activator input-channel="aggrOutChannel"
		ref="reader" method="processOrder" output-channel="orderOutChannel" />
	<int:poller id="poller" default="true" fixed-delay="1000" />

</beans>

4. Application Configuration

Above we have seen how to configure the basic aggregator with different components. Now we will define each of these components as java classes below

4.1 Gateway Component

Gateway interface class with processing method for receiving and processing input messages before routing them to aggregator.

OrderGateway.java

package com.springinteg.aggregator.gateway;

import org.springframework.messaging.Message;

import com.springinteg.aggregator.model.Order;

public interface OrderGateway {
	public void process(Message message) ;
}

4.2 Payload Component

Payload defining the Order message as below. The type refers to whether its a “SINGLE” or “RECURRING” order.

Order.java

package com.springinteg.aggregator.model;


public class Order {
	
	private String type;
	private int amount;
	
	public Order(){}
	
	public Order(String type, int amount){
		this.type = type;
		this.amount = amount;
	}
	
	
	public String getType(){
		return this.type;
	}

	@Override
	public String toString(){
		return "Order[ type=" + this.type + " ,amount= " + this.amount + "]" ;
	}
}

4.1 Service Activator Component

We use a service activator to process messages from aggregate channel and print the message to console.

Reader.java

package com.springinteg.aggregator;

import java.util.ArrayList;
import java.util.List;

import org.springframework.messaging.Message;

import com.springinteg.aggregator.model.Order;

public class Reader {
	
	public void processOrder(Message<List> ordermessage){
		
		System.out.println("Orders");
		List payments = new ArrayList();
		payments = ordermessage.getPayload();
		for(Order pay: payments){
			System.out.println(pay.toString());
		}
	}

}

5. Verification Test For Aggregator

We can verify how messages are aggregated using a simple unit test that will demonstrate how we can invoke gateway to send messages, verify if the aggregated messages are grouped by correlation strategy.

OrderAggregatorTest.java

package com.springinteg.aggregator;

import static org.junit.Assert.*;

import java.util.List;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import com.springinteg.aggregator.gateway.OrderGateway;
import com.springinteg.aggregator.model.Order;

public class OrderAggregatorTest {
	ApplicationContext context = null;
	
	@Before
	public void setUp() {
		context = new ClassPathXmlApplicationContext("spring-integ-context.xml");
	}
	

	@Before
	public void destroy() {
		context = null;
	}

	@Test
	public void testOrderAggregator() {
		OrderGateway paygate = context.getBean("ordergatewaybean", OrderGateway.class);
		Order p1 = new Order("SINGLE", 11);
		Order p2 = new Order("RECURRING", 25);
		Order p3 = new Order("SINGLE", 32);
		Order p4 = new Order("RECURRING", 15);
		QueueChannel orderChannel = context.getBean("orderOutChannel", QueueChannel.class);

		Message mp1 = new GenericMessage(p1);
		paygate.process(mp1);
		Message<List> processedOrders = (Message<List>) orderChannel.receive(10000);
		assertNull(processedOrders);
		Message mp2 = new GenericMessage(p2);
		Message mp3 = new GenericMessage(p3);
		Message mp4 = new GenericMessage(p4);

		paygate.process(mp2);
		paygate.process(mp3);
		paygate.process(mp4);
		processedOrders = (Message<List>) orderChannel.receive(60000);
		verifyAggregOrderTypes(processedOrders);

	}
	
	private void verifyAggregOrderTypes(Message<List> reply) {
		// TODO Auto-generated method stub
		List orders = reply.getPayload();
		Order firstOrder = orders.get(0);
		for (int i = 1; i < orders.size(); i++) {
			assertEquals(firstOrder.getType(), orders.get(i).getType());
		}
	}

}	

5.1 Screenshot showing grouping of messages

Fig 1: Verifying aggregated messages

6. Conclusion

In the above article we have seen how aggregators can be used to assemble multiple messages, group them based on correlation strategy and process them based on release strategy. Aggregation in general, is a complex task since all the set of messages belonging to a particular set have to arrive before aggregator can start processing.

7. Download the Source Code

This is a Spring Integration Aggregator Example

Download
You can download the full source code of this example here: spring-integration-aggregator-master

Raghuram Gururajan

Raghuram is a Senior Staff Software Engineer and has more than thirteen years of experience working on web, desktop and mobile applications. He holds a Master's degree in Computer Science from the University of North Carolina . He is a passionate about working on Distributed Systems and specializes in working with Java web services and has significant experience working web applications, databases and continuous integration and deployments. He is currently working as a technical lead at a e-commerce company where he supports development of core API's/RESTFul Services that are part of the Merchandise Product Catalog.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
guna
guna
4 years ago

ordergatewaybean don’t have any implementation class so how it is working?

Back to top button