Apache Camel AMQP Example
1. Introduction
In this article, we will see an example of AMQP implementation of Apache ActiveMQ using Apache Camel.
2. What is AMQP?
AMQP is an open standard application layer protocol for message-oriented middleware. The most important feature is that AMQP is a wire-level protocol and is interoperable by design. Also, the AMQP standard is by design more flexible and powerful (e.g., supports two-way communication by design) – they simply learned from JMS mistakes. AMQP has the implementations as below:
- Apache Qpid, an open-source project at the Apache Foundation
- Apache ActiveMQ, an open-source project at the Apache Foundation
- RabbitMQ, an open-source project sponsored by Pivotal
3. What is Apache’s ActiveMQ component?
The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.
This component is based on JMS Component and uses Spring’s JMS support for declarative transactions, using Spring’s JmsTemplate
for sending and a MessageListenerContainer
for consuming. All the options from the JMS component also applies for this component.
To use this component make sure you have the activemq.jar
or activemq-core.jar
on your classpath along with any Camel dependencies such as camel-core.jar
, camel-spring.jar
and camel-jms.jar
.
The activeMQ component’s URI format is:
activemq:[queue:|topic:]destinationName
4. Technology Stack
In this example we will be using following technology stack:
- Maven 4.0 – Build and dependency tool. You can visit here for more details
- Apache Camel 2.15.1 – Open-source integration framework based on known Enterprise Integration Patterns.
- Spring 4.1.5.RELEASE – Comprehensive programming and configuration model for modern Java-based enterprise applications
- Spring Tool Suite (STS) – An Eclipse-based development environment that is customized for developing Spring applications.
- ActiveMQ Binary Distributions – The latest stable release is the ActiveMQ 5.14.5 Release. See the Download Archives for all time releases.
5.Apache Camel AMQP
5.1 Dependencies
To continue using our example, we need to add the dependent jar files to the classpath. This can be achieved either by deploying directly the jar file or using the Maven. Since we are using Maven for our example we will be using the pom.xml for the dependency of the following jars:
- camel-core
- camel-stream
- camel-jms
- activemq-camel
- camel-spring
- spring-context
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.springframework.samples</groupId> <artifactId>AMQP</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-stream</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jms</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>2.15.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency> </dependencies> </project>
5.2 XML Request
The below XML request is used to send as an input message to a JMS Queue or Topic and consume messages from a JMS Queue or Topic.
order.xml
<?xml version="1.0" encoding="UTF-8"?> <orders> <order product="electronics"> <items> <item>Laptop</item> <item>Mobile</item> </items> </order> <order product="books"> <items> <item>Design Patterns</item> <item>XML</item> </items> </order> </orders>
5.3 Application Context File
The first thing we need to do before we start using activeMQ is to create a connection factory. It is expensive to open up a connection to an ActiveMQ broker so it is recommended to pool the connections.
We will create a pooled connection factory to efficiently handle pooling of JMS connections. The PooledConnectionFactory
will create a connection pool with up to 6 connections in use at the same time. Each connection can be shared by many sessions. To make use of PooledConnectionFactory
, you need to include actvemq-pool
to your pom.xml
.
Spring JmsTemplate
opens and closes connections for each send or receive of a message so you need to make sure you point the connectionFactory
to the configured pooledConnectionFactory
.
The first route consume the xml request message produced by tempate of camel context, and then transfer to the Apache ActiveMQ Q1 queue. The second route consume the request message from the Q1 queue, and then stream out to the STS console screen.
ActiveMQContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616/" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory" /> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <log message=" Transferring"/> <to uri="activemq:queue:Q1"/> <log message=" Transfered "/> </route> <route> <from uri="activemq:queue:Q1" /> <log message="Print the message from out to console"/> <to uri="stream:out" /> </route> </camelContext> </beans>
5.4 Main Java Class
In this Java class, firstly, we create the instance of the ApplicationContext based for ActiveMQContext.xml file. Then start the Camel context so that we can use the route java class. For our example, we have used the createProducerTemplate method of the created camel context’s instance, so that we can send the data to the route for processing.
At last, we stop the instance of the Camel context.
ActiveMQ.java
package com; import java.io.FileInputStream; import java.io.InputStream; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.spring.SpringCamelContext; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ActiveMQ { public static void main(String[] args) throws Exception { ApplicationContext appContext = new ClassPathXmlApplicationContext( "ActiveMQContext.xml"); CamelContext camelContext = SpringCamelContext.springCamelContext( appContext, false); try { camelContext.start(); ProducerTemplate template = camelContext.createProducerTemplate(); InputStream orderxml = new FileInputStream("src/main/resources/order.xml"); template.sendBody("direct:start", orderxml); } finally { camelContext.stop(); } } }
5.3 Console Output
09:59:37 INFO support.ClassPathXmlApplicationContext: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@39a054a5: startup date root of context hierarchy 09:59:37 INFO xml.XmlBeanDefinitionReader: Loading XML bean definitions from class path resource [ActiveMQContext.xml] 09:59:39 WARN tcp.TcpTransportFactory: path isn't a valid local location for TcpTransport to use 09:59:40 INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) is starting INFO management.ManagedManagementStrategy: JMX is enabled INFO converter.DefaultTypeConverter: Loaded 186 type converters INFO spring.SpringCamelContext: AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance. INFO spring.SpringCamelContext: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html INFO spring.SpringCamelContext: Route: route1 started and consuming from: Endpoint[direct://start] WARN tcp.TcpTransportFactory: path isn't a valid local location for TcpTransport to use INFO spring.SpringCamelContext: Route: route2 started and consuming from: Endpoint[activemq://queue:Q1] INFO spring.SpringCamelContext: Total 2 routes, of which 2 is started. INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) started in 0.706 seconds INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) is starting INFO spring.SpringCamelContext: Total 2 routes, of which 2 is started. INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) started in 0.000 seconds INFO route1: Transferring WARN tcp.TcpTransportFactory: path isn't a valid local location for TcpTransport to use INFO route1: Transfered INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) is shutting down INFO impl.DefaultShutdownStrategy: Starting to graceful shutdown 2 routes (timeout 300 seconds) INFO route2: Print the message from out to console <?xml version="1.0" encoding="UTF-8"?> <orders> <order product="electronics"> <items> <item>Laptop</item> <item>Mobile</item> </items> </order> <order product="books"> <items> <item>Design Patterns</item> <item>XML</item> </items> </order> </orders> INFO impl.DefaultShutdownStrategy: Route: route2 shutdown complete, was consuming from: Endpoint[activemq://queue:Q1] INFO impl.DefaultShutdownStrategy: Route: route1 shutdown complete, was consuming from: Endpoint[direct://start] INFO impl.DefaultShutdownStrategy: Graceful shutdown of 2 routes completed in 0 seconds INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) uptime 0.192 seconds INFO spring.SpringCamelContext: Apache Camel 2.15.1 (CamelContext: camel-1) is shutdown in 0.020 seconds
6. Conclusion
Here in Apache Camel AMQP Example, we have learned about to send a message to a JMS Queue or Topic and consume messages from a JMS Queue or Topic.
So, now you are ready to implement the AMQP in Apache camel applications with the help of Apache ActiveMQ.
7. Download the Spring Tool Suite Project
You can download the full source code of this example here: AMQP.zip