Spring Integration Control Bus Example

1. Introduction

The control bus is a Spring Integration component that accepts messages on an input channel, much like a Service Activator, Adapter, or Transformer. The key difference is that the payload of each received message names an invocable action or operation on a bean. The input channel therefore acts as an operation channel: it carries control messages that invoke management operations on endpoints or other manageable resources.

These operations are not limited to Spring Integration’s own API classes. You can annotate methods anywhere in your code and then invoke those annotated methods by sending messages to the control bus. Command messages are written as simple SpEL expressions, and Spring Integration’s core namespace provides an implementation that evaluates them.

2. Control messages specification

To enable the control bus, add the following element to the configuration file. The int prefix is the Spring Integration core namespace, as declared in the full configuration in section 4.

<int:control-bus input-channel="testControlChannel"/>

The message sent to the control bus input channel should contain a SpEL expression that references the bean and the control operation to invoke on it. The “@” symbol references the bean by name, and the operation follows the bean name after a dot. The example below builds a simple control message that increments a sample counter on a managed resource bean.

Message<String> incrementCounterMsg = MessageBuilder.withPayload("@sampleCounter.increment()").build();
testControlChannel.send(incrementCounterMsg);

In this case the sampleCounter bean must be declared as a @ManagedResource, and the increment method must be annotated with @ManagedOperation, for the control bus to interpret and invoke the operation.
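
To make the command format concrete, here is a toy dispatcher written with the JDK only. It mimics the idea and is not how Spring Integration is implemented: the real control bus evaluates full SpEL expressions. The sketch parses a payload of the form @beanName.operation(), looks the bean up in a map, and invokes the method only if it carries a marker annotation. The names Operation, SampleCounter and ToyControlBus are hypothetical and exist only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical marker playing the role of @ManagedOperation in this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Operation {
}

// Hypothetical bean; only increment() is exposed to the toy bus.
class SampleCounter {
    private final AtomicInteger counter = new AtomicInteger();

    @Operation
    public int increment() {
        return counter.incrementAndGet();
    }

    // No marker annotation, so the toy bus refuses to invoke it.
    public void reset() {
        counter.set(0);
    }

    public int get() {
        return counter.get();
    }
}

// Toy dispatcher: NOT Spring's implementation, which evaluates real SpEL.
class ToyControlBus {
    // Accepts only the simplest command shape: @beanName.method()
    private static final Pattern COMMAND = Pattern.compile("@(\\w+)\\.(\\w+)\\(\\)");

    private final Map<String, Object> beans = new HashMap<String, Object>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Resolves the bean, checks the marker annotation, then invokes the method.
    Object send(String payload) {
        Matcher m = COMMAND.matcher(payload);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a control command: " + payload);
        }
        Object bean = beans.get(m.group(1));
        if (bean == null) {
            throw new IllegalArgumentException("Unknown bean: " + m.group(1));
        }
        try {
            Method method = bean.getClass().getMethod(m.group(2));
            if (!method.isAnnotationPresent(Operation.class)) {
                throw new IllegalStateException("Not a managed operation: " + m.group(2));
            }
            return method.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke " + payload, e);
        }
    }

    public static void main(String[] args) {
        ToyControlBus bus = new ToyControlBus();
        SampleCounter counter = new SampleCounter();
        bus.register("sampleCounter", counter);
        bus.send("@sampleCounter.increment()");
        System.out.println("counter = " + counter.get());
    }
}
```

Sending "@sampleCounter.reset()" to this toy bus raises an IllegalStateException, because reset() lacks the marker annotation. That gate is the part of the sketch that corresponds to the @ManagedOperation requirement described above.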

The control bus can also manage Spring Integration components. For example, it can invoke the methods defined on the Lifecycle interface even when the @ManagedResource annotation is not present, so adapters can be started or stopped on demand with a control message. A simple example is shown below.

Message<String> controlMessage = MessageBuilder.withPayload("@inboundAdapter.stop()").build();
testControlChannel.send(controlMessage);

The example above sends a control message that stops the inboundAdapter bean, which is defined in the Spring context as follows.

<int:inbound-channel-adapter id="inboundAdapter" channel="msgChannelAdapterOutput"  expression="'Sample message'" auto-startup="false">
   <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
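
The effect of auto-startup="false" combined with start() and stop() commands can be pictured with a small JDK-only model. This is only an analogy, not Spring code: ToyLifecycle and ToyInboundAdapter are hypothetical names, and polling is triggered by hand rather than by a poller so that the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, minimal stand-in for the start/stop contract of Spring's Lifecycle.
interface ToyLifecycle {
    void start();

    void stop();

    boolean isRunning();
}

// Hypothetical adapter: emits its payload on each poll, but only while running.
class ToyInboundAdapter implements ToyLifecycle {
    private final Queue<String> outputChannel;
    private final String payload;
    private boolean running; // false by default, like auto-startup="false"

    ToyInboundAdapter(Queue<String> outputChannel, String payload) {
        this.outputChannel = outputChannel;
        this.payload = payload;
    }

    public void start() {
        running = true;
    }

    public void stop() {
        running = false;
    }

    public boolean isRunning() {
        return running;
    }

    // Called by hand in this sketch; a stopped adapter produces nothing.
    void poll() {
        if (running) {
            outputChannel.add(payload);
        }
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<String>();
        ToyInboundAdapter adapter = new ToyInboundAdapter(channel, "Sample message");

        adapter.poll();
        System.out.println("before start: " + channel.poll());

        adapter.start();
        adapter.poll();
        System.out.println("after start: " + channel.poll());

        adapter.stop();
        adapter.poll();
        System.out.println("after stop: " + channel.poll());
    }
}
```

The channel stays empty until start() is called and becomes quiet again after stop(), which is the same sequence the adapter test in section 6 asserts against the real inbound adapter.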

Let’s go through an example of configuring a basic application for control bus demonstration.

3. Maven Dependencies

The pom.xml below shows the basic dependencies for configuring the control bus. Spring Integration core and Spring Integration JMX are the essential ones.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.springinteg.controlbus</groupId>
  <artifactId>spring-integration-controlbus</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spring-integration-controlbus Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <springframework.version>4.3.0.RELEASE</springframework.version>
    <spring.integration.version>4.3.0.RELEASE</spring.integration.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>${spring.integration.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-jmx</artifactId>
      <version>${spring.integration.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
      <version>${spring.integration.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- spring-test is part of the Spring Framework, so it follows springframework.version -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${springframework.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-test</artifactId>
      <version>${spring.integration.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- JUnit 4 is needed for org.junit.Test and org.junit.Assert used by the test class -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>spring-integration-controlbus</finalName>
  </build>
</project>

4. Spring Integration Configuration

The core components of the control bus configuration are the JMX managed resource bean, the inbound adapter, the control-bus component, and the message channel. To expose the managed resource bean for JMX monitoring and management, its attributes and operations must be exported. This can be done with the following tag:

<context:mbean-export/>
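
To illustrate what exporting a bean to JMX amounts to, the following JDK-only sketch registers a standard MBean with the platform MBeanServer by hand, invokes its operation by name, and reads the attribute back. Spring performs the registration for annotated beans when the tag above is present; this sketch does not show how Spring does it. The class names and the ObjectName "com.example.sketch:type=Counter" are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class JmxExportSketch {

    // Standard MBean convention: the management interface is named <ImplementationClass>MBean.
    public interface CounterMBean {
        int getCounter(); // exposed as the attribute "Counter"

        void increment(); // exposed as the operation "increment"
    }

    public static class Counter implements CounterMBean {
        private final AtomicInteger counter = new AtomicInteger();

        public int getCounter() {
            return counter.get();
        }

        public void increment() {
            counter.incrementAndGet();
        }
    }

    // Registers the bean, invokes "increment" by name, and returns the attribute value.
    static int registerAndIncrement() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // Hypothetical object name chosen for this sketch.
            ObjectName name = new ObjectName("com.example.sketch:type=Counter");
            server.registerMBean(new Counter(), name);
            try {
                server.invoke(name, "increment", new Object[0], new String[0]);
                return (Integer) server.getAttribute(name, "Counter");
            } finally {
                // Always unregister so the sketch can be run repeatedly.
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX sketch failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("counter after increment: " + registerAndIncrement());
    }
}
```

With the annotation-based approach used in this article, @ManagedResource, @ManagedAttribute and @ManagedOperation take the place of the hand-written CounterMBean interface.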

The complete context configuration file, spring-integ-context.xml, with all of these components is shown below.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
		                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
		                http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx.xsd
		                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">


	<context:mbean-export />
	<context:component-scan base-package="com.springinteg.controlbus" />

	<int:channel id="msgChannelAdapterOutput">
		<int:queue />
	</int:channel>

	<int:channel id="controlChannel" />

	<int:control-bus input-channel="controlChannel" />
	<int:inbound-channel-adapter id="inboundAdapter"
		channel="msgChannelAdapterOutput" expression="'This is a test message'"
		auto-startup="false">
		<int:poller fixed-rate="1000" />
	</int:inbound-channel-adapter>
	<bean id="managedCounterBean" class="com.springinteg.controlbus.ManagedCounterBean" />
</beans>

5. Application Configuration

As the Spring context configuration above shows, ManagedCounterBean is defined as a managed resource bean, together with a control bus that invokes operations on it. The control bus listens on controlChannel for control messages and then invokes the corresponding operations on the managed bean. The implementation of this class is shown below.

5.1 Managed Resource Bean

ManagedCounterBean.java

package com.springinteg.controlbus;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource
public class ManagedCounterBean {
    private final AtomicInteger counter = new AtomicInteger();

    @ManagedAttribute
    public int getCounter() {
        return this.counter.get();
    }

    @ManagedAttribute
    public void setCounter(int counter) {
        this.counter.set(counter);
    }
    
    @ManagedOperation
    public void increment() {
        this.counter.incrementAndGet();
    }

}

6. Verification Test Configuration

The code below is a basic test that verifies the control bus executes command messages sent on the message channel. The first test demonstrates how command messages can drive the operations defined on Spring’s Lifecycle interface. Before sending any control message, it asserts that there are no messages in the output channel. After sending the start command, it checks that the inbound adapter has started and that a test message arrives on the channel. After sending the stop command, it checks that no further messages arrive. The second test demonstrates how a control message can invoke an operation on a JMX managed resource bean to increment a numeric counter.

ControlBusUnitTest.java

import static org.junit.Assert.*;

import org.apache.log4j.Logger;
import org.junit.Test;

import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;

import com.springinteg.controlbus.ManagedCounterBean;

public class ControlBusUnitTest {

	private static final Logger logger = Logger.getLogger(ControlBusUnitTest.class);

	@Test
	public void testControlbusAdapter() {
		ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("spring-integ-context.xml");
		try {
			MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);
			PollableChannel msgChannelAdapterOutput = ac.getBean("msgChannelAdapterOutput", PollableChannel.class);

			// The adapter is configured with auto-startup="false", so nothing arrives yet.
			Message<?> receivedMsg = msgChannelAdapterOutput.receive(1000);
			assertNull(receivedMsg);
			logger.info("Message received on channel before adapter started: " + receivedMsg);

			// Start the adapter through the control bus and expect a message.
			controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));
			receivedMsg = msgChannelAdapterOutput.receive(1000);
			assertNotNull(receivedMsg);
			logger.info("Message received on channel after adapter started: " + receivedMsg);

			// Stop the adapter through the control bus and expect silence again.
			controlChannel.send(new GenericMessage<String>("@inboundAdapter.stop()"));
			receivedMsg = msgChannelAdapterOutput.receive(1000);
			assertNull(receivedMsg);
			logger.info("Message received on channel after adapter stopped: " + receivedMsg);
		} finally {
			// Close the context even if an assertion fails.
			ac.close();
		}
	}

	@Test
	public void testControlBusMBean() {
		ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("spring-integ-context.xml");
		try {
			MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);
			ManagedCounterBean managedCounterBean = ac.getBean("managedCounterBean", ManagedCounterBean.class);

			// assertEquals takes the expected value first, then the actual value.
			assertEquals(0, managedCounterBean.getCounter());
			logger.info("Value of message counter before sending message to control bus " + managedCounterBean.getCounter());

			// Invoke the @ManagedOperation through the control bus.
			controlChannel.send(new GenericMessage<String>("@managedCounterBean.increment()"));
			assertEquals(1, managedCounterBean.getCounter());
			logger.info("Value of message counter after sending message to control bus " + managedCounterBean.getCounter());
		} finally {
			ac.close();
		}
	}

}

6.1 Screenshots of Test Execution

6.1.1 Adapter start and stop using command messages

Lifecycle operation control using command messages

6.1.2 Managed attribute increment using control messages

Managed Attribute control using command messages

7. Conclusion

In this article we looked at how the control bus and command messages help with monitoring and managing a messaging application. Monitoring and management are critical aspects of a successful enterprise integration. The control bus can control different aspects of the application through JMX operations. In addition, control messages can be specified using SpEL or Groovy to invoke operations on the target bean.

8. Download the source code

You can download the full source code of this Spring Integration control bus example here: spring-integration-controlbus

Raghuram Gururajan

Raghuram is a Senior Staff Software Engineer with more than thirteen years of experience working on web, desktop and mobile applications. He holds a Master's degree in Computer Science from the University of North Carolina. He is passionate about distributed systems, specializes in Java web services, and has significant experience with web applications, databases, and continuous integration and deployment. He currently works as a technical lead at an e-commerce company, where he supports development of the core APIs and RESTful services that are part of the Merchandise Product Catalog.