Spring Integration Control Bus Example
1. Introduction
The control bus is a useful Spring Integration component that accepts messages on an input channel, much like a Service Activator, Adapter or Transformer. The key difference is that the payload of the received message indicates an invocable action or operation on a bean. The input channel acts as an operation channel: it is used for sending control messages that invoke management operations on endpoints or other manageable resources.
Invocation of these operations is not limited to Spring Integration’s API classes. Users can annotate methods anywhere in their code and then invoke those annotated methods by sending messages to the control bus. Command messages sent to the control bus are written as simple SpEL expressions, and Spring Integration’s core namespace provides an implementation that evaluates the expression.
2. Control messages specification
To enable the control bus, add the following element to the configuration file.
<control-bus input-channel="testControlChannel"/>
The message sent to the control bus input channel should contain a SpEL expression that references the bean and the control operation to be invoked on it. The “@” symbol is used to reference the bean, and the operation to be invoked is specified after the bean name. The example below shows a simple control message that increments a sample counter on a managed resource bean.
Message<String> incrementCounterMsg = MessageBuilder.withPayload("@sampleCounter.increment()").build();
testControlChannel.send(incrementCounterMsg);
In the above case the sampleCounter bean has to be annotated with @ManagedResource and the increment method has to be annotated with @ManagedOperation for the control bus to interpret and invoke the operation.
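To make the "@bean.operation()" convention concrete, here is a minimal plain-JDK sketch of the dispatch idea. It is a conceptual model only and not how Spring Integration implements the control bus (which evaluates real SpEL expressions). The class ControlBusSketch, the registry map and the @Controllable annotation are all hypothetical names; @Controllable merely stands in for @ManagedOperation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Conceptual model only: NOT Spring Integration's implementation.
class ControlBusSketch {

    // Hypothetical marker that plays the role of @ManagedOperation in this sketch.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Controllable {}

    // Accepts only commands of the form "@beanName.operation()" (no arguments).
    private static final Pattern COMMAND = Pattern.compile("@(\\w+)\\.(\\w+)\\(\\)");

    // Stand-in for the application context: bean name -> bean instance.
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Resolves the bean after '@', then invokes the named operation if it is exposed.
    Object send(String command) {
        Matcher m = COMMAND.matcher(command);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        Object bean = beans.get(m.group(1));
        if (bean == null) {
            throw new IllegalArgumentException("No such bean: " + m.group(1));
        }
        try {
            Method method = bean.getClass().getMethod(m.group(2));
            if (!method.isAnnotationPresent(Controllable.class)) {
                throw new IllegalStateException("Operation not exposed: " + m.group(2));
            }
            return method.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + command, e);
        }
    }

    // Sample bean: only increment() is exposed to the sketch bus.
    public static class SampleCounter {
        private int value;

        @Controllable
        public void increment() { value++; }

        public void reset() { value = 0; }  // not annotated, so the bus rejects it

        public int value() { return value; }
    }

    public static void main(String[] args) {
        ControlBusSketch bus = new ControlBusSketch();
        SampleCounter counter = new SampleCounter();
        bus.register("sampleCounter", counter);
        bus.send("@sampleCounter.increment()");
        System.out.println("counter = " + counter.value()); // prints "counter = 1"
    }
}
```

The point of the sketch is the gatekeeping: a command can only reach methods that were explicitly marked as operations, which mirrors why the real bean needs its management annotations.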
The control bus can also be used to manage Spring Integration components. For example, we can invoke the methods defined on the Lifecycle interface even if the @ManagedResource annotation is not present. Adapters can be started or stopped on demand using a control message. A simple example is shown below.
Message<String> controlMessage = MessageBuilder.withPayload("@inboundAdapter.stop()").build();
testControlChannel.send(controlMessage);
In the above example we send a control message to stop the bean inboundAdapter, which is defined in the Spring context as below.
<int:inbound-channel-adapter id="inboundAdapter"
        channel="msgChannelAdapterOutput"
        expression="'Sample message'"
        auto-startup="false">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
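The effect of auto-startup="false" combined with start and stop control messages can be modelled with plain JDK classes. The sketch below is not Spring Integration code: PolledAdapterSketch and its methods are hypothetical names, and a ScheduledExecutorService stands in for the fixed-rate poller. It only illustrates the observable behaviour: nothing is produced until start() is called, and nothing more is produced once stop() returns.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Plain-JDK stand-in for a polled inbound adapter; NOT Spring Integration code.
class PolledAdapterSketch {

    // Plays the role of the queue-backed output channel.
    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // Daemon thread so the JVM can exit without an explicit shutdown.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "poller-sketch");
                t.setDaemon(true);
                return t;
            });

    private final long periodMillis;

    // null while stopped, which is the initial state (like auto-startup="false").
    private ScheduledFuture<?> task;

    PolledAdapterSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    synchronized void start() {
        if (task == null) {
            task = scheduler.scheduleAtFixedRate(this::poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
    }

    synchronized boolean isRunning() {
        return task != null;
    }

    // Shares the lock with start()/stop(), so no message is added after stop() returns.
    private synchronized void poll() {
        if (task != null) {
            output.offer("Sample message");
        }
    }

    // Waits up to timeoutMillis for a message; returns null if none arrives.
    String receive(long timeoutMillis) {
        try {
            return output.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Discards messages that were queued while the adapter was running.
    void drain() {
        output.clear();
    }

    public static void main(String[] args) {
        PolledAdapterSketch adapter = new PolledAdapterSketch(100);
        System.out.println("before start: " + adapter.receive(300));
        adapter.start();
        System.out.println("after start:  " + adapter.receive(1000));
        adapter.stop();
        adapter.drain();
        System.out.println("after stop:   " + adapter.receive(300));
    }
}
```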
Let’s go through an example of configuring a basic application for control bus demonstration.
3. Maven Dependencies
The pom.xml below shows the basic dependencies for configuring the control bus. Spring Integration core and Spring Integration JMX are the core dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.springinteg.controlbus</groupId>
    <artifactId>spring-integration-controlbus</artifactId>
    <packaging>war</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>spring-integration-controlbus Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <springframework.version>4.3.0.RELEASE</springframework.version>
        <spring.integration.version>4.3.0.RELEASE</spring.integration.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jmx</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <scope>compile</scope>
            <version>${spring.integration.version}</version>
        </dependency>
        <!-- spring-test belongs to the Spring Framework, so it uses the framework version -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
            <version>${spring.integration.version}</version>
        </dependency>
        <!-- JUnit 4 is needed for the org.junit.Test annotation used by the tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>spring-integration-controlbus</finalName>
    </build>
</project>
4. Spring Integration Configuration
The core components of the control bus configuration are the JMX managed resource bean, the inbound adapter, the control-bus component and the message channels. To expose the managed resource bean for JMX monitoring and management, we need to export its attributes and operations. This can be done using the tag
<context:mbean-export/>
The detailed context configuration file spring-integ-context.xml, with the different components, is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:mbean-export />
    <context:component-scan base-package="com.springinteg.controlbus" />

    <int:channel id="msgChannelAdapterOutput">
        <int:queue />
    </int:channel>

    <int:channel id="controlChannel" />
    <int:control-bus input-channel="controlChannel" />

    <int:inbound-channel-adapter id="inboundAdapter"
            channel="msgChannelAdapterOutput"
            expression="'This is a test message'"
            auto-startup="false">
        <int:poller fixed-rate="1000" />
    </int:inbound-channel-adapter>

    <bean id="managedCounterBean" class="com.springinteg.controlbus.ManagedCounterBean" />
</beans>
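As background for what <context:mbean-export /> arranges, the following plain-JDK sketch registers a counter with the platform MBean server by hand and then reads and invokes it through JMX. It is only an illustration of the underlying mechanism, assuming a standard MBean; Spring builds the management interface from the annotations instead. The names JmxExportSketch, Counter, CounterMBean and the ObjectName string are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Plain-JDK illustration of JMX export; Spring's mbean-export does the registration for you.
class JmxExportSketch {

    // Standard MBean convention: a public interface named <ImplementationClass>MBean.
    public interface CounterMBean {
        int getCounter();   // exposed as the read-only attribute "Counter"
        void increment();   // exposed as the operation "increment"
    }

    public static class Counter implements CounterMBean {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public int getCounter() { return counter.get(); }

        @Override
        public void increment() { counter.incrementAndGet(); }
    }

    // Registers a fresh Counter, increments it once through JMX and returns the attribute value.
    static int registerIncrementAndRead(String objectName) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(objectName);
            server.registerMBean(new Counter(), name);
            try {
                server.invoke(name, "increment", new Object[0], new String[0]);
                return (Integer) server.getAttribute(name, "Counter");
            } finally {
                server.unregisterMBean(name);  // leave the MBean server clean
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX call failed", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical object name, chosen only for this sketch.
        int value = registerIncrementAndRead("com.springinteg.controlbus:type=Counter,name=sketch");
        System.out.println("Counter attribute read via JMX: " + value);
    }
}
```

While the application is running, a tool such as JConsole would show an exported bean in the same way, with its attributes and operations available for remote invocation.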
5. Application Configuration
As you may have noticed, the Spring context configuration above defines ManagedCounterBean as a managed resource bean, along with a control bus to invoke operations on it. The control bus listens on controlChannel for control messages and then invokes the corresponding operations on the managed resource bean. Let’s have a look at the implementation in detail below.
5.1 Managed Resource Bean
ManagedCounterBean.java
package com.springinteg.controlbus;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

// Exposed over JMX; its operations can also be invoked through the control bus.
@ManagedResource
public class ManagedCounterBean {

    private final AtomicInteger counter = new AtomicInteger();

    @ManagedAttribute
    public int getCounter() {
        return this.counter.get();
    }

    @ManagedAttribute
    public void setCounter(int counter) {
        this.counter.set(counter);
    }

    // Invoked by the control message "@managedCounterBean.increment()".
    @ManagedOperation
    public void increment() {
        this.counter.incrementAndGet();
    }
}
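ManagedCounterBean keeps its state in an AtomicInteger. The article does not state why, but a plausible reason is that control messages and JMX clients may call increment() from different threads. The short plain-JDK sketch below (CounterConcurrencySketch is a hypothetical name) shows that concurrent incrementAndGet() calls do not lose updates.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates that AtomicInteger.incrementAndGet() is safe under concurrent callers.
class CounterConcurrencySketch {

    // Starts the given number of threads, each incrementing a shared counter, and returns the total.
    static int incrementConcurrently(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();  // wait until every increment has been applied
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 10_000)); // prints 40000
    }
}
```

With a plain int field and value++, the same run could return less than 40000, because the read, add and write steps of different threads can interleave.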
6. Verification Test Configuration
The code below shows basic tests verifying that the control bus executes command messages sent on the control channel. The first test demonstrates how command messages can invoke the operations defined on Spring’s Lifecycle interface. Before sending any control message, it asserts that there are no messages in the adapter’s output channel. After sending the start message, it makes sure that the inbound adapter is started and that a test message produced by the adapter is received on the channel. It then sends the stop message and asserts that no further message arrives. The second test demonstrates how control messages can be used to invoke operations on a JMX managed resource bean to increment a numeric counter.
ControlBusUnitTest.java
import static org.junit.Assert.*;

import org.apache.log4j.Logger;
import org.junit.Test;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;

import com.springinteg.controlbus.ManagedCounterBean;

public class ControlBusUnitTest {

    private static Logger logger = Logger.getLogger(ControlBusUnitTest.class);

    @Test
    public void testControlbusAdapter() {
        ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("spring-integ-context.xml");
        MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);
        PollableChannel msgChannelAdapterOutput = ac.getBean("msgChannelAdapterOutput", PollableChannel.class);

        // The adapter is configured with auto-startup="false", so nothing arrives yet.
        Message<?> receivedMsg = msgChannelAdapterOutput.receive(1000);
        assertNull(receivedMsg);
        logger.info("Message received on channel before adapter started: " + receivedMsg);

        // Start the adapter through the control bus and expect a message.
        controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
        receivedMsg = msgChannelAdapterOutput.receive(1000);
        assertNotNull(receivedMsg);
        logger.info("Message received on channel after adapter started: " + receivedMsg);

        // Stop the adapter through the control bus and expect no further message.
        controlChannel.send(new GenericMessage<String>("@inboundAdapter.stop()"));
        receivedMsg = msgChannelAdapterOutput.receive(1000);
        assertNull(receivedMsg);
        logger.info("Message received on channel after adapter stopped: " + receivedMsg);

        ac.close();
    }

    @Test
    public void testControlBusMBean() {
        ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("spring-integ-context.xml");
        MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);
        ManagedCounterBean managedCounterBean = ac.getBean("managedCounterBean", ManagedCounterBean.class);

        assertEquals(0, managedCounterBean.getCounter());
        logger.info("Value of message counter before sending message to control bus " + managedCounterBean.getCounter());

        // Invoke the managed operation through the control bus.
        controlChannel.send(new GenericMessage<String>("@managedCounterBean.increment()"));
        assertEquals(1, managedCounterBean.getCounter());
        logger.info("Value of message counter after sending message to control bus " + managedCounterBean.getCounter());

        ac.close();
    }
}
6.1 Screenshots of Test Execution
6.1.1 Adapter start and stop using command messages
6.1.2 Managed attribute increment using control messages
7. Conclusion
In this article we have looked at how the control bus and command messages can help with the monitoring and management aspects of a messaging application. Monitoring and management are critical aspects of a successful enterprise integration. The control bus can control different aspects of an application through JMX operations. Furthermore, control messages can be specified using SpEL or Groovy to invoke operations on the target bean.
8. Download the source code
The source code for Spring Integration Control bus is as below.