Spring Boot Integration ActiveMQ Example
1. Introduction
In this article we discuss how to integrate Spring Boot with ActiveMQ. We use ActiveMQ as the message broker and build a sample application that uses a JMS queue and Spring Integration channel adapters to implement an asynchronous messaging service. Before getting started, let's go over some of the basic components.
1.1 JMS
Java Message Service (JMS) is an application programming interface for exchanging loosely coupled, reliable, asynchronous messages between different software application components (clients).
1.2 ActiveMQ
ActiveMQ is an open-source, message-oriented middleware written in Java with a full-fledged implementation of JMS 1.1 as part of the J2EE 1.4 specification (as per the ActiveMQ website). It provides messaging software with enterprise features that can serve as a backbone for a distributed application built on a messaging model.
1.3 Queues and Topics
JMS messages sent by an application are addressed to a specific destination, much like a postal mailbox: the messages stay in the mailbox until someone picks them up. There are two types of destination in JMS: queues and topics.
1.4 Queue
Queues are based on the point-to-point (p2p) messaging model: a producer sends messages to a queue, and each message is delivered to exactly one consumer. The model supports both synchronous and asynchronous messaging.
1.5 Topic
Topics are based on the publish-subscribe model, in which messages are sent to a particular topic. This model lets a publisher send a message to many clients at the same time: consumers subscribe to a topic, and when a message arrives each subscriber gets a copy of it.
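The difference between the two destination types can be sketched in plain Java. This is only an analogy built on in-memory collections: the class DestinationSketch and its methods are hypothetical and are not part of JMS or ActiveMQ, and round-robin delivery merely stands in for "whichever consumer picks the message up".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Plain-Java analogy only: none of these types belong to JMS or ActiveMQ.
class DestinationSketch {

    // Point-to-point: each message is taken off the queue by exactly one consumer.
    static List<List<String>> deliverViaQueue(List<String> messages, int consumers) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> received = emptyInboxes(consumers);
        int next = 0;
        while (!queue.isEmpty()) {
            // Round-robin stands in for "whichever consumer is free".
            received.get(next % consumers).add(queue.poll());
            next++;
        }
        return received;
    }

    // Publish-subscribe: every subscriber gets its own copy of each message.
    static List<List<String>> deliverViaTopic(List<String> messages, int subscribers) {
        List<List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    // Creates one empty inbox per consumer or subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println("queue: " + deliverViaQueue(messages, 2)); // queue: [[m1], [m2]]
        System.out.println("topic: " + deliverViaTopic(messages, 2)); // topic: [[m1, m2], [m1, m2]]
    }
}
```

With two consumers on a queue the two messages are split between them, whereas two subscribers on a topic each see both messages.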
1.6 Spring Integration
Spring Integration provides a number of channel adapters that act as a transparent bridge between Spring Integration messaging and JMS messaging. Spring Integration takes care of the boilerplate code, so the user can configure the application's messaging with just a few lines of XML, without knowing the underlying communication mechanism. The communication mechanism can be changed as needed with just a few changes to the configuration files.
2. Maven dependencies
Since we are using Spring Boot, the basic dependencies for Spring Integration are managed for us. Notice below that we have not specified a version for any dependency: the versions are inherited from the parent spring-boot-starter-parent pom.
pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.2.RELEASE</version>
</parent>

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-broker</artifactId>
    </dependency>
</dependencies>
3. Configuration of Spring Boot with JMS/ActiveMQ
As discussed above, most of the setup for channel adapters and messaging can be done through configuration. We need a separate XML configuration file that defines the JMS channel adapter and the destination queue.
The channel adapter helloWorldJMSAdapter acts as a bridge between the destination queue and the Spring Integration channel. It is a message-driven (inbound) adapter: it listens on the destination queue and delivers each message that arrives there to the Spring Integration channel. Sending messages from a channel back to a queue would require a separate outbound channel adapter.
Note that the destination attribute must be set to the name of a valid destination bean (here an ActiveMQ queue) that is defined elsewhere in your configuration. We have also defined a service activator in the configuration below; Spring Integration uses the service activator bean to process the messages from the integration channel.
springintegration-config.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!-- Inbound adapter: forwards messages from the JMS queue to the channel -->
    <jms:message-driven-channel-adapter id="helloWorldJMSAdapter"
        destination="helloWorldJMSQueue"
        connection-factory="jmsConnectionFactory"
        channel="helloWorldChannel"/>

    <integration:channel id="helloWorldChannel"/>

    <!-- Invokes HelloWorldAmqService.processMsg for each message on the channel -->
    <integration:service-activator id="helloWorldServiceActivator"
        input-channel="helloWorldChannel"
        ref="helloWorldAmqService"
        method="processMsg"/>

</beans>
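The message flow that this configuration wires together (queue, adapter, channel, service activator) can be pictured with a small plain-Java sketch. It is an analogy under stated assumptions, not Spring Integration code: MessageFlowSketch, SketchChannel and forwardAll are hypothetical names, and an in-memory BlockingQueue stands in for the ActiveMQ queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-Java analogy of the wiring; these are not Spring Integration types.
class MessageFlowSketch {

    // Stands in for <integration:channel>: hands each message to its single subscriber.
    static class SketchChannel {
        private Consumer<String> subscriber = msg -> { };

        void subscribe(Consumer<String> handler) {
            this.subscriber = handler;
        }

        void send(String msg) {
            subscriber.accept(msg);
        }
    }

    // Stands in for the message-driven channel adapter: drains the queue into the channel
    // and returns how many messages were forwarded.
    static int forwardAll(BlockingQueue<String> jmsQueue, SketchChannel channel) {
        int forwarded = 0;
        String msg;
        while ((msg = jmsQueue.poll()) != null) {
            channel.send(msg);
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        BlockingQueue<String> helloWorldJMSQueue = new LinkedBlockingQueue<>();
        SketchChannel helloWorldChannel = new SketchChannel();
        // Stands in for the service activator invoking processMsg.
        helloWorldChannel.subscribe(msg -> System.out.println("processMsg: " + msg));

        helloWorldJMSQueue.add("This is message from producer");
        forwardAll(helloWorldJMSQueue, helloWorldChannel);
    }
}
```

In the real application the adapter is driven by a JMS listener rather than an explicit drain loop, but the direction of flow is the same: from the queue, through the channel, into the service method.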
3.1 ActiveMQ configuration
For the JMS channel adapter defined above, we now implement and set up the destination queue that we declared as helloWorldJMSQueue.
HelloWorldAmqConfig.java
package com.samplejmsapp.activemq.configuration;

import javax.jms.Queue;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HelloWorldAmqConfig {

    public static final String HELLO_WORLD_QUEUE = "hello.world.queue";

    // Bean name matches the destination attribute of the channel adapter
    @Bean
    public Queue helloWorldJMSQueue() {
        return new ActiveMQQueue(HELLO_WORLD_QUEUE);
    }
}
3.2 Broker Configuration
We need to configure the broker connection with the broker-url, user and password properties. Spring Boot needs this configuration to connect to the ActiveMQ broker successfully. Before configuring the properties below, we should also download and install ActiveMQ from http://activemq.apache.org/download.html and start the broker using the command:
$ACTIVEMQ_HOME/bin/activemq start
application.properties
spring.activemq.pooled=false
spring.activemq.broker-url=failover://tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
3.3 Service configuration
We now define the basic service component class, which takes the input message, processes it in the processMsg method and prints the output to the console.
HelloWorldAmqService.java
package com.samplejmsapp.activemq.services;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.stereotype.Service;

@Service
public class HelloWorldAmqService {

    // Invoked by the service activator for every message on helloWorldChannel
    public void processMsg(String msg) {
        DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date date = new Date();
        System.out.println("*************" + msg + " as of " + sdf.format(date) + " *********** ");
    }
}
3.4 Application configuration
So far the channel adapter, queue and related beans are defined in an XML configuration file. To integrate that file with Spring Boot, we implement a SpringBootApplication class that imports it, as below.
HelloWorldAmqApp.java
package com.samplejmsapp.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("classpath*:/springintegration-config.xml")
public class HelloWorldAmqApp {

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldAmqApp.class, args);
    }
}
4. Spring Integration test for sending/receiving messages
In the integration test below we autowire the connection factory, initialize the queue and the producer, and send a message. We then verify that the service activator received it by asserting on the console output, using the OutputCapture feature of Spring Boot.
SampleJmsAppTest.java
package com.samplejmsapp.activemq;

import static org.junit.Assert.assertTrue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.rule.OutputCapture;
import org.springframework.test.context.junit4.SpringRunner;

import com.samplejmsapp.activemq.configuration.HelloWorldAmqConfig;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SampleJmsAppTest {

    // Captures everything written to System.out during the test
    @Rule
    public OutputCapture outputCapture = new OutputCapture();

    @Autowired
    @Qualifier("jmsConnectionFactory")
    ConnectionFactory jmsConnectionFactory;

    String queueName = HelloWorldAmqConfig.HELLO_WORLD_QUEUE;
    MessageProducer jmsamqproducer;
    Destination jmsamqdestination;
    Session jmsamqsession;
    Connection jmsamqconn;

    @Before
    public void setUpJmsSession() throws JMSException {
        jmsamqconn = jmsConnectionFactory.createConnection();
        jmsamqconn.start();
        jmsamqsession = jmsamqconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsamqdestination = jmsamqsession.createQueue(queueName);
        jmsamqproducer = jmsamqsession.createProducer(jmsamqdestination);
        jmsamqproducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    @After
    public void tearDownJmsSession() throws JMSException {
        jmsamqsession.close();
        jmsamqconn.close();
    }

    @Test
    public void testSendMsgToConsumer() throws JMSException, InterruptedException {
        TextMessage msg = jmsamqsession.createTextMessage("This is message from producer");
        // The producer is already bound to the destination it was created with
        jmsamqproducer.send(msg);
        // Fixed wait that gives the asynchronous listener time to consume the message
        Thread.sleep(3000L);
        assertTrue(this.outputCapture.toString().contains("This is message from producer"));
    }
}
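A fixed Thread.sleep makes the test wait the full three seconds even when the message arrives almost immediately. One common alternative is to poll for the expected output until a timeout expires. The helper below is a minimal sketch of that idea using only the JDK: PollingWait and waitUntil are hypothetical names, not part of Spring Boot or JUnit.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Spring Boot or JUnit.
class PollingWait {

    // Re-checks the condition every pollMillis until it holds or timeoutMillis has elapsed.
    // Returns true as soon as the condition is met, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Final check so a condition that became true during the last sleep is not missed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        StringBuffer output = new StringBuffer();
        // Simulates a consumer that writes its output asynchronously.
        new Thread(() -> output.append("This is message from producer")).start();
        boolean seen = waitUntil(() -> output.toString().contains("from producer"), 3000L, 50L);
        System.out.println("seen = " + seen);
    }
}
```

In the test above, the sleep and the assertion could then be combined into something like assertTrue(PollingWait.waitUntil(() -> outputCapture.toString().contains("This is message from producer"), 3000L, 50L)), which returns as soon as the service activator has printed the message.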
5. Verify messages
To verify the messages, start the ActiveMQ server and then run the test SampleJmsAppTest to send the message; the message can also be verified in the ActiveMQ console. On a successful run, the service prints the received message to the console.
6. Conclusion
In this Spring Integration example we have shown how simple and straightforward it is to integrate JMS ActiveMQ messaging with Spring Boot. Spring Integration takes care of the boilerplate code, and with a few changes to the XML configuration files we can fine-tune the application without changing the application code, recompiling or repackaging. Spring Integration also provides features such as "claim check", which provides configurable storage for message payloads and which we can explore in future posts.
7. Download The Source Code
You can download the full source code of this example here: Spring Boot Integration ActiveMQ Example