Apache ActiveMQ Best Practices Tutorial

Apache ActiveMQ is an open source messaging server written in Java that implements the JMS 1.1 specification. In this tutorial, you will learn how to develop a few Java applications that use ActiveMQ to send and receive messages to and from destinations. If you already know how to install and configure ActiveMQ, you can skip the first four chapters.

1. Introduction

Apache ActiveMQ (AMQ) is a JMS 1.1 implementation from the Apache Software Foundation.

AMQ is a message broker that routes messages from the sender to the receiver. Message brokers are the building blocks of a message-oriented middleware (MOM) architecture.

AMQ is a popular open source messaging and Integration Patterns server. It provides communication between applications and fulfills both notification and interoperation needs among them.

2. Install an Apache ActiveMQ Server

Most business applications treat AMQ as an infrastructure resource. In this tutorial, we install AMQ as a standalone server. Follow the ActiveMQ installation instructions; we installed AMQ 5.15.0.

3. Start the Apache ActiveMQ Server

Navigate to the \apache-activemq-5.15.0\bin\win64 directory and run activemq.bat to start the server.

The output below demonstrates that the server started successfully.

server.log

jvm 1    |  INFO | Apache ActiveMQ 5.15.0 (localhost, ID:SL2LS431841-57319-1512184574307-0:1) started
jvm 1    |  INFO | For help or more information please see: http://activemq.apache.org

4. Monitor the Apache ActiveMQ Server

AMQ provides a web console application for monitoring and administration. After the AMQ server starts, follow the steps below to launch the web console.

  • Open a Browser: Chrome, IE, Firefox, etc
  • Enter the URL: http://localhost:8161/admin
  • Enter admin/admin as username/password

Here you should see the “Welcome” page. Users can send, read, and delete messages via the web console.

5. Business Use Cases

Company X provides services to customers. Each new customer is set up in the billing and support systems.

In this tutorial, we demonstrate how to build a customer on-boarding process, a billing system, and a support application, and how to integrate them via AMQ:

  • OnBoardNewCustomerApp which sets up new customers and sends the new customer events to ActiveMQ customer topic
  • ConfigBillingForNewCustomerApp which listens to the new customer events from the virtual topic and configures it into the billing application
  • ConfigSupportForNewCustomerApp which listens to the new customer events from the virtual topic and configures it into the support application

6. Define JMS Message

6.1 Message Destination

For this business use case, both the billing and support systems are notified when a new customer joins. We choose the publish/subscribe message pattern to build OnBoardNewCustomerApp, which publishes the customer event to the AMQ broker topic VirtualTopic.Customer.Topic.

AMQ reserves three special characters for destination names:

  • . is used to separate names in a path
  • * is used to match any name in a path
  • > is used to recursively match any destination starting from this name
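
The three rules above can be sketched as a small matcher. This is only an illustration of the wildcard semantics, not the broker's own implementation; the class name DestinationWildcardSketch and its matches method are hypothetical, and the broker may treat edge cases differently.

```java
// Illustrative matcher for ActiveMQ-style destination wildcards.
// Hypothetical helper, not part of the ActiveMQ API.
final class DestinationWildcardSketch {

    private DestinationWildcardSketch() {
    }

    /** Returns true if the dot-separated destination name matches the pattern. */
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\."); // "." separates the names in a path
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // ">" matches everything from this position onward
            }
            if (i >= n.length) {
                return false; // the pattern has more segments than the name
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false; // a literal segment does not match
            }
        }
        // Without ">", pattern and name must have the same number of segments.
        return p.length == n.length;
    }
}
```

For example, matches("*.group2.*", "test.group2.queue1") and matches("VirtualTopic.>", "VirtualTopic.Customer.Topic") both return true, while matches("VirtualTopic.*", "VirtualTopic.Customer.Topic") returns false because * stands for exactly one name.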

6.2 Message Header

The message header provides metadata about the message that is used by both the clients and the AMQ brokers. JMS predefines a set of message headers and properties. Two examples:

  • JMSXGroupID: set this if you want a group of messages to always go to the same consumer
  • JMSCorrelationID: use this to link related messages together

6.3 Message Body

The message body is the actual message that integrates the applications together. In this example, the message body is the JSON representation of a CustomerEvent.

CustomerEvent

package jcg.demo.model;
 
public class CustomerEvent {
    private String type;
    private Integer customerId;
 
    public CustomerEvent(String type, Integer customerId) {
        this.type = type;
        this.customerId = customerId;
    }
 
    public String getType() {
        return type;
    }
 
    public Integer getCustomerId() {
        return customerId;
    }
 
    public String toString() {
        return "CustomerEvent: type(" + type + "), customerId(" + customerId + ")";
    }
 
    public String getCustomerDetailUri() {
        return "https://localhost:8080/support/customer/" + customerId;
    }
}
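
To make the wire format concrete, here is a minimal sketch of the JSON shape such an event takes, with property names following the field names type and customerId. The tutorial itself serializes with Gson; the hand-rolled CustomerEventJsonSketch class below is hypothetical, performs no string escaping, and exists only to show the shape of the body.

```java
// Hypothetical helper that shows the JSON shape of a CustomerEvent body.
// Illustration only: it does not escape special characters in "type".
final class CustomerEventJsonSketch {

    private CustomerEventJsonSketch() {
    }

    /** Builds a JSON object with the properties "type" and "customerId". */
    static String toJson(String type, int customerId) {
        return "{\"type\":\"" + type + "\",\"customerId\":" + customerId + "}";
    }
}
```

Calling toJson("NEWCUSTOMER", 79) yields {"type":"NEWCUSTOMER","customerId":79}.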

6.4 Configure Virtual Topic

The AMQ server installation comes with a ready-to-use configuration file. Add the following to activemq.xml so that the AMQ broker forwards messages published to any topic matching VirtualTopic.> (for example, VirtualTopic.Customer.Topic) to the virtual topic queues whose names start with Consumer.*.

activemq.xml

<destinationInterceptors>
        <virtualDestinationInterceptor>
             <virtualDestinations>
                 <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
                 <virtualTopic name="JCG.>" prefix="VTC.*." selectorAware="true"/>
             </virtualDestinations>
       </virtualDestinationInterceptor>
</destinationInterceptors>
  • line 4: Configure the virtual topic VirtualTopic.> with selectorAware disabled
  • line 5: Configure the virtual topic JCG.> with selectorAware enabled

Restart the AMQ server after the configuration file updates.
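
With this configuration, each consumer reads from its own queue, named by replacing the * in the prefix with the consumer's name and appending the topic name. The sketch below illustrates that naming rule only; VirtualTopicNamesSketch and consumerQueue are hypothetical names, not part of the ActiveMQ API.

```java
// Hypothetical helper that derives the queue name a virtual topic consumer
// subscribes to. Illustration of the naming convention only.
final class VirtualTopicNamesSketch {

    private VirtualTopicNamesSketch() {
    }

    /**
     * @param prefixPattern the prefix from activemq.xml, for example "Consumer.*."
     * @param consumerName  the name that takes the place of "*" in the prefix
     * @param topicName     the virtual topic the producer publishes to
     */
    static String consumerQueue(String prefixPattern, String consumerName, String topicName) {
        return prefixPattern.replace("*", consumerName) + topicName;
    }
}
```

For example, consumerQueue("Consumer.*.", "zheng", "VirtualTopic.Customer.Topic") gives Consumer.zheng.VirtualTopic.Customer.Topic, and consumerQueue("VTC.*.", "DZONE", "JCG.Mary.Topic") gives VTC.DZONE.JCG.Mary.Topic.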

7. Apache ActiveMQ Java Client Library

Add ActiveMQ Java library to the project pom.xml.

pom.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.0</version>
</dependency>

8. Publish Message Application

In this example, you will see how to create ActiveMQMessageProducer to send messages.

8.1 ActiveMQMessageProducer

A Java class that wraps the ActiveMQ Java API to send messages.

ActiveMQMessageProducer

package jcg.demo.activemq;
 
import java.util.Random;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
 
import com.google.gson.Gson;
 
import jcg.demo.jms.util.DataUtil;
 
/**
 * A simple message producer which sends the message to the ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageProducer {
 
    private static final String ACTION_ID_HEADER = "actionId";
    private static final String ACTION_HEADER = "action";
 
    private ConnectionFactory connFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageProducer msgProducer;
 
    private String activeMqBrokerUri;
    private String username;
    private String password;
 
    public ActiveMQMessageProducer(final String activeMqBrokerUri, final String username, final String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }
 
    public void setup(final boolean transacted, final boolean isDestinationTopic, final String destinationName)
            throws JMSException {
        setConnectionFactory(activeMqBrokerUri, username, password);
        setConnection();
        setSession(transacted);
        setDdestination(isDestinationTopic, destinationName);
        setMsgProducer();
    }
 
    public void close() throws JMSException {
        if (msgProducer != null) {
            msgProducer.close();
            msgProducer = null;
        }
 
        if (session != null) {
            session.close();
            session = null;
        }
        if (connection != null) {
            connection.close();
            connection = null;
        }
 
    }
 
    public void commit(final boolean transacted) throws JMSException {
        if (transacted) {
            session.commit();
        }
    }
 
    public void sendMessage(final String actionVal) throws JMSException {
        TextMessage textMessage = buildTextMessageWithProperty(actionVal);
        msgProducer.send(destination, textMessage);
        // msgProducer.send(textMessage, DeliveryMode.NON_PERSISTENT, 0, 0);
 
    }
 
    private TextMessage buildTextMessageWithProperty(final String action) throws JMSException {
        Gson gson = new Gson();
        String eventMsg = gson.toJson(DataUtil.buildDummyCustomerEvent());
        TextMessage textMessage = session.createTextMessage(eventMsg);
 
        Random rand = new Random();
        int value = rand.nextInt(100);
        textMessage.setStringProperty(ACTION_HEADER, action);
        textMessage.setStringProperty(ACTION_ID_HEADER, String.valueOf(value));
 
        return textMessage;
    }
 
    private void setDdestination(final boolean isDestinationTopic, final String destinationName) throws JMSException {
        if (isDestinationTopic) {
            destination = session.createTopic(destinationName);
        } else {
            destination = session.createQueue(destinationName);
        }
    }
 
    private void setMsgProducer() throws JMSException {
        msgProducer = session.createProducer(destination);
 
    }
 
    private void setSession(final boolean transacted) throws JMSException {
        // transacted=true for better performance to push message in batch mode
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
    }
 
    private void setConnection() throws JMSException {
        connection = connFactory.createConnection();
        connection.start();
    }
 
    private void setConnectionFactory(final String activeMqBrokerUri, final String username, final String password) {
        connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
 
        ((ActiveMQConnectionFactory) connFactory).setUseAsyncSend(true);
 
        RedeliveryPolicy policy = ((ActiveMQConnectionFactory) connFactory).getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(500);
        policy.setBackOffMultiplier(2);
        policy.setUseExponentialBackOff(true);
        policy.setMaximumRedeliveries(2);
    }
 
}
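  • line 50-54: setup() creates the connection factory, connection, session, destination, and producer in the required order. Spring JMS dependency injection takes care of this for you.
  • line 57-72: close() closes the producer, session, and connection. Spring JMS takes care of this for you.
  • line 83: The commented-out send call sets the delivery mode of the message. Messages are persistent by default; sending with DeliveryMode.NON_PERSISTENT trades durability for better performance.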
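
The redelivery settings in setConnectionFactory (initial delay 500 ms, back-off multiplier 2, at most 2 redeliveries) can be read as a delay schedule. The sketch below is a simplified model that assumes each delay is the previous one times the multiplier; it is not ActiveMQ's RedeliveryPolicy code, which also takes other settings such as a maximum delay and collision avoidance into account. RedeliveryBackoffSketch is a hypothetical name.

```java
// Simplified model of an exponential back-off schedule.
// Hypothetical helper; not how ActiveMQ's RedeliveryPolicy is implemented.
final class RedeliveryBackoffSketch {

    private RedeliveryBackoffSketch() {
    }

    /** Returns the delay in milliseconds before each redelivery attempt. */
    static long[] delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        long[] result = new long[maxRedeliveries];
        long delay = initialDelayMs;
        for (int i = 0; i < maxRedeliveries; i++) {
            result[i] = delay;
            delay = (long) (delay * multiplier); // grow the delay for the next attempt
        }
        return result;
    }
}
```

Under this model, delays(500, 2.0, 2) gives 500 ms and then 1000 ms, after which no further redelivery is attempted.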

8.2 ActiveMQMessageProducerTest

This JUnit test sends messages to various destinations. It is a convenient way to send a message to a destination.

ActiveMQMessageProducerTest

package jcg.demo.activemq;
 
import javax.jms.JMSException;
 
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
 
import jcg.demo.jms.util.DataUtil;
 
public class ActiveMQMessageProducerTest {
 
    private ActiveMQMessageProducer msgQueueSender;
 
    @Before
    public void setup() {
        msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
    }
 
    @After
    public void cleanup() throws JMSException {
        msgQueueSender.close();
    }
 
    @Test
    public void send_msg_to_no_transaction_Queue() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP1_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }
 
    @Test
    public void send_msg_to_Group2_Queue1() throws JMSException {
        msgQueueSender.setup(false, false, DataUtil.TEST_GROUP2_QUEUE_1);
        msgQueueSender.sendMessage("JCG");
    }
 
    @Test
    public void send_msg_to_transaction_Group1_Queue2() throws JMSException {
        msgQueueSender.setup(true, false, DataUtil.TEST_GROUP1_QUEUE_2);
        msgQueueSender.sendMessage("DEMO");
        msgQueueSender.commit(true);
    }
 
    @Test
    public void send_msg_to_no_transaction_Group1_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_GROUP1_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }
 
    @Test
    public void send_msg_to_Virtual_Topic() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
        msgQueueSender.sendMessage("MZHENG");
    }
 
    @Test
    public void send_msg_to_Virtual_Topic_WithSelector() throws JMSException {
        msgQueueSender.setup(false, true, DataUtil.TEST_VTC_TOPIC_SELECTOR);
        msgQueueSender.sendMessage("DZONE");
    }
 
}
  • line 27-28: Send to queue test.group1.queue1
  • line 33-34: Send to queue test.group2.queue1
  • line 39-41: Send to queue test.group1.queue2
  • line 46-47: Send to normal topic test.group1.topic
  • line 52-53: Send to selector unaware topic VirtualTopic.Customer.Topic
  • line 58-59: Send to selector aware topic JCG.Mary.Topic

8.3 Execution Output

We ran ActiveMQMessageProducerTest to send messages to three queues and three topics. You can verify the result in the AMQ web console. There is one pending message in each of the three queues: test.group1.queue1, test.group1.queue2, and test.group2.queue1.

There is one message in each of the three topics: JCG.Mary.Topic, test.group1.topic, and VirtualTopic.Customer.Topic.

8.4 OnBoardNewCustomerApp

OnBoardNewCustomerApp sends the new customer message to the VirtualTopic.Customer.Topic.

OnBoardNewCustomerApp

package jcg.demo.activemq.app;
 
import jcg.demo.activemq.ActiveMQMessageProducer;
import jcg.demo.jms.util.DataUtil;
 
public class OnBoardNewCustomerApp {
    public static void main(String[] args) {
        ActiveMQMessageProducer msgQueueSender = new ActiveMQMessageProducer("tcp://localhost:61616", "admin", "admin");
        try {
            msgQueueSender.setup(false, true, DataUtil.CUSTOMER_VTC_TOPIC);
            msgQueueSender.sendMessage("CUSTOMER");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
}

Running OnBoardNewCustomerApp sends a customer message to VirtualTopic.Customer.Topic. However, since no consumer has subscribed yet, the AMQ broker does not forward the message to any virtual topic queue.

9. Consume Message Application

9.1 ActiveMQMessageConsumer

A message consumer that uses the AMQ Java API.

ActiveMQMessageConsumer

package jcg.demo.activemq;
 
import java.util.Enumeration;
 
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
/**
 * A simple message consumer which consumes the message from ActiveMQ Broker.
 *
 * @author Mary.Zheng
 *
 */
public class ActiveMQMessageConsumer implements MessageListener {
 
    private String activeMqBrokerUri;
    private String username;
    private String password;
 
    private boolean isDestinationTopic;
    private String destinationName;
    private String selector;
    private String clientId;
 
    public ActiveMQMessageConsumer(String activeMqBrokerUri, String username, String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }
 
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
        if (clientId != null) {
            factory.setClientID(clientId);
        }
        Connection connection = factory.createConnection();
        if (clientId != null) {
            connection.setClientID(clientId);
        }
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
        setComsumer(session);
 
        connection.start();
        System.out.println(Thread.currentThread().getName() + ": ActiveMQMessageConsumer Waiting for messages at "
                + destinationName);
    }
 
    private void setComsumer(Session session) throws JMSException {
        MessageConsumer consumer = null;
        if (isDestinationTopic) {
            Topic topic = session.createTopic(destinationName);
 
            if (selector == null) {
                consumer = session.createConsumer(topic);
            } else {
                consumer = session.createConsumer(topic, selector);
            }
        } else {
            Destination destination = session.createQueue(destinationName);
 
            if (selector == null) {
                consumer = session.createConsumer(destination);
            } else {
                consumer = session.createConsumer(destination, selector);
            }
        }
 
        consumer.setMessageListener(this);
    }
 
    @Override
    public void onMessage(Message message) {
 
        String msg;
        try {
            msg = String.format(
                    "[%s]: ActiveMQMessageConsumer Received message from [ %s] - Headers: [ %s] Message: [ %s ]",
                    Thread.currentThread().getName(), destinationName, getPropertyNames(message),
                    ((TextMessage) message).getText());
            System.out.println(msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
 
    }
 
    private String getPropertyNames(Message message) throws JMSException {
        String props = "";
        @SuppressWarnings("unchecked")
        Enumeration<String> properties = message.getPropertyNames();
        while (properties.hasMoreElements()) {
            String propKey = properties.nextElement();
            props += propKey + "=" + message.getStringProperty(propKey) + " ";
        }
        return props;
    }
 
    public void setSelector(String selector) {
        this.selector = selector;
    }
 
    public boolean isDestinationTopic() {
        return isDestinationTopic;
    }
 
    public String getDestinationName() {
        return destinationName;
    }
 
    public String getSelector() {
        return selector;
    }
 
    public String getClientId() {
        return clientId;
    }
 
    public void setDestinationTopic(boolean isDestinationTopic) {
        this.isDestinationTopic = isDestinationTopic;
    }
 
    public void setDestinationName(String destinationName) {
        this.destinationName = destinationName;
    }
 
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
  • line 23: Create ActiveMQMessageConsumer by implementing javax.jms.MessageListener
  • line 44: Set connection clientID
  • line 62: Create a topic
  • line 65: Create message consumer from a topic without selector
  • line 67: Create message consumer from a topic with selector
  • line 70: Create a queue
  • line 73: Create message consumer from a queue without selector
  • line 75: Create message consumer from a queue with selector
  • line 79: Register message listener
  • line 83: Override the onMessage

9.2 ActiveMQMessageConsumerMainApp

Create ActiveMQMessageConsumerMainApp to consume from various destinations.

ActiveMQMessageConsumerMainApp

package jcg.demo.activemq.app;
 
import javax.jms.JMSException;
 
import jcg.demo.activemq.ActiveMQMessageConsumer;
import jcg.demo.jms.util.DataUtil;
 
public class ActiveMQMessageConsumerMainApp {
 
    public static void main(String[] args) {
 
        consumeCustomerVTCQueue();
        consumerVTCQueueWithSelector();
        consumeGroup1Topic();
        consumeAllGroup2();
        consume_queue_with_prefetchsize();
 
    }
 
    private static void consumeCustomerVTCQueue() {
        // the message in the topic before this subscriber starts will not be
        // picked up.
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("Consumer.zheng." + DataUtil.CUSTOMER_VTC_TOPIC);
 
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
    private static void consumerVTCQueueWithSelector() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("VTC.DZONE." + DataUtil.TEST_VTC_TOPIC_SELECTOR);
        queueMsgListener.setSelector("action='DZONE'");
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
    private static void consumeGroup1Topic() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_TOPIC);
        queueMsgListener.setDestinationTopic(true);
 
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
    private static void consumeAllGroup2() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName("*.group2.*");
 
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
    private static void exclusive_queue_Consumer() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP2_QUEUE_2 + "?consumer.exclusive=true");
 
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
    private static void consume_queue_with_prefetchsize() {
        ActiveMQMessageConsumer queueMsgListener = new ActiveMQMessageConsumer("tcp://localhost:61616", "admin",
                "admin");
        queueMsgListener.setDestinationName(DataUtil.TEST_GROUP1_QUEUE_2 + "?consumer.prefetchSize=10");
 
        try {
            queueMsgListener.run();
        } catch (JMSException e) {
 
            e.printStackTrace();
        }
    }
 
}
  • line 25: Consume from virtual topic queue Consumer.zheng.VirtualTopic.Customer.Topic
  • line 38-39: Consume from virtual topic queue VTC.DZONE.JCG.Mary.Topic with the message selector set to action='DZONE'
  • line 51: Consume from topic test.group1.topic
  • line 65: Consume from any queue whose name matches *.group2.*
  • line 78: Set an exclusive message consumer. If the exclusive consumer goes down, another consumer is picked to continue
  • line 91: Set preFetch size for the consumer
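
The exclusive and prefetch settings above are passed as destination options appended to the destination name in URL query style. As a rough sketch, the helper below builds such a name from a map of options; DestinationOptionsSketch and withOptions are hypothetical names, and the sketch assumes option values need no URL encoding.

```java
// Hypothetical helper that appends consumer options to a destination name,
// for example "test.group1.queue2?consumer.prefetchSize=10".
final class DestinationOptionsSketch {

    private DestinationOptionsSketch() {
    }

    /** Joins the options as key=value pairs separated by '&' after a '?'. */
    static String withOptions(String destinationName, java.util.Map<String, String> options) {
        if (options.isEmpty()) {
            return destinationName; // nothing to append
        }
        StringBuilder sb = new StringBuilder(destinationName).append('?');
        boolean first = true;
        for (java.util.Map.Entry<String, String> option : options.entrySet()) {
            if (!first) {
                sb.append('&');
            }
            sb.append(option.getKey()).append('=').append(option.getValue());
            first = false;
        }
        return sb.toString();
    }
}
```

Passing a map that contains consumer.prefetchSize=10 together with the name test.group1.queue2 produces test.group1.queue2?consumer.prefetchSize=10, the same string the consumer above builds by concatenation.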

9.3 Execution Output

Now start ActiveMQMessageConsumerMainApp. Here is the application output:

ActiveMQMessageConsumerMainApp Output

main: ActiveMQMessageConsumer Waiting for messages at Consumer.zheng.VirtualTopic.Customer.Topic
main: ActiveMQMessageConsumer Waiting for messages at VTC.DZONE.JCG.Mary.Topic
main: ActiveMQMessageConsumer Waiting for messages at test.group1.topic
main: ActiveMQMessageConsumer Waiting for messages at *.group2.*
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ *.group2.*] - Headers: [ action=JCG actionId=40 ] Message: [ {"type":"NEWCUSTOMER","customerId":79} ]
main: ActiveMQMessageConsumer Waiting for messages at test.group1.queue2?consumer.prefetchSize=10
[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ test.group1.queue2?consumer.prefetchSize=10] - Headers: [ action=DEMO actionId=84 ] Message: [ {"type":"NEWCUSTOMER","customerId":28} ]

Now execute OnBoardNewCustomerApp a couple of times. Two lines are printed to the console of the running consumer application, as shown in the output below.

ActiveMQMessageConsumerMainApp Output Continue

[ActiveMQ Session Task-1]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=15 ] Message: [ {"type":"NEWCUSTOMER","customerId":51} ]
[ActiveMQ Session Task-2]: ActiveMQMessageConsumer Received message from [ Consumer.zheng.VirtualTopic.Customer.Topic] - Headers: [ action=CUSTOMER actionId=75 ] Message: [ {"type":"NEWCUSTOMER","customerId":73} ]

Always verify and confirm via the AMQ web console.

10. Integration with Spring JMS

Spring JMS provides a JMS integration framework that simplifies the use of the JMS API.

10.1 Add Spring JMS dependency

Add Spring JMS library to the project pom.xml.

pom.xml

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.5.RELEASE</version>
</dependency>

10.2 Configure Spring Beans

Add Spring JMS Beans to the context.

JmsConfig

package jcg.demo.spring.jms.config;
 
import javax.jms.ConnectionFactory;
 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
 
import jcg.demo.spring.jms.component.JmsExceptionListener;
 
@Configuration
@EnableJms
@ComponentScan(basePackages = "jcg.demo.spring.jms.component, jcg.demo.spring.service")
public class JmsConfig {
 
    private String concurrency = "1-10";
    private String brokerURI = "tcp://localhost:61616";
 
    @Autowired
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(JmsExceptionListener jmsExceptionListener) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(jmsConnectionFactory(jmsExceptionListener));
        factory.setDestinationResolver(destinationResolver());
        factory.setConcurrency(concurrency);
        factory.setPubSubDomain(false);
        return factory;
    }
 
    @Bean
    @Autowired
    public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) {
        return createJmsConnectionFactory(brokerURI, jmsExceptionListener);
    }
 
    private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI);
        activeMQConnectionFactory.setExceptionListener(jmsExceptionListener);
 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        return connectionFactory;
    }
 
    @Bean(name = "jmsQueueTemplate")
    @Autowired
    public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) {
        return new JmsTemplate(jmsConnectionFactory);
    }
 
    @Bean(name = "jmsTopicTemplate")
    @Autowired
    public JmsTemplate createJmsTopicTemplate(ConnectionFactory jmsConnectionFactory) {
        JmsTemplate template = new JmsTemplate(jmsConnectionFactory);
        template.setPubSubDomain(true);
        return template;
    }
 
    @Bean
    public DestinationResolver destinationResolver() {
        return new DynamicDestinationResolver();
    }
 
}

As you can see, Spring dependency injection manages the order in which these beans are created.

10.3 MessageSender

A class to send messages based on Spring JMS framework.

MessageSender

package jcg.demo.spring.jms.component;
 
import java.util.Map;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
@Component
public class MessageSender {
 
    @Autowired
    private JmsTemplate jmsQueueTemplate;
 
    @Autowired
    private JmsTemplate jmsTopicTemplate;
 
    public void postToQueue(final String queueName, final String message) {
 
        MessageCreator messageCreator = new MessageCreator() {
 
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };
 
        jmsQueueTemplate.send(queueName, messageCreator);
     
    }
 
    public void postToQueue(final String queueName, final Map<String, String> headers, final String message) {
 
        jmsQueueTemplate.send(queueName, new MessageCreator() {
 
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }
 
    public void postToTopic(final String topicName, Map<String, String> headers, final String message) {
 
        jmsTopicTemplate.send(topicName, new MessageCreator() {
 
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                headers.forEach((k, v) -> {
                    try {
                        msg.setStringProperty(k, v);
                    } catch (JMSException e) {
                        System.out.println(
                                String.format("JMS fails to set the Header value '%s' to property '%s'", v, k));
                    }
                });
                return msg;
            }
        });
    }
 
}

As you can see, the MessageSender is simpler than the ActiveMQMessageProducer created in step 8.1.
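
Both methods that accept headers keep the try/catch inside the lambda because the callback passed to Map.forEach cannot throw the checked JMSException. The following is a minimal, JDK-only sketch of that pattern and is not part of the tutorial's source: HeaderCopier and PropertySetter are hypothetical names, and PropertySetter stands in for Message.setStringProperty so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; mirrors the header loop used in MessageSender. */
final class HeaderCopier {

    /** Stand-in for Message.setStringProperty, which throws a checked exception. */
    @FunctionalInterface
    interface PropertySetter {
        void set(String name, String value) throws Exception;
    }

    /**
     * Passes every header to the setter and returns the names that failed,
     * so that one bad header does not prevent the others from being set.
     */
    static List<String> copy(Map<String, String> headers, PropertySetter setter) {
        List<String> failed = new ArrayList<>();
        headers.forEach((name, value) -> {
            try {
                setter.set(name, value);
            } catch (Exception e) {
                // The lambda cannot rethrow a checked exception, so record the failure.
                failed.add(name);
            }
        });
        return failed;
    }
}
```

Returning the failed names, rather than only printing them, lets the caller decide whether a message with missing headers should still be sent.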

10.4 BillingAppListener

A listener that listens for new customer events and integrates with the billing system.

BillingAppListener

package jcg.demo.spring.jms.component;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
 
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
 
import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.BillingService;
import jcg.demo.spring.service.MessageTransformer;
 
@Component
public class BillingAppListener {
 
    @Autowired
    private JmsTemplate jmsQueueTemplate;
 
    @Autowired
    private BillingService billingService;
 
    @Autowired
    private MessageTransformer msgTransformer;
 
    private String queueName = "Consumer.Billing." + DataUtil.CUSTOMER_VTC_TOPIC;
 
    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": BillingAppListener receiveMessage.");
 
        Destination destination = new ActiveMQQueue(queueName);
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);
 
        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return billingService.handleNewCustomer(customerEvt);
    }
}

As you can see, this class is simpler than the ActiveMQMessageConsumer created in step 9.1.
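
The queueName field follows ActiveMQ's default virtual topic convention: a message published to a topic named VirtualTopic.<name> is copied to every queue named Consumer.<consumerName>.VirtualTopic.<name>. The sketch below spells out that naming rule. VirtualTopicNames is a hypothetical helper, and the topic name in the usage note is an assumed value, since DataUtil.CUSTOMER_VTC_TOPIC is defined elsewhere in the tutorial.

```java
/** Hypothetical helper illustrating ActiveMQ's default virtual topic naming. */
final class VirtualTopicNames {

    private static final String TOPIC_PREFIX = "VirtualTopic.";

    /** Builds the name of the queue a named consumer reads from for a virtual topic. */
    static String consumerQueue(String consumerName, String virtualTopic) {
        if (!virtualTopic.startsWith(TOPIC_PREFIX)) {
            throw new IllegalArgumentException("Not a virtual topic name: " + virtualTopic);
        }
        return "Consumer." + consumerName + "." + virtualTopic;
    }
}
```

For example, VirtualTopicNames.consumerQueue("Billing", "VirtualTopic.Customer.Topic") yields Consumer.Billing.VirtualTopic.Customer.Topic.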

10.5 SupportAppListener

A listener that listens for new customer events and integrates with the support system.

SupportAppListener

package jcg.demo.spring.jms.component;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
 
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
 
import jcg.demo.jms.util.DataUtil;
import jcg.demo.model.CustomerEvent;
import jcg.demo.spring.service.MessageTransformer;
import jcg.demo.spring.service.SupportService;
 
@Component
public class SupportAppListener {
 
    @Autowired
    private JmsTemplate jmsQueueTemplate;
     
    @Autowired
    private SupportService supportService;
     
    @Autowired
    private MessageTransformer msgTransformer;
     
    private String queueName = "Consumer.Support." + DataUtil.CUSTOMER_VTC_TOPIC;
 
    public String receiveMessage() throws JMSException {
        System.out.println(Thread.currentThread().getName() + ": SupportAppListener receiveMessage." );
 
        Destination destination = new ActiveMQQueue(queueName);
        TextMessage textMessage = (TextMessage) jmsQueueTemplate.receive(destination);
         
        CustomerEvent customerEvt = msgTransformer.fromJson(textMessage.getText(), CustomerEvent.class);
        return supportService.handleNewCustomer(customerEvt);
    }
}
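
Both listeners call getText() directly on the result of jmsQueueTemplate.receive. With JmsTemplate's default receive timeout, the call blocks until a message arrives. If a receive timeout is configured, receive returns null when it expires, and the listener would then fail with a NullPointerException. The sketch below shows a guard against that case in JDK-only form. TimedReceiver is a hypothetical name, and a BlockingQueue stands in for the broker queue.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a timed receive; a BlockingQueue plays the broker queue. */
final class TimedReceiver {

    /** Waits up to timeoutMillis for a message; a timeout becomes Optional.empty(). */
    static Optional<String> receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            // poll returns null on timeout, much like a timed JMS receive.
            return Optional.ofNullable(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        }
    }
}
```

A caller can then skip the transformation step when no message arrived instead of dereferencing a null message.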

10.6 ConfigBillingForNewCustomerApp

Configure a Spring context that consumes the new customer events to integrate with the billing system.

ConfigBillingForNewCustomerApp

package jcg.demo.spring.jms.app;
 
import java.net.URISyntaxException;
 
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
 
import com.google.gson.Gson;
 
import jcg.demo.spring.jms.component.BillingAppListener;
import jcg.demo.spring.jms.config.JmsConfig;
 
@Configuration
public class ConfigBillingForNewCustomerApp {
    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();
 
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigBillingForNewCustomerApp.class);
 
        try {
 
            BillingAppListener billingAppListener = (BillingAppListener) context.getBean("billingAppListener");
 
            System.out.println("ConfigBillingForewCustomerApp receives " + billingAppListener.receiveMessage());
 
        } finally {
            context.close();
        }
    }
 
}

10.7 ConfigSupportForNewCustomerApp

Configure a Spring context that consumes the new customer events to integrate with the support system.

ConfigSupportForNewCustomerApp

package jcg.demo.spring.jms.app;
 
import java.net.URISyntaxException;
 
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
 
import com.google.gson.Gson;
 
import jcg.demo.spring.jms.component.SupportAppListener;
import jcg.demo.spring.jms.config.JmsConfig;
 
@Configuration
public class ConfigSupportForNewCustomerApp {
    public static void main(String[] args) throws URISyntaxException, Exception {
        Gson gson = new Gson();
 
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class);
        context.register(ConfigSupportForNewCustomerApp.class);
 
        try {
            SupportAppListener supportAppListener = (SupportAppListener) context.getBean("supportAppListener");
            System.out.println("supportAppListener receives " + supportAppListener.receiveMessage());
 
        } finally {
            context.close();
        }
    }
 
}

10.8 Run as Distributed Systems

So far, we have built one Java JMS application, OnBoardNewCustomerApp, and two Spring JMS applications: ConfigBillingForNewCustomerApp and ConfigSupportForNewCustomerApp. Now it is time to run them together so that the customer onboarding process integrates with both the billing and support systems.

ConfigBillingForNewCustomerApp Output

main: ConfigBillingForNewCustomerApp receiveMessage.

ConfigSupportForNewCustomerApp Output

main: ConfigSupportForNewCustomerAppreceiveMessage.

Execute OnBoardNewCustomerApp. You will see that both consumers received the customer message and processed it.

ConfigBillingForNewCustomerApp Output (continued)

ConfigBillingForewCustomerApp receives BillingService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

ConfigSupportForNewCustomerApp Output (continued)

ConfigSupportForNewCustomerApp receives SupportService handleNewCustomer CustomerEvent: type(NEWCUSTOMER), customerId(41)

You just witnessed a working distributed system.
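
What this run shows is the virtual topic fan-out: the producer publishes once, and each consumer queue gets its own copy, so billing and support receive the event independently of each other. The following JDK-only simulation sketches that behaviour. FanOutBroker is a hypothetical class rather than ActiveMQ code, and it ignores persistence, acknowledgement, and networking.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory model of a virtual topic; not ActiveMQ code. */
final class FanOutBroker {

    // One queue per registered consumer, keyed by consumer name.
    private final Map<String, Queue<String>> consumerQueues = new LinkedHashMap<>();

    /** Registers a consumer queue, for example "Billing" or "Support". */
    void subscribe(String consumerName) {
        consumerQueues.putIfAbsent(consumerName, new ArrayDeque<>());
    }

    /** Publishes once; every consumer queue receives its own copy. */
    void publish(String message) {
        consumerQueues.values().forEach(queue -> queue.add(message));
    }

    /** Takes the next message for one consumer, or null if nothing is waiting. */
    String receive(String consumerName) {
        Queue<String> queue = consumerQueues.get(consumerName);
        return queue == null ? null : queue.poll();
    }
}
```

Because each consumer drains its own queue, a slow support system does not delay the billing system.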

11. Integrating with Tomcat

11.1 Configure Tomcat Resource

Configure Tomcat's context.xml with an AMQ resource as shown below.

context.xml

<Resource name="jms/ConnectionFactory" global="jms/ConnectionFactory" auth="Container"
       type="org.apache.activemq.ActiveMQConnectionFactory"
       factory="org.apache.activemq.jndi.JNDIReferenceFactory"
       brokerURL="tcp://localhost:61616"
       userName="admin"
       password="admin"
       useEmbeddedBroker="false"/>

11.2 Look up JNDI Resource

Use jndiContext.lookup to look up the ActiveMQConnectionFactory from the JNDI resource.

JmsConfig

private ConnectionFactory createJmsConnectionFactory(String jndiName, JMSExceptionListener exceptionListener) {
    CachingConnectionFactory connectionFactory = null;
    try {
        Context jndiContext = new InitialContext();
        Context envContext = (Context) jndiContext.lookup("java:comp/env");
        ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) envContext.lookup(jndiName);
        connectionFactory = new CachingConnectionFactory(activeMQConnectionFactory);
        connectionFactory.setExceptionListener(exceptionListener);
    } catch (NamingException e) {
        String msg = String.format("Unable to get JMS container with name %s ", jndiName);
        throw new RuntimeException(msg, e);
    }
    return connectionFactory;
}

12. Common Problems

There are three common problems when developing an ActiveMQ application.

12.1 Slow Consumer Application

When the AMQ console shows a growing number of pending messages, it indicates that the consumer application is processing messages more slowly than the producer publishes them. There are several ways to address this issue:

  • Have the publishers publish messages at a rate similar to the rate at which the consumers consume them
  • Have the publishers spread the messages across different destinations to reduce the number of messages each consumer has to consume
  • Speed up message processing in the consumers by moving any long-running work from the main thread to an asynchronous thread
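
The last option can be sketched as follows: the receiving thread only hands each message to a worker pool and returns immediately, while the slow work runs on other threads. This is a JDK-only sketch under stated assumptions. AsyncHandOff and its handler parameter are hypothetical names, and a real consumer would also have to consider acknowledgement, since a message handed to another thread may be acknowledged before it has been processed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical listener that moves slow processing off the receiving thread. */
final class AsyncHandOff {

    private final ExecutorService workers;
    private final Consumer<String> handler;

    AsyncHandOff(int workerThreads, Consumer<String> handler) {
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.handler = handler;
    }

    /** Called by the receiving thread; returns as soon as the task is queued. */
    void onMessage(String text) {
        workers.submit(() -> handler.accept(text));
    }

    /** Stops accepting work and waits for the queued messages to finish. */
    boolean shutdown(long timeoutSeconds) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that the pool's internal queue is unbounded here, so this only moves the backlog from the broker into the application unless the workers really are faster in aggregate.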

12.2 ActiveMQ Sends Unwanted Messages to Virtual Topic Queue

A bug was found in the AMQ broker that causes it to send unwanted messages to the virtual topic queue when a selector is defined. Our solution is to let the applications handle the selector by setting selectorAware to false.
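
When the applications handle the selector themselves, as suggested above, each consumer inspects the message properties after receiving a message and skips the messages it is not interested in. A minimal sketch of such application-side filtering follows. ClientSideSelector is a hypothetical name, the message properties are modelled as a plain Map, and the "action" property in the usage note is an assumption for illustration.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical application-side replacement for a JMS message selector. */
final class ClientSideSelector {

    /** Builds a filter comparable to the selector "name = 'expected'". */
    static Predicate<Map<String, String>> propertyEquals(String name, String expected) {
        // A message without the property never matches.
        return properties -> expected.equals(properties.get(name));
    }
}
```

A consumer could build ClientSideSelector.propertyEquals("action", "NEWCUSTOMER") once and test each received message's properties against it before passing the message on.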

12.3 Exception Handler

Some applications redeliver a message back to its destination when they encounter an exception. This may jam up the destination if processing fails again. A better solution is to have a separate exception handler deal with any exceptions.
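
One way to sketch a separate exception handler is to retry a bounded number of times and then pass the message to a dedicated error handler instead of returning it to the original destination. The code below is a JDK-only illustration. RetryingProcessor, maxAttempts, and the errorHandler callback are hypothetical names. A real application might have the error handler forward the failed message to an error queue.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical processor that retries a few times, then delegates to an error handler. */
final class RetryingProcessor {

    private final int maxAttempts;
    private final Consumer<String> handler;
    private final BiConsumer<String, RuntimeException> errorHandler;

    RetryingProcessor(int maxAttempts, Consumer<String> handler,
            BiConsumer<String, RuntimeException> errorHandler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.handler = handler;
        this.errorHandler = errorHandler;
    }

    /** Returns true if the handler succeeded, false if the error handler took over. */
    boolean process(String message) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Give up: the message goes to the error handler, not back to the destination.
        errorHandler.accept(message, last);
        return false;
    }
}
```

Capping the attempts keeps one poisoned message from blocking the messages queued behind it.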

13. Summary

In this tutorial, we demonstrated:

  • how to install and configure the AMQ server
  • how to build AMQ applications via ActiveMQ library
  • how to build AMQ applications with Spring JMS framework
  • how to integrate with Tomcat web container

We also described three common problems when developing an AMQ application.

14. References

    1. ActiveMQ in Action
    2. Apache ActiveMQ

15. Download the Source Code

This example builds several Java applications to send and receive messages via the AMQ broker.

Download
You can download the full source code of this example here: Apache ActiveMQ Best Practices Tutorial

Mary Zheng

Mary graduated from the Mechanical Engineering department at ShangHai JiaoTong University. She also holds a Master's degree in Computer Science from Webster University. During her studies she was involved in a large number of projects ranging from programming to software engineering. She works as a senior Software Engineer in the telecommunications sector, where she acts as a leader and works with others to design, implement, and monitor software solutions.