Apache ActiveMQ Best Practices Tutorial
Apache ActiveMQ is an open source messaging server written in Java that implements the JMS 1.1 specification. In this tutorial, you will learn how to develop a few Java applications that integrate with ActiveMQ to send messages to and receive messages from destinations. If you already know how to install and configure ActiveMQ, you can skip the first four chapters.
Table Of Contents
- 1. Introduction
- 2. Install an Apache ActiveMQ Server
- 3. Start the Apache ActiveMQ Server
- 4. Monitor the Apache ActiveMQ Server
- 5. Business Use Cases
- 6. Define JMS Message
- 7. Apache ActiveMQ Java Client Library
- 8. Publish Message Application
- 9. Consume Message Application
- 10. Integration with Spring JMS
- 11. Integrating with Tomcat
- 12. Common Problems
- 13. Summary
- 14. References
- 15. Download the Source Code
1. Introduction
Apache ActiveMQ (AMQ) is a JMS 1.1 implementation from the Apache Software Foundation.
AMQ is a message broker which translates the messages from the sender to the receiver. Message brokers are the building blocks of message-oriented middleware (MOM) architecture.
AMQ is a popular open source messaging and Integration Patterns server. It provides communication between applications and fulfills both notification and interoperation needs among them.
2. Install an Apache ActiveMQ Server
Most business applications treat AMQ as an infrastructure resource. In this tutorial, we will install AMQ as a standalone server. Follow the installation instructions on the ActiveMQ website; we installed AMQ 5.15.0.
3. Start the Apache ActiveMQ Server
Navigate to the \apache-activemq-5.15.0\bin\win64 directory and run activemq.bat to start the server.
The output below demonstrates that the server started successfully.
server.log
jvm 1 | INFO | Apache ActiveMQ 5.15.0 (localhost, ID:SL2LS431841-57319-1512184574307-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
4. Monitor the Apache ActiveMQ Server
AMQ provides a web console application for monitoring and administering the broker. After the AMQ server starts, follow the steps below to launch the web console.
- Open a browser: Chrome, IE, Firefox, etc.
- Enter the URL: http://localhost:8161/admin
- Enter admin/admin as the username/password
Here you should see the “Welcome” page. Users can send, read, and delete messages via the web console.
5. Business Use Cases
Company X provides services to customers. Each new customer must be set up in the billing and support systems.
In this tutorial, we will demonstrate how to build a customer on-boarding process, a billing system, and a support application, and integrate them via AMQ:
- OnBoardNewCustomerApp, which sets up new customers and sends the new customer events to the ActiveMQ customer topic
- ConfigBillingForNewCustomerApp, which listens to the new customer events from the virtual topic and configures the customer in the billing application
- ConfigSupportForNewCustomerApp, which listens to the new customer events from the virtual topic and configures the customer in the support application
6. Define JMS Message
6.1 Message Destination
For this business use case, both the billing and support systems are notified when a new customer joins. We choose the publish/subscribe message pattern and build the OnBoardNewCustomerApp, which publishes the customer event to the AMQ broker topic VirtualTopic.Customer.Topic.
AMQ reserves three special characters for naming destinations:
- . is used to separate names in a path
- * is used to match any name in a path
- > is used to recursively match any destination starting from this name
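To make the three wildcard rules concrete, here is a minimal sketch of a matcher written with the JDK only. It is not ActiveMQ's own matching code; the class and method names are hypothetical, and the choice to let > match zero or more remaining names is an assumption of this sketch.

```java
/** Illustrative matcher for the '.', '*' and '>' rules; not ActiveMQ's implementation. */
class DestinationWildcardSketch {

    /** Returns true when the dot-separated destination name matches the pattern. */
    static boolean matches(String pattern, String name) {
        String[] patternParts = pattern.split("\\.");
        String[] nameParts = name.split("\\.");
        for (int i = 0; i < patternParts.length; i++) {
            if (patternParts[i].equals(">")) {
                // '>' accepts everything that remains (assumption: zero or more names)
                return true;
            }
            if (i >= nameParts.length) {
                return false; // the pattern is longer than the name
            }
            if (!patternParts[i].equals("*") && !patternParts[i].equals(nameParts[i])) {
                return false; // a literal name differs
            }
        }
        // without '>', both paths must have the same number of names
        return patternParts.length == nameParts.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.group2.*", "test.group2.queue1"));              // true
        System.out.println(matches("VirtualTopic.>", "VirtualTopic.Customer.Topic")); // true
        System.out.println(matches("test.*", "test.group1.queue1"));                  // false
    }
}
```

The pattern *.group2.* is the same one the consumer application uses later in this tutorial to read from every group2 queue.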
6.2 Message Header
The message header provides metadata about the message that is used by both the clients and the AMQ brokers. JMS pre-defines a set of message headers and properties. Two examples:
- JMSXGroupID: use this if you want a group of messages to always go to the same consumer
- JMSCorrelationID: use this to link messages together
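The "same consumer" guarantee of JMSXGroupID can be pictured with a small in-memory sketch. It only illustrates the sticky property: once a group id has been handed to a consumer, later messages with that id go to the same one. The class name, the round-robin choice for new groups, and the consumer names are assumptions; how the broker really picks a consumer is internal to ActiveMQ.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sticky routing by group id; not the broker's actual algorithm. */
class MessageGroupSketch {
    private final List<String> consumers;
    private final Map<String, String> ownerByGroup = new HashMap<>();
    private int next = 0;

    MessageGroupSketch(List<String> consumers) {
        this.consumers = consumers;
    }

    /** Returns the consumer that owns the group, assigning one on first sight. */
    String route(String groupId) {
        return ownerByGroup.computeIfAbsent(groupId,
                g -> consumers.get(next++ % consumers.size()));
    }

    public static void main(String[] args) {
        MessageGroupSketch router =
                new MessageGroupSketch(Arrays.asList("consumer-1", "consumer-2"));
        System.out.println(router.route("customer-41")); // first assignment
        System.out.println(router.route("customer-42")); // a different group
        System.out.println(router.route("customer-41")); // same consumer as the first call
    }
}
```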
6.3 Message Body
The message body is the actual payload that integrates the applications. In this example, the message is the JSON representation of the CustomerEvent.
CustomerEvent
package jcg.demo.model;

public class CustomerEvent {
    private String type;
    private Integer customerId;

    public CustomerEvent(String type, Integer customerId) {
        this.type = type;
        this.customerId = customerId;
    }

    public String getType() {
        return type;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public String toString() {
        return "CustomerEvent: type(" + type + "), customerId(" + customerId + ")";
    }

    public String getCustomerDetailUri() {
        return "https://localhost:8080/support/customer/" + customerId;
    }
}
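The applications in this tutorial serialize the event with Gson. To show the shape of the resulting message body without any dependency, here is a hand-written sketch that produces the same two fields seen in the consumer output later in this tutorial. The class and method are hypothetical, and the sketch does no JSON escaping, so it is only suitable for simple values like these.

```java
/** Illustrative only: builds the JSON shape by hand instead of using Gson. */
class CustomerEventJsonSketch {

    /** Formats the two event fields as a JSON object (no escaping is performed). */
    static String toJson(String type, int customerId) {
        return String.format("{\"type\":\"%s\",\"customerId\":%d}", type, customerId);
    }

    public static void main(String[] args) {
        System.out.println(toJson("NEWCUSTOMER", 79));
    }
}
```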
6.4 Configure Virtual Topic
The AMQ server installation comes with a ready-to-use configuration file. Add the configuration below to activemq.xml so that the AMQ broker forwards messages from any topic matching VirtualTopic.> to the virtual topic queues whose names start with Consumer.*.
activemq.xml
<destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
            <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
            <virtualTopic name="JCG.>" prefix="VTC.*." selectorAware="true"/>
        </virtualDestinations>
    </virtualDestinationInterceptor>
</destinationInterceptors>
- The virtualTopic entry named VirtualTopic.> configures a virtual topic with selectorAware disabled
- The virtualTopic entry named JCG.> configures a virtual topic with selectorAware enabled
Restart the AMQ server after updating the configuration file.
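With these prefixes in place, each consumer reads from a queue whose name is the prefix (with * replaced by the consumer's own name) followed by the topic name. A minimal sketch of that naming rule follows; the helper class is hypothetical and only builds strings, while the queue names it prints are the ones used by the consumer application later in this tutorial.

```java
/** Illustrative helper that derives a virtual topic consumer queue name. */
class VirtualTopicQueueNameSketch {

    /** Replaces '*' in the configured prefix with the consumer name and appends the topic. */
    static String consumerQueue(String prefix, String consumerName, String topicName) {
        return prefix.replace("*", consumerName) + topicName;
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("Consumer.*.", "zheng", "VirtualTopic.Customer.Topic"));
        System.out.println(consumerQueue("VTC.*.", "DZONE", "JCG.Mary.Topic"));
    }
}
```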
7. Apache ActiveMQ Java Client Library
Add the ActiveMQ Java client library to the project's pom.xml.
pom.xml
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.0</version>
</dependency>
8. Publish Message Application
In this example, you will see how to create ActiveMQMessageProducer to send the messages.
8.1 ActiveMQMessageProducer
A Java class that wraps the ActiveMQ Java API to send messages.
ActiveMQMessageProducer
package jcg.demo.activemq;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

import com.google.gson.Gson;

import jcg.demo.jms.util.DataUtil;

/**
 * A simple message producer which sends the message to the ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageProducer {

    private static final String ACTION_ID_HEADER = "actionId";
    private static final String ACTION_HEADER = "action";

    private ConnectionFactory connFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    // https://docs.oracle.com/javaee/7/api/javax/jms/MessageProducer.html
    private MessageProducer msgProducer;

    private String activeMqBrokerUri;
    private String username;
    private String password;

    public ActiveMQMessageProducer(final String activeMqBrokerUri, final String username, final String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    public void setup(final boolean transacted, final boolean isDestinationTopic, final String destinationName)
            throws JMSException {
        // the order matters: factory -> connection -> session -> destination -> producer
        setConnectionFactory(activeMqBrokerUri, username, password);
        setConnection();
        setSession(transacted);
        setDestination(isDestinationTopic, destinationName);
        setMsgProducer();
    }

    public void close() throws JMSException {
        // close in the reverse order of creation
        if (msgProducer != null) {
            msgProducer.close();
            msgProducer = null;
        }

        if (session != null) {
            session.close();
            session = null;
        }

        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    public void commit(final boolean transacted) throws JMSException {
        if (transacted) {
            session.commit();
        }
    }

    public void sendMessage(final String actionVal) throws JMSException {
        TextMessage textMessage = buildTextMessageWithProperty(actionVal);
        msgProducer.send(destination, textMessage);
        // msgProducer.send(textMessage, DeliveryMode.NON_PERSISTENT, 0, 0);
    }

    private TextMessage buildTextMessageWithProperty(final String action) throws JMSException {
        Gson gson = new Gson();
        String eventMsg = gson.toJson(DataUtil.buildDummyCustomerEvent());
        TextMessage textMessage = session.createTextMessage(eventMsg);

        Random rand = new Random();
        int value = rand.nextInt(100);
        textMessage.setStringProperty(ACTION_HEADER, action);
        textMessage.setStringProperty(ACTION_ID_HEADER, String.valueOf(value));

        return textMessage;
    }

    private void setDestination(final boolean isDestinationTopic, final String destinationName)
            throws JMSException {
        if (isDestinationTopic) {
            destination = session.createTopic(destinationName);
        } else {
            destination = session.createQueue(destinationName);
        }
    }

    private void setMsgProducer() throws JMSException {
        msgProducer = session.createProducer(destination);
    }

    private void setSession(final boolean transacted) throws JMSException {
        // transacted=true for better performance to push message in batch mode
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }

    private void setConnection() throws JMSException {
        connection = connFactory.createConnection();
        connection.start();
    }

    private void setConnectionFactory(final String activeMqBrokerUri, final String username, final String password) {
        connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);

        ((ActiveMQConnectionFactory) connFactory).setUseAsyncSend(true);

        RedeliveryPolicy policy = ((ActiveMQConnectionFactory) connFactory).getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(500);
        policy.setBackOffMultiplier(2);
        policy.setUseExponentialBackOff(true);
        policy.setMaximumRedeliveries(2);
    }
}
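- setup(): wires the connection factory, connection, session, destination, and producer in the correct order. Spring JMS dependency injection takes care of this for you.
- close(): closes the producer, session, and connection. Spring JMS takes care of this for you.
- sendMessage(): the commented-out send call sets the delivery mode of the message. All messages are persistent by default; we can switch to DeliveryMode.NON_PERSISTENT to get better performance.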
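The open and close ordering described above can be sketched without a broker. The example below uses hypothetical stand-in resources rather than real JMS objects (the JMS 1.1 Connection and Session interfaces do not implement AutoCloseable), and relies on the fact that try-with-resources closes resources in the reverse order of their declaration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative open/close ordering with stand-in resources instead of JMS objects. */
class CloseOrderSketch {

    /** Hypothetical stand-in for a JMS resource; close() declares no checked exception. */
    interface Resource extends AutoCloseable {
        @Override
        void close();
    }

    /** Opens a named resource and records the open and close events in the log. */
    static Resource open(String name, List<String> log) {
        log.add("open " + name);
        return () -> log.add("close " + name);
    }

    /** Opens connection, session, producer in that order and closes them in reverse. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Resource connection = open("connection", log);
             Resource session = open("session", log);
             Resource producer = open("producer", log)) {
            log.add("send");
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order is connection, session, producer on the way in and producer, session, connection on the way out, which mirrors what setup() and close() do by hand.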
8.2 ActiveMQMessageProducerTest
This JUnit test sends messages to various destinations. It is a convenient way to send a message to a destination.
ActiveMQMessageProducerTest
package jcg.demo.activemq;

import javax.jms.JMSException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageProducerTest {

    private ActiveMQMessageProducer msgQueueSender;

    @Before
    public void setup() {
        msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
    }

    @After
    public void cleanup() throws JMSException {
        msgQueueSender.close();
    }

    @Test
    public void send_msg_to_no_transaction_Queue() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP1_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }

    @Test
    public void send_msg_to_Group2_Queue1() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP2_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }

    @Test
    public void send_msg_to_transaction_Group1_Queue2() throws JMSException {
        msgQueueSender.setup(true, false, DataUtil.TEST_GROUP1_QUEUE_2);
        msgQueueSender.sendMessage("DEMO");
        msgQueueSender.commit(true);
    }

    @Test
    public void send_msg_to_no_transaction_Group1_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_GROUP1_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }

    @Test
    public void send_msg_to_Virtual_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }

    @Test
    public void send_msg_to_Virtual_Topic_WithSelector() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_VTC_TOPIC_SELECTOR);
        msgQueueSender.sendMessage("DZONE");
    }
}
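- send_msg_to_no_transaction_Queue: sends to the queue test.group1.queue1
- send_msg_to_Group2_Queue1: sends to the queue test.group2.queue1
- send_msg_to_transaction_Group1_Queue2: sends to the queue test.group1.queue2 in a transacted session
- send_msg_to_no_transaction_Group1_Topic: sends to the normal topic test.group1.topic
- send_msg_to_Virtual_Topic: sends to the selector-unaware topic VirtualTopic.Customer.Topic
- send_msg_to_Virtual_Topic_WithSelector: sends to the selector-aware topic JCG.Mary.Topic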
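The transacted test calls commit after sending. In a transacted JMS session, sent messages only become available to consumers once the session commits, and a rollback discards them. Here is a minimal in-memory sketch of that behavior; the class and its methods are hypothetical and no broker is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of a transacted send: nothing is delivered before commit. */
class TransactedSendSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Buffers the message inside the current transaction. */
    void send(String message) {
        pending.add(message);
    }

    /** Makes every buffered message visible, then starts a new transaction. */
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Drops every buffered message. */
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();
        session.send("DEMO");
        System.out.println("before commit: " + session.delivered());
        session.commit();
        System.out.println("after commit: " + session.delivered());
    }
}
```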
8.3 Execution Output
We ran ActiveMQMessageProducerTest to send messages to three queues and three topics. You can verify the results in the AMQ web console. There is one pending message in each of the three queues: test.group1.queue1, test.group1.queue2, and test.group2.queue1.
There is one message in each of the three topics: JCG.Mary.Topic, test.group1.topic, and VirtualTopic.Customer.Topic.
8.4 OnBoardNewCustomerApp
OnBoardNewCustomerApp sends the new customer message to the VirtualTopic.Customer.Topic.
OnBoardNewCustomerApp
package jcg.demo.activemq.app;

import jcg.demo.activemq.ActiveMQMessageProducer;
import jcg.demo.jms.util.DataUtil;

public class OnBoardNewCustomerApp {

    public static void main(String[] args) {
        ActiveMQMessageProducer msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
        try {
            msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
            msgQueueSender.sendMessage("CUSTOMER");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Executing OnBoardNewCustomerApp sends a customer message to VirtualTopic.Customer.Topic. However, since there is no consumer yet, the AMQ broker will not forward any message to a virtual topic queue.
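The following sketch models that behavior in memory: a published message is copied to every consumer queue that exists at publish time, and a message published before any queue exists is copied nowhere. It is a simplified model with hypothetical names, not the broker's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative fan-out from a virtual topic to the consumer queues that already exist. */
class VirtualTopicFanOutSketch {
    private final Map<String, Deque<String>> consumerQueues = new LinkedHashMap<>();

    /** Creates the consumer queue if it does not exist yet. */
    void subscribe(String queueName) {
        consumerQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message to every consumer queue known at this moment. */
    void publish(String message) {
        for (Deque<String> queue : consumerQueues.values()) {
            queue.add(message);
        }
    }

    /** Number of messages waiting in the given queue (0 if the queue is unknown). */
    int pending(String queueName) {
        Deque<String> queue = consumerQueues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        VirtualTopicFanOutSketch topic = new VirtualTopicFanOutSketch();
        topic.publish("customer 1"); // no consumer queue yet, so nothing is stored
        topic.subscribe("Consumer.Billing.VirtualTopic.Customer.Topic");
        topic.subscribe("Consumer.Support.VirtualTopic.Customer.Topic");
        topic.publish("customer 2"); // copied to both queues
        System.out.println(topic.pending("Consumer.Billing.VirtualTopic.Customer.Topic"));
        System.out.println(topic.pending("Consumer.Support.VirtualTopic.Customer.Topic"));
    }
}
```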
9. Consume Message Application
9.1 ActiveMQMessageConsumer
A message consumer that uses the AMQ Java API.
ActiveMQMessageConsumer
package jcg.demo.activemq;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * A simple message consumer which consumes the message from ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageConsumer implements MessageListener {

    private String activeMqBrokerUri;
    private String username;
    private String password;

    private boolean isDestinationTopic;
    private String destinationName;
    private String selector;
    private String clientId;

    public ActiveMQMessageConsumer(String activeMqBrokerUri, String username, String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);

        Connection connection = factory.createConnection();
        if (clientId != null) {
            // set the client ID once, on the connection, before it is used
            connection.setClientID(clientId);
        }

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        setConsumer(session);

        connection.start();
        System.out.println(Thread.currentThread().getName() + ": ActiveMQMessageConsumer Waiting for messages at "
                + destinationName);
    }

    private void setConsumer(Session session) throws JMSException {
        MessageConsumer consumer = null;
        if (isDestinationTopic) {
            Topic topic = session.createTopic(destinationName);

            if (selector == null) {
                consumer = session.createConsumer(topic);
            } else {
                consumer = session.createConsumer(topic, selector);
            }
        } else {
            Destination destination = session.createQueue(destinationName);

            if (selector == null) {
                consumer = session.createConsumer(destination);
            } else {
                consumer = session.createConsumer(destination, selector);
            }
        }

        consumer.setMessageListener(this);
    }

    @Override
    public void onMessage(Message message) {
        String msg;
        try {
            msg = String.format(
                    "[%s]: ActiveMQMessageConsumer Received message from [ %s] - Headers: [ %s] Message: [ %s ]",
                    Thread.currentThread().getName(), destinationName, getPropertyNames(message),
                    ((TextMessage) message).getText());
            System.out.println(msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private String getPropertyNames(Message message) throws JMSException {
        String props = "";
        // getPropertyNames() returns a raw Enumeration of String keys
        @SuppressWarnings("unchecked")
        Enumeration<String> properties = message.getPropertyNames();
        while (properties.hasMoreElements()) {
            String propKey = properties.nextElement();
            props += propKey + "=" + message.getStringProperty(propKey) + " ";
        }
        return props;
    }

    public void setSelector(String selector) {
        this.selector = selector;
    }

    public boolean isDestinationTopic() {
        return isDestinationTopic;
    }

    public String getDestinationName() {
        return destinationName;
    }

    public String getSelector() {
        return selector;
    }

    public String getClientId() {
        return clientId;
    }

    public void setDestinationTopic(boolean isDestinationTopic) {
        this.isDestinationTopic = isDestinationTopic;
    }

    public void setDestinationName(String destinationName) {
        this.destinationName = destinationName;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
- The class declaration: create ActiveMQMessageConsumer by implementing javax.jms.MessageListener
- run(): sets the connection clientID when one is provided
- Consumer setup, topic branch: creates a topic, then a message consumer from the topic with or without a selector
- Consumer setup, queue branch: creates a queue, then a message consumer from the queue with or without a selector
- Consumer setup, last statement: registers the message listener
- onMessage(): overrides the listener callback to print each received message
9.2 ActiveMQMessageConsumerMainApp
Create ActiveMQMessageConsumerMainApp to consume from various destinations.
ActiveMQMessageConsumerMainApp
package jcg.demo.activemq.app;

import javax.jms.JMSException;

import jcg.demo.activemq.ActiveMQMessageConsumer;
import jcg.demo.jms.util.DataUtil;

public class ActiveMQMessageConsumerMainApp {

    public static void main(String[] args) {
        consumeCustomerVTCQueue();
        consumerVTCQueueWithSelector();
        consumeGroup1Topic();
        consumeAllGroup2();
        consume_queue_with_prefetchsize();
    }

    private static void consumeCustomerVTCQueue() {
        // the message in the topic before this subscriber starts will not be
        // picked up.
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("Consumer.zheng." + DataUtil.CUSTOMER_VTC_TOPIC);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumerVTCQueueWithSelector() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("VTC.DZONE." + DataUtil.TEST_VTC_TOPIC_SELECTOR);
        queueMsgListener.setSelector("action='DZONE'");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumeGroup1Topic() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_TOPIC);
        queueMsgListener.setDestinationTopic(true);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consumeAllGroup2() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("*.group2.*");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void exclusive_queue_Consumer() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP2_QUEUE_2 + "?consumer.exclusive=true");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static void consume_queue_with_prefetchsize() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_QUEUE_2 + "?consumer.prefetchSize=10");

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
- consumeCustomerVTCQueue: consumes from the virtual topic queue Consumer.zheng.VirtualTopic.Customer.Topic
- consumerVTCQueueWithSelector: consumes from the virtual topic queue VTC.DZONE.JCG.Mary.Topic with the message selector set to action='DZONE'
- consumeGroup1Topic: consumes from the topic test.group1.topic
- consumeAllGroup2: consumes from any queue whose name matches *.group2.*
- exclusive_queue_Consumer: sets an exclusive message consumer. If the exclusive consumer goes down, the broker fails over and picks another consumer to continue
- consume_queue_with_prefetchsize: sets the prefetch size for the consumer
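The exclusive and prefetch settings above are passed as options appended to the destination name after a ?. ActiveMQ parses these options itself; the sketch below only illustrates how such a name splits into the physical destination and its options. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for destination names such as "queue?consumer.prefetchSize=10". */
class DestinationOptionsSketch {

    /** Returns the part of the destination name before the '?'. */
    static String physicalName(String destination) {
        int mark = destination.indexOf('?');
        return mark < 0 ? destination : destination.substring(0, mark);
    }

    /** Returns the key=value options after the '?', in declaration order. */
    static Map<String, String> options(String destination) {
        Map<String, String> result = new LinkedHashMap<>();
        int mark = destination.indexOf('?');
        if (mark < 0) {
            return result;
        }
        for (String pair : destination.substring(mark + 1).split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate a trailing '?' or '&'
            }
            String[] keyValue = pair.split("=", 2);
            result.put(keyValue[0], keyValue.length > 1 ? keyValue[1] : "");
        }
        return result;
    }

    public static void main(String[] args) {
        String destination = "test.group1.queue2?consumer.prefetchSize=10";
        System.out.println(physicalName(destination));
        System.out.println(options(destination));
    }
}
```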
9.3 Execution Output
Now start the ActiveMQMessageConsumerMainApp. Here is the application output:
ActiveMQMessageConsumerMainApp Output
main: ActiveMQMessageConsumer Waiting for messages at Consumer.zheng.VirtualTopic.Customer.Topic
main: ActiveMQMessageConsumer Waiting for messages at VTC.DZONE.JCG.Mary.Topic
main: ActiveMQMessageConsumer Waiting for messages at test.group1.topic
main: ActiveMQMessageConsumer Waiting for messages at *.group2.*
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ *.group2.*] - Headers: [ action=JCG actionId=40 ] Message: [ {"type":"NEWCUSTOMER","customerId":79} ]
main: ActiveMQMessageConsumer Waiting for messages at test.group1.queue2?consumer.prefetchSize=10
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ test.group1.queue2?consumer.prefetchSize=10] - Headers: [ action=DEMO actionId=84 ] Message: [ {"type":"NEWCUSTOMER","customerId":28} ]
Now execute OnBoardNewCustomerApp a couple of times. You will see two lines printed in the running consumer application's console, as shown in the output below.
ActiveMQMessageConsumerMainApp Output Continue
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=15 ] Message: [ {"type":"NEWCUSTOMER","customerId":51} ]
[ActiveMQ Session Task-2]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=75 ] Message: [ {"type":"NEWCUSTOMER","customerId":73} ]
Always verify and confirm via the AMQ web console.
10. Integration with Spring JMS
Spring JMS provides a JMS integration framework that simplifies the use of the JMS API.
10.1 Add Spring JMS dependency
Add the Spring JMS library to the project's pom.xml.
pom.xml
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
10.2 Configure Spring Beans
Add Spring JMS Beans to the context.
JmsConfig
package jcg.demo.spring.jms.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;

import jcg.demo.spring.jms.component.JmsExceptionListener;

@Configuration
@EnableJms
@ComponentScan(basePackages = "jcg.demo.spring.jms.component, jcg.demo.spring.service")
public class JmsConfig {

    private String concurrency = "1-10";
    private String brokerURI = "tcp://localhost:61616";

    @Autowired
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(JmsExceptionListener jmsExceptionListener) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(jmsConnectionFactory(jmsExceptionListener));
        factory.setDestinationResolver(destinationResolver());
        factory.setConcurrency(concurrency);
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean
    @Autowired
    public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) {
        return createJmsConnectionFactory(brokerURI, jmsExceptionListener);
    }

    private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI);
        activeMQConnectionFactory.setExceptionListener(jmsExceptionListener);

        // cache sessions and producers on top of the ActiveMQ factory
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        return connectionFactory;
    }

    @Bean(name = "jmsQueueTemplate")
    @Autowired
    public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) {
        return new JmsTemplate(jmsConnectionFactory);
    }

    @Bean(name = "jmsTopicTemplate")
    @Autowired
    public JmsTemplate createJmsTopicTemplate(ConnectionFactory jmsConnectionFactory) {
        JmsTemplate template = new JmsTemplate(jmsConnectionFactory);
        template.setPubSubDomain(true);
        return template;
    }

    @Bean
    public DestinationResolver destinationResolver() {
        return new DynamicDestinationResolver();
    }
}
As you can see here, the order in which these beans are created is managed by Spring dependency injection.
10.3 MessageSender
A class that sends messages using the Spring JMS framework.
MessageSender
package jcg.demo.spring.jms.component;

import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private JmsTemplate jmsTopicTemplate;

    public void postToQueue(final String queueName, final String message) {
        MessageCreator messageCreator = new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };

        jmsQueueTemplate.send(queueName, messageCreator);
    }

    public void postToQueue(final String queueName, Map<String, String> headers, final String message) {
        jmsQueueTemplate.send(queueName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                // copy every header entry into a JMS string property
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }

    public void postToTopic(final String topicName, Map<String, String> headers, final String message) {
        jmsTopicTemplate.send(topicName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                // copy every header entry into a JMS string property
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }
}
As you can see here, the MessageSender is simpler than the ActiveMQMessageProducer created in step 8.1.
10.4 BillingAppListener
A listener that listens for new customer events and integrates with the billing system.
BillingAppListener
package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.BillingService;
import jcg.demo.spring.service.MessageTransformer;

@Component
public class BillingAppListener {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private BillingService billingService;

    @Autowired
    private MessageTransformer msgTransformer;

    private String queueName = "Consumer.Billing." + DataUtil.CUSTOMER_VTC_TOPIC;

    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": BillingAppListener receiveMessage.");

        Destination destination = new ActiveMQQueue(queueName);
        // blocks until a message arrives on the virtual topic queue
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);

        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return billingService.handleNewCustomer(customerEvt);
    }
}
As you can see here, this class is simpler than the ActiveMQMessageConsumer created in step 9.1.
10.5 SupportAppListener
A listener that listens for new customer events and integrates with the support system.
SupportAppListener
package jcg.demo.spring.jms.component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.MessageTransformer;
import jcg.demo.spring.service.SupportService;

@Component
public class SupportAppListener {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    @Autowired
    private SupportService supportService;

    @Autowired
    private MessageTransformer msgTransformer;

    private String queueName = "Consumer.Support." + DataUtil.CUSTOMER_VTC_TOPIC;

    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": SupportAppListener receiveMessage.");

        Destination destination = new ActiveMQQueue(queueName);
        // blocks until a message arrives on the virtual topic queue
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);

        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return supportService.handleNewCustomer(customerEvt);
    }
}
10.6 ConfigBillingForNewCustomerApp
Configure a Spring context to consume the new customer events and integrate with the billing system.
ConfigBillingForNewCustomerApp
package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.BillingAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigBillingForNewCustomerApp {

    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();

        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigBillingForNewCustomerApp.class);

        try {
            BillingAppListener billingAppListener = (BillingAppListener) context.getBean("billingAppListener");

            System.out.println("ConfigBillingForewCustomerApp receives " + billingAppListener.receiveMessage());
        } finally {
            context.close();
        }
    }
}
10.7 ConfigSupportForNewCustomerApp
Configure a Spring context to consume the new customer events and integrate with the support system.
ConfigSupportForNewCustomerApp
package jcg.demo.spring.jms.app;

import java.net.URISyntaxException;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;

import com.google.gson.Gson;

import jcg.demo.spring.jms.component.SupportAppListener;
import jcg.demo.spring.jms.config.JmsConfig;

@Configuration
public class ConfigSupportForNewCustomerApp {

    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();

        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigSupportForNewCustomerApp.class);

        try {
            SupportAppListener supportAppListener = (SupportAppListener) context.getBean("supportAppListener");
            System.out.println("supportAppListener receives " + supportAppListener.receiveMessage());
        } finally {
            context.close();
        }
    }
}
10.8 Run as Distributed Systems
So far, we have built one Java JMS application, OnBoardNewCustomerApp, and two Spring JMS applications: ConfigBillingForNewCustomerApp and ConfigSupportForNewCustomerApp. Now it is time to run them together so that the customer onboarding process integrates with both the billing and support systems. Start the two Spring JMS applications first.
ConfigBillingForNewCustomerApp Output
main: ConfigBillingForNewCustomerApp receiveMessage.
ConfigSupportForNewCustomerApp Output
main: ConfigSupportForNewCustomerAppreceiveMessage.
Execute OnBoardNewCustomerApp. You will see that both consumers received the customer message and processed it.
ConfigBillingForNewCustomerApp Output (continued)
ConfigBillingForewCustomerApp receives BillingService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)
ConfigSupportForNewCustomerApp Output (continued)
ConfigSupportForNewCustomerApp receives SupportService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)
You just witnessed a working distributed system.
11. Integrating with Tomcat
11.1 Configure Tomcat Resource
Add the AMQ connection factory as a resource in Tomcat's context.xml, as shown below.
context.xml
<Resource name="jms/ConnectionFactory"
          global="jms/ConnectionFactory"
          auth="Container"
          type="org.apache.activemq.ActiveMQConnectionFactory"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          brokerURL="tcp://localhost:61616"
          userName="admin"
          password="admin"
          useEmbeddedBroker="false"/>
11.2 Look up JNDI Resource
Use jndiContext.lookup to look up the ActiveMQConnectionFactory from the JNDI resource.
JmsConfig
private ConnectionFactory createJmsConnectionFactory(String jndiName, JMSExceptionListener exceptionListener) {
    CachingConnectionFactory connectionFactory = null;
    try {
        Context jndiContext = new InitialContext();
        Context envContext = (Context) jndiContext.lookup("java:comp/env");
        ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) envContext.lookup(jndiName);
        connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        connectionFactory.setExceptionListener(exceptionListener);
    } catch (NamingException e) {
        String msg = String.format("Unable to get JMS container with name %s ", jndiName);
        throw new RuntimeException(msg, e);
    }
    return connectionFactory;
}
12. Common Problems
There are three common problems when developing an ActiveMQ application.
12.1 Slow Consumer Application
When the AMQ console shows a growing number of pending messages, the consumer application is processing messages more slowly than the producer publishes them. There are several ways to address this issue:
- Throttle the publishers so that they publish at roughly the rate at which the consumers can process the messages
- Have the publishers spread the messages across different destinations so that each consumer has fewer messages to handle
- Speed up the consumers by moving any long-running processing off the main thread onto an asynchronous thread
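The third option can be sketched with plain JDK classes. This is a minimal illustration, not code from the tutorial's source: AsyncMessageHandler, its constructor parameters, and the idea of calling onMessage(String) from a JMS listener are all assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: hands each message body to a worker pool so the listener thread returns quickly. */
class AsyncMessageHandler {
    private final ExecutorService workers;
    private final Consumer<String> slowProcessor;
    private final AtomicInteger processed = new AtomicInteger();

    AsyncMessageHandler(int threads, int queueCapacity, Consumer<String> slowProcessor) {
        this.slowProcessor = slowProcessor;
        // Bounded queue plus CallerRunsPolicy: when the workers fall behind,
        // the calling (listener) thread runs the task itself, which slows
        // consumption down instead of letting the in-memory backlog grow.
        this.workers = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Intended to be called from the message listener callback with the message text. */
    void onMessage(String body) {
        workers.execute(() -> {
            slowProcessor.accept(body);
            processed.incrementAndGet();
        });
    }

    int processedCount() {
        return processed.get();
    }

    /** Stops accepting work and waits for queued tasks; returns false on timeout or interruption. */
    boolean shutdown(long timeoutSeconds) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class AsyncMessageHandlerDemo {
    public static void main(String[] args) {
        // The 50 ms sleep stands in for a slow billing or support call.
        AsyncMessageHandler handler = new AsyncMessageHandler(4, 100, body -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 20; i++) {
            handler.onMessage("customer-" + i);
        }
        handler.shutdown(10);
        System.out.println("processed " + handler.processedCount());
    }
}
```

One trade-off to keep in mind: with automatic acknowledgement, the message is acknowledged once the listener returns, so work that is still queued in the pool can be lost if the application crashes. Whether that is acceptable depends on the use case.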
12.2 ActiveMQ Sends Unwanted Messages to Virtual Topic Queue
A bug in the AMQ broker causes it to send unwanted messages to the virtual topic queue when a selector is defined. Our solution is to let the applications handle the selection themselves by setting selectorAware to false.
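Handling the selection in the application could look roughly like the sketch below. It models message properties as a plain Map so that it runs without a broker; in a real listener the value would typically be read with message.getStringProperty(...). The class name EventTypeFilter and the property name eventType are hypothetical, while NEWCUSTOMER is the event type shown in the output above.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical application-side filter that stands in for a broker-side selector. */
class EventTypeFilter implements Predicate<Map<String, Object>> {
    private final String propertyName;
    private final String expectedValue;

    EventTypeFilter(String propertyName, String expectedValue) {
        this.propertyName = propertyName;
        this.expectedValue = expectedValue;
    }

    /** Accepts a message only if the named property equals the expected value. */
    @Override
    public boolean test(Map<String, Object> properties) {
        return expectedValue.equals(properties.get(propertyName));
    }
}

class EventTypeFilterDemo {
    public static void main(String[] args) {
        EventTypeFilter filter = new EventTypeFilter("eventType", "NEWCUSTOMER");
        Map<String, Object> wanted = Map.of("eventType", "NEWCUSTOMER");
        Map<String, Object> unwanted = Map.of("eventType", "SOMETHING_ELSE");
        // The consumer processes only the messages the filter accepts and skips the rest.
        System.out.println("wanted accepted: " + filter.test(wanted));
        System.out.println("unwanted accepted: " + filter.test(unwanted));
    }
}
```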
12.3 Exception Handler
Some applications redeliver a message back to its destination when they encounter an exception. If processing fails again, the message keeps coming back and may jam up the destination. A better solution is to have a separate exception handler deal with any exceptions.
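A minimal sketch of that idea follows, using plain JDK types so it runs without a broker. GuardedProcessor, FailureHandler, and the retry limit are names and choices invented for this example; in a real application the failure handler might log the message or publish it to a dedicated error queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical wrapper: retries a handler a fixed number of times, then passes the failure to a separate handler. */
class GuardedProcessor {
    /** Receives messages that could not be processed, together with the last exception. */
    interface FailureHandler {
        void handle(String body, Exception cause);
    }

    private final Consumer<String> delegate;
    private final FailureHandler failureHandler;
    private final int maxAttempts;

    GuardedProcessor(Consumer<String> delegate, FailureHandler failureHandler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.delegate = delegate;
        this.failureHandler = failureHandler;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the delegate succeeded, false if the failure handler took over. */
    boolean process(String body) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delegate.accept(body);
                return true;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Give up and hand the message over instead of putting it back on the destination.
        failureHandler.handle(body, last);
        return false;
    }
}

class GuardedProcessorDemo {
    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        GuardedProcessor processor = new GuardedProcessor(
                body -> {
                    if (body.contains("broken")) {
                        throw new IllegalStateException("cannot process " + body);
                    }
                },
                (body, cause) -> parked.add(body),
                3);
        processor.process("customer-41");
        processor.process("broken-customer");
        System.out.println("parked messages: " + parked);
    }
}
```

Because the listener returns normally in both cases, the failed message does not go back to the destination and cannot block the messages behind it.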
13. Summary
In this tutorial, we demonstrated:
- how to install and configure the AMQ server
- how to build AMQ applications via ActiveMQ library
- how to build AMQ applications with Spring JMS framework
- how to integrate with Tomcat web container
We also described three common problems when developing an AMQ application.
14. References
15. Download the Source Code
This example builds several Java applications to send and receive messages via the AMQ broker.
You can download the full source code of this example here: Apache ActiveMQ Best Practices Tutorial