Spring Integration Chain Example
1. Introduction
Spring Integration provides a way to group a set of elements into a single unit, which is referred to as chaining. In this post we will look at MessageHandlerChain, an implementation of MessageHandler that can be configured as a single message endpoint while actually delegating to a chain of handlers such as filters and transformers. Although the handler chain simplifies configuration by grouping elements together, it maintains the same loose coupling between components. Internally, MessageHandlerChain is expanded into a linear set of the listed endpoints, separated by anonymous channels.
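To make the idea of "one endpoint that delegates to a linear list of handlers" concrete, here is a minimal plain-Java sketch. It is not Spring Integration's API: the class ChainSketch and the methods runChain and sampleSteps are hypothetical names chosen for illustration, and payloads are simplified to strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of a handler chain; not Spring Integration's API.
class ChainSketch {

    // Runs the steps in order. A step returning null stops the flow,
    // which is how a filter would drop a message in this simplified model.
    static String runChain(List<Function<String, String>> steps, String input) {
        String current = input;
        for (Function<String, String> step : steps) {
            current = step.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    // Three illustrative steps: a transformer, a filter and a final processor.
    static List<Function<String, String>> sampleSteps() {
        return Arrays.asList(
                s -> s.trim(),                 // transformer: normalise the payload
                s -> s.isEmpty() ? null : s,   // filter: drop empty payloads
                s -> "processed:" + s);        // processor: produce the reply
    }

    public static void main(String[] args) {
        System.out.println(runChain(sampleSteps(), "  order  ")); // prints processed:order
    }
}
```

The caller sees a single entry point (runChain), while each step stays independent of the others, which mirrors the loose coupling described above.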
2. Message Chain Handlers
The chain element is the root element of the configuration and accepts an “id” attribute. Starting with Spring Integration 3.0, if an element inside the chain is given an “id”, the bean name for that element is a combination of the chain’s “id” and the “id” of the element itself. The chain element has an input-channel attribute that names the channel to consume messages from; if the last element of the chain is capable of producing output, the chain also takes an output-channel attribute. The chain is backed by org.springframework.integration.handler.MessageHandlerChain, which implements the MessageHandler interface through the AbstractMessageHandler base class. All the elements defined in the chain are implementations of MessageHandler too.
The message handler chain below shows the main components of a chain, namely a header enricher, a filter and a service activator.
MessageHandlerChain
<int:chain id="messageHandlerChain" input-channel="sender" output-channel="receiver">
    <int:header-enricher>
        <int:header name="foo" value="bar"/>
    </int:header-enricher>
    <int:filter ref="messageFilter" throw-exception-on-rejection="true" discard-channel="trash" />
    <int:service-activator ref="messageActivator" requires-reply="true" method="handleMessages"/>
</int:chain>
Let's take a look at some of the components in the above definition.
2.1 Filters in Spring Integration
Filters in Spring Integration implement the Enterprise Integration Pattern known as Message Filter, which is closely related to the selective consumer: only the messages that meet a given criterion are passed on, and the rest are dropped. A Spring Integration message filter delegates the decision to a message selector, org.springframework.integration.core.MessageSelector, whose single method boolean accept(Message<?> message) inspects the message passed in and returns true if the message should be accepted. The code below shows how the filter's handler method uses the selector to accept or reject a message: an accepted message is returned unchanged, and a rejected one yields null.
MessageSelector
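@Override
protected Object doHandleRequestMessage(Message<?> message) {
    // Pass the message on unchanged when the selector accepts it
    if (this.selector.accept(message)) {
        return message;
    }
    // A null result means the message was rejected
    return null;
}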
Rejected messages are sent to the channel named by the discard-channel attribute.
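The accept-or-discard behaviour can be sketched in plain Java as follows. This is a simplified model, not Spring Integration's API: FilterSketch, its handle method and the discarded list (standing in for a discard channel) are hypothetical names, and a java.util.function.Predicate stands in for the message selector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of a filter with a discard channel; not Spring's API.
class FilterSketch {

    // Stands in for the discard channel: rejected payloads are collected here.
    final List<String> discarded = new ArrayList<>();

    // Stands in for the selector's accept method.
    private final Predicate<String> selector;

    FilterSketch(Predicate<String> selector) {
        this.selector = selector;
    }

    // Returns the payload when accepted; otherwise records it as discarded and returns null.
    String handle(String payload) {
        if (selector.test(payload)) {
            return payload;
        }
        discarded.add(payload);
        return null;
    }

    public static void main(String[] args) {
        FilterSketch filter = new FilterSketch(p -> p.startsWith("order"));
        System.out.println(filter.handle("order-1")); // prints order-1
        System.out.println(filter.handle("invoice")); // prints null
        System.out.println(filter.discarded);         // prints [invoice]
    }
}
```

Keeping the decision in a separate predicate means the same filter logic can be reused with different acceptance criteria, which is the role the selector plays in the real component.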
2.2 Headers Enrichment
As the name indicates, header-enricher is a component that modifies header values. Because message headers are immutable, header-enricher copies the existing headers into a mutable HashMap instance and then adds the new entries defined in the header-enricher element. At the end of processing, a new message is returned that contains the payload of the received message and the headers enriched with the new values.
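The copy-then-add approach described above can be sketched in plain Java. This is an illustration under simplifying assumptions, not Spring Integration's implementation: EnricherSketch, Msg and enrich are hypothetical names, and Msg is a minimal stand-in for an immutable message.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of header enrichment; not Spring Integration's API.
class EnricherSketch {

    // Minimal immutable message: a payload plus read-only headers.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            // Defensive copy wrapped as read-only, so headers cannot change after creation
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Copies the existing headers into a mutable map, adds the new entries,
    // and returns a new message that carries the original payload.
    static Msg enrich(Msg original, Map<String, Object> extra) {
        Map<String, Object> merged = new HashMap<>(original.headers);
        merged.putAll(extra);
        return new Msg(original.payload, merged);
    }

    public static void main(String[] args) {
        Msg in = new Msg("order", Collections.<String, Object>singletonMap("id", 1));
        Msg out = enrich(in, Collections.<String, Object>singletonMap("foo", "bar"));
        System.out.println(out.headers.get("foo"));        // prints bar
        System.out.println(in.headers.containsKey("foo")); // prints false
    }
}
```

The original message is left untouched, so any other consumer holding a reference to it still sees the headers it was created with.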
2.3 Nested Chain Handlers
Sometimes we need to call one chain from within another and then continue execution in the original chain. To accomplish this we can use the gateway element, as the nested chain configuration below demonstrates.
Nested chain handler config
<int:chain id="main-chain" input-channel="main-in" output-channel="main-out">
    <int:header-enricher>
        <int:header name="name" value="main-header" />
    </int:header-enricher>
    <int:gateway request-channel="gateway-main-in"/>
</int:chain>

<int:chain id="nested-chain" input-channel="gateway-main-in">
    <int:header-enricher>
        <int:header name="name" value="nested-header" />
    </int:header-enricher>
    <int:gateway request-channel="gateway-nested-chain"/>
    <int:service-activator>
        <bean class="org.sample.MsgProc" />
    </int:service-activator>
</int:chain>
In the above configuration the primary chain main-chain adds the enrichment header value main-header and then routes the request through the gateway to the channel gateway-main-in. The secondary chain nested-chain listens on gateway-main-in and declares a further header-enricher with the value nested-header.
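The request/reply role of the gateway can be sketched in plain Java as a call from one chain into another. This is a simplified model, not Spring Integration's API: NestedChainSketch, mainChain and nestedChain are hypothetical names, and the "message" is reduced to a trace of the steps that have run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of one chain calling another through a gateway;
// not Spring Integration's API.
class NestedChainSketch {

    // The nested chain: receives the trace so far and appends its own step.
    static List<String> nestedChain(List<String> trace) {
        List<String> out = new ArrayList<>(trace);
        out.add("nested-enricher");
        return out;
    }

    // The main chain: runs its own step, hands the message to the gateway
    // (a plain request/reply call in this model), then continues with the reply.
    static List<String> mainChain(UnaryOperator<List<String>> gateway) {
        List<String> trace = new ArrayList<>();
        trace.add("main-enricher");
        List<String> reply = gateway.apply(trace); // waits for the nested chain's reply
        List<String> out = new ArrayList<>(reply);
        out.add("main-continues");
        return out;
    }

    public static void main(String[] args) {
        // prints [main-enricher, nested-enricher, main-continues]
        System.out.println(mainChain(NestedChainSketch::nestedChain));
    }
}
```

The main chain only knows the gateway, not the nested chain behind it, so the two chains remain independently configurable.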
3. Maven Configuration
The Maven configuration for the Spring Integration chain handlers is shown below. The core dependencies needed for the project are spring-integration-core and spring-integration-stream; the remaining entries cover testing and logging.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.springinteg.chaining</groupId>
    <artifactId>spring-integration-chaining</artifactId>
    <packaging>war</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>spring-integration-chaining Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <springframework.version>4.3.0.RELEASE</springframework.version>
        <spring.integration.version>4.3.0.RELEASE</spring.integration.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <scope>compile</scope>
            <version>${spring.integration.version}</version>
        </dependency>
        <!-- spring-test belongs to the Spring Framework, so it uses the framework version property -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
            <version>${spring.integration.version}</version>
        </dependency>
        <!-- JUnit 4 is needed for the org.junit.Test and org.junit.Before annotations used in the test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>spring-integration-chaining</finalName>
    </build>
</project>
4. Spring Integration Configuration
Below is the basic set of classes used for configuring a message chain handler:
- Enricher
- Message Selector
- Message payload POJO
- Processor
4.1 AccessTimeEnricher.java
This class enriches the header with an access timestamp. It is a basic helper for the header-enricher component that returns the access time as the enrichment value.
AccessTimeEnricher
package com.springinteg.chaining;

import org.springframework.stereotype.Component;

@Component
public class AccessTimeEnricher {

    // Returns the current time in milliseconds as the header value
    public String appendTime() {
        return String.valueOf(System.currentTimeMillis());
    }
}
4.2 OrderMsgSelector.java
This selector class is used by the filter element to accept or reject a message. OrderMsgSelector contains an accept method that accepts the message only if its payload is of type OrderPayload and the payload content equals the text orderMsg (ignoring case).
OrderMsgSelector
package com.springinteg.chaining;

import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class OrderMsgSelector implements MessageSelector {

    @Override
    public boolean accept(Message<?> message) {
        // Reject anything that does not carry an OrderPayload
        if (!(message.getPayload() instanceof OrderPayload)) {
            return false;
        }
        OrderPayload msgPayload = (OrderPayload) message.getPayload();
        // Accept only when the content is "orderMsg", ignoring case
        return "orderMsg".equalsIgnoreCase(msgPayload.getContent());
    }
}
4.3 OrderPayload.java
The class below defines the actual message payload that will be sent to the input channel of the chain handler. It is a basic OrderPayload implementation with a getter and setter for the contents of the order.
OrderPayload
package com.springinteg.chaining;

import org.springframework.stereotype.Component;

@Component
public class OrderPayload {

    private String content;

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
4.4 OrderProcessor.java
This class processes order messages in a chain after they have been accepted by the filter. It defines the service activator bean for processing order messages.
OrderProcessor
package com.springinteg.chaining;

import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class OrderProcessor {

    // Returns the content of the order carried by the message
    public String processOrder(Message<?> msg) {
        return ((OrderPayload) msg.getPayload()).getContent();
    }
}
4.5 Spring integration configuration
The Spring Integration configuration file below defines the basic set of beans for the chain handlers and nested chain handlers, along with the channels for sending messages to each chain and for capturing its output.
spring-integ-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.springinteg.chaining" />

    <int:channel id="senderChannel">
        <int:queue capacity="10" />
    </int:channel>
    <int:channel id="receiverChannel">
        <int:queue capacity="10" />
    </int:channel>
    <int:channel id="trashChannel">
        <int:queue capacity="10" />
    </int:channel>
    <int:channel id="senderChannelForNested">
        <int:queue capacity="10" />
    </int:channel>
    <int:channel id="gatewaySenderChannelForNested">
        <int:queue capacity="10" />
    </int:channel>
    <int:channel id="receiverChannelForNested">
        <int:queue capacity="10" />
    </int:channel>

    <int:chain id="orderChain" input-channel="senderChannel" output-channel="receiverChannel">
        <int:header-enricher>
            <int:header name="enriched" value="enriched-Header" />
        </int:header-enricher>
        <int:filter ref="orderFilter" throw-exception-on-rejection="true" discard-channel="trashChannel" />
        <int:service-activator ref="orderProcessor" requires-reply="true" method="processOrder" />
    </int:chain>

    <bean id="orderFilter" class="org.springframework.integration.filter.MessageFilter" scope="prototype">
        <constructor-arg ref="orderMsgSelector" />
    </bean>

    <int:chain id="orderProducerNestedChain" input-channel="senderChannelForNested" output-channel="receiverChannelForNested">
        <int:header-enricher>
            <int:header name="main-channel" ref="accessTimeEnricher" method="appendTime" />
        </int:header-enricher>
        <int:filter ref="orderFilter" throw-exception-on-rejection="true" discard-channel="trashChannel" />
        <int:gateway request-channel="gatewaySenderChannelForNested" />
    </int:chain>

    <int:chain id="orderRecieverNestedChain" input-channel="gatewaySenderChannelForNested">
        <int:header-enricher>
            <int:header name="nested-channel" ref="accessTimeEnricher" method="appendTime" />
        </int:header-enricher>
        <int:filter ref="orderFilter" throw-exception-on-rejection="true" discard-channel="trashChannel" />
    </int:chain>

    <int:poller id="poller" default="true" fixed-delay="1000" />
</beans>
5. Spring Integration Test
The test below sends a message to each chain and verifies the sequence of operations invoked as part of the chain, the enriched headers and the output from the chain handler.
ChainingHandlersTest
import static org.junit.Assert.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.springinteg.chaining.OrderPayload;

public class ChainingHandlersTest {

    private ClassPathXmlApplicationContext context;

    @Before
    public void setUp() {
        context = new ClassPathXmlApplicationContext("spring-integ-context.xml");
    }

    // Runs after each test: close the context so that its pollers stop
    @After
    public void destroy() {
        context.close();
        context = null;
    }

    @Test
    public void testOrderChaining() {
        OrderPayload order = context.getBean("orderPayload", OrderPayload.class);
        order.setContent("orderMsg");
        Message<OrderPayload> orderMsg = MessageBuilder.withPayload(order).build();
        QueueChannel orderSendChannel = context.getBean("senderChannel", QueueChannel.class);
        orderSendChannel.send(orderMsg);

        QueueChannel orderRecvChannel = context.getBean("receiverChannel", QueueChannel.class);
        Message<?> orderRecvMsg = orderRecvChannel.receive(2000);
        assertNotNull("No message received from the chain", orderRecvMsg);
        // assertEquals takes the expected value first, then the actual value
        assertEquals(order.getContent(), orderRecvMsg.getPayload().toString());
        assertEquals("enriched-Header", orderRecvMsg.getHeaders().get("enriched").toString());
    }

    @Test
    public void testOrderChainingNested() {
        OrderPayload order = context.getBean("orderPayload", OrderPayload.class);
        order.setContent("orderMsg");
        Message<OrderPayload> orderMsg = MessageBuilder.withPayload(order).build();
        QueueChannel orderSendChannel = context.getBean("senderChannelForNested", QueueChannel.class);
        orderSendChannel.send(orderMsg);

        QueueChannel orderRecvChannel = context.getBean("receiverChannelForNested", QueueChannel.class);
        Message<?> orderRecvMsg = orderRecvChannel.receive(2000);
        assertNotNull("No message received from the nested chain", orderRecvMsg);
        long mainChannelAccessTime = Long.parseLong((String) orderRecvMsg.getHeaders().get("main-channel"));
        long nestedChannelAccessTime = Long.parseLong((String) orderRecvMsg.getHeaders().get("nested-channel"));
        assertTrue("Main channel should be always called before nested",
                nestedChannelAccessTime > mainChannelAccessTime);
    }
}
6. Summary
In this article we discussed how a message handler chain is used as a single endpoint to configure a chain of handlers. We also saw how header modification and enrichment is done using the header-enricher element, and how filtering is done using a message selector. The most important part of chaining nested handlers is using a gateway to connect the independent chain handlers.