Apache Camel ActiveMQ Example
In this article, we will see some example of ActiveMQ component.
Apache activeMQ is one of the most popular open source JMS provider.
We will use it to send message to a JMS Queue or Topic and consume messages from a JMS Queue or Topic.
To use this component you need to include activemq jar, camel-spring.jar and camel-jms.jar.
Before we start with our example, Let’s look into the setup details.
This example uses the following frameworks:
- Maven 3.2.3
- Apache Camel 2.15.1
- Spring 4.1.5.RELEASE
- Eclipse as the IDE, version Luna 4.4.1.
1. Dependencies
I will be showing you some examples of camel ActiveMQ so you need to add the following dependencies:
camel-core
– basic module of apache camel.camel-stream
– We will use this to send output to the console.camel-jms
andactivemq-camel
– ActiveMQ JMS components.spring-context
andcamel-spring
– Since we be configuring our camel context in spring.slf4j-api
andslf4j-log4j12
– This is for the log component. It relies onslf4j
for the logger API andlog4j
as the logger implementation
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javacodegeeks.camel</groupId> <artifactId>camelHelloWorld</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-stream</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jms</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency> </dependencies> </project>
2. ActiveMQ component’s URI format
The activeMQ component’s URI format is:
activemq:[queue:|topic:]destinationName
Where activemq:
is the URI scheme, destinationName is an ActiveMQ queue or topic name. By default, it is interpreted as a queue name.
3. Configuring JMS
The first thing we need to do before we start using activeMQ is to create a connection factory. Below we create a connection factory, to connect to an embedded broker.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
It is expensive to open up a connection to an ActiveMQ broker so it is recommended to pool the connections. We will see next how to create a pool of connections.
4. Connection Pool
We will create a pooled connection factory to efficiently handle pooling of JMS connections. The PooledConnectionFactory
will create a connection pool with up to 6 connections in use at the same time. Each connection can be shared by many sessions. To make use of PooledConnectionFactory
, you need to include actvemq-pool
to your pom.xml
.
pom.xml:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.1</version> </dependency>
Spring JmsTemplate
opens and closes connections for each send or receive of a message so you need to make sure you point the connectionFactory
to the configured pooledConnectionFactory
.
applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="6" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory"/> </bean> </beans>
5. ActiveMQ Example – Receiving and Sending Message
In this example, we will send heartbeats every second to queue activemq:queue:in
. The message received is re-routed to queue activemq:queue:out
which we finally print it on console.
The first routing creates heartbeats and sends them to the ‘in’ queue. The second route simply logs the message and moves them to ‘out’ queue. The third route picks up messages from ‘out’ queue and prints them on console.
applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost?broker.persistent=false" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory" /> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="timer:foo?period=1s" /> <transform> <simple>Heartbeat ${date:now:yyyy-MM-dd HH:mm:ss}</simple> </transform> <to uri="activemq:queue:in" /> </route> <route> <from uri="activemq:queue:in" /> <log message="Route message from in queue to out queue with data ${body}" /> <to uri="activemq:queue:out" /> </route> <route> <from uri="activemq:queue:out" /> <log message="Print the message from out to console"/> <to uri="stream:out" /> </route> </camelContext> </beans>
Output:
15:48| INFO | MarkerIgnoringBase.java 95 | Route message from inbox to outbox queue with data Heartbeat 2015-05-28 15:48:45 15:48| INFO | MarkerIgnoringBase.java 95 | Print the message from out to console Heartbeat 2015-05-28 15:48:45
6. Processing Messages Asynchronously
In this example, we will process the messages asynchronously using options asyncConsumer
and concurrentConsumers
. We will set asyncConsumer=true
so that the JmsConsumer processes the Exchange asynchronously. The JmsConsumer may pickup the next message from the JMS queue, while the previous message is being processed asynchronously. Using concurrentConsumers
we can specifies the default number of concurrent consumers.
We post multiple messages to the queue activemq:queue:start
which is then processed asynchronously, the message is passed on to the bean for processing. The object returned from the bean is then printed on the console along with the thread that processed the request.
CamelActiveMqBeanExample:
package com.javacodegeeks.camel; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.util.jndi.JndiContext; public class CamelActiveMqAsyncExample { public static final void main(String[] args) throws Exception { JndiContext jndiContext = new JndiContext(); jndiContext.bind("testBean", new TestBean()); CamelContext camelContext = new DefaultCamelContext(jndiContext); camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false")); try { camelContext.addRoutes(new RouteBuilder() { public void configure() { from("activemq:queue:start?asyncConsumer=true&concurrentConsumers=10") .to("bean:testBean?method=hello") .to("stream:out"); } }); ProducerTemplate template = camelContext.createProducerTemplate(); camelContext.start(); for (int i = 0; i < 5; i++) { template.sendBody("activemq:queue:start", "body" + i); } Thread.sleep(1000); } finally { camelContext.stop(); } } }
TestBean:
package com.javacodegeeks.camel; public class TestBean { public String hello(String msg) { return msg + ":" + Thread.currentThread(); } }
Since the thread processing the message is different in each message, we can conclude that the messages were processed asynchronously.
Output:
body0:Thread[Camel (camel-1) thread #6 - JmsConsumer[start],5,main] body4:Thread[Camel (camel-1) thread #8 - JmsConsumer[start],5,main] body3:Thread[Camel (camel-1) thread #7 - JmsConsumer[start],5,main] body2:Thread[Camel (camel-1) thread #0 - JmsConsumer[start],5,main] body1:Thread[Camel (camel-1) thread #6 - JmsConsumer[start],5,main]
7. Two Consumers From Same Topic Message
In this example, we have set up two different routes for the same topic endpoint. Any message that the topic is received is sent to both the routes thus both the consumers direct:a
and direct:b
will end up consuming the message. The message received from direct:a
and direct:b
are transformed so we know which endpoint is receiving the message.
TwoConsumersOnSameTopic:
package com.javacodegeeks.camel; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.util.jndi.JndiContext; public class TwoConsumersOnSameTopic { public static final void main(String[] args) throws Exception { JndiContext jndiContext = new JndiContext(); jndiContext.bind("testBean", new TestBean()); CamelContext camelContext = new DefaultCamelContext(jndiContext); camelContext.addComponent("activemq", ActiveMQComponent .activeMQComponent("vm://localhost?broker.persistent=false")); try { camelContext.addRoutes(new RouteBuilder() { public void configure() { from("activemq:topic:foo").routeId("a").to("log:a", "direct:a"); from("activemq:topic:foo").routeId("b").to("log:b", "direct:b"); from("direct:a").transform( simple("direct:a output: ${body}")) .to("stream:out"); from("direct:b").transform( simple("direct:b output: ${body}")) .to("stream:out"); } }); ProducerTemplate template = camelContext.createProducerTemplate(); camelContext.start(); template.sendBody("activemq:topic:foo", "Topic Message"); Thread.sleep(1000); } finally { camelContext.stop(); } } }
Output:
direct:a output: Topic Message direct:b output: Topic Message
8. JMSReplyTo Example
In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo
header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo
header is set explicitly by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo
destination, it can reply using that destination. In the below example, we should disable reply to, to avoid sending the message back to again to the topic after we have consumed it.
TopicDisableReplyToExample:
package com.javacodegeeks.camel; import javax.jms.ConnectionFactory; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.component.jms.JmsConfiguration; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.util.jndi.JndiContext; public class TopicDisableReplyToExample { public static final void main(String[] args) throws Exception { JndiContext jndiContext = new JndiContext(); jndiContext.bind("testBean", new TestBean()); CamelContext camelContext = new DefaultCamelContext(jndiContext); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); try { camelContext.addRoutes(new RouteBuilder() { public void configure() { from("activemq:queue:myQueue") .transform(body().prepend("XYZ ")) .to("log:queueResult") .to("direct:queueResult"); from("activemq:topic:myTopic?disableReplyTo=true") .to("log:topicResult") .to("direct:topicResult"); from("direct:queueResult") .transform(simple("direct:queueResult output: ${body}")) .to("stream:out"); from("direct:topicResult") .transform(simple("direct:topicResult output: ${body}")) .to("stream:out"); } }); ProducerTemplate template = camelContext.createProducerTemplate(); camelContext.start(); template.send("activemq:queue:myQueue?preserveMessageQos=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("ABC"); exchange.getIn().setHeader("JMSReplyTo", "topic:myTopic"); } }); Thread.sleep(1000); } finally { camelContext.stop(); } } public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) { JmsConfiguration template = new JmsConfiguration(connectionFactory); template.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); return new JmsComponent(template); } }
Output:
17:41| INFO | MarkerIgnoringBase.java 95 | Exchange[ExchangePattern: InOut, BodyType: String, Body: XYZ ABC] direct:queueResult output: XYZ ABC 17:41| INFO | MarkerIgnoringBase.java 95 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: direct:queueResult output: XYZ ABC] direct:topicResult output: direct:queueResult output: XYZ ABC
9. Download the Eclipse Project
This was an example about Apache Camel ActiveMQ Component.
You can download the full source code of this example here: camelActivemqExample.zip