Apache ActiveMQ Distributed Queue Tutorial
Apache ActiveMQ (AMQ) is an open source messaging server written in Java, which implements JMS 1.1 specifications. In this article, I will demonstrate how to utilize a distributed queue within a group of AMQ brokers.
Table Of Contents
1. Introduction
Apache ActiveMQ (AMQ) is a message broker which transfers the messages from the sender to the receiver.
A distributed queue is a single unit of Java Message Service (JMS) queues that are accessible as a single, logical queue to a client. The members of the unit are usually distributed across multiple servers within a cluster, with each queue member belonging to a separate JMS server.
AMQ provides network connectors to connect AMQ servers as a cluster. In a network of AMQ servers, the messages in a queue at Broker A can be consumed by a client from a different broker.
In this example, I will demonstrate how a distributed queue works in AMQ brokers.
2. Apache ActiveMQ Server Installation
Follow these instructions to install an AMQ server. Then use the AMQ admin command: activemq-admin create ${brokerName}
to create a server instance.
Click here for details.
3. Producer Java Application
3.1 MessageProducerApp
Create MessageProducerApp.
MessageProducerApp.java
package jcg.demo.activemq; import jcg.demo.util.DataUtils; import jcg.demo.util.InputData; public class MessageProducerApp { public static void main(String[] args) { InputData brokerTestData = DataUtils.readTestData(); if (brokerTestData == null) { System.out.println("Wrong input"); } else { QueueMessageProducer queProducer = new QueueMessageProducer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN, DataUtils.ADMIN); queProducer.sendDummyMessages(brokerTestData.getQueueName()); } } }
3.2 QueueMessageProducer
Create QueueMessageProducer.
QueueMessageProducer.java
package jcg.demo.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import jcg.demo.util.DataUtils; /** * A simple message producer which sends the message to ActiveMQ Broker * * @author Mary.Zheng * */ public class QueueMessageProducer { private String activeMqBrokerUri; private String username; private String password; public QueueMessageProducer(String activeMqBrokerUri, String username, String password) { super(); this.activeMqBrokerUri = activeMqBrokerUri; this.username = username; this.password = password; } public void sendDummyMessages(String queueName) { System.out.println("QueueMessageProducer started " + this.activeMqBrokerUri); ConnectionFactory connFactory = null; Connection connection = null; Session session = null; MessageProducer msgProducer = null; try { connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri); connection = connFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); msgProducer = session.createProducer(session.createQueue(queueName)); for (int i = 0; i < DataUtils.MESSAGE_SIZE; i++) { TextMessage textMessage = session.createTextMessage(DataUtils.buildDummyMessage(i)); msgProducer.send(textMessage); Thread.sleep(30000); } System.out.println("QueueMessageProducer completed"); } catch (JMSException | InterruptedException e) { System.out.println("Caught exception: " + e.getMessage()); } try { if (msgProducer != null) { msgProducer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Throwable ignore) { } } }
- Line 49: Sleep 30 seconds after sending a message for demonstrating a slow producer
3.3 Export MessageProducerApp as a Jar
Export MessageProducerApp as activemq-msgproducerApp.jar
4. Consumer Java Application
4.1 MessageConsumerApp
Create MessageConsumerApp.
MessageConsumerApp.java
package jcg.demo.activemq; import javax.jms.JMSException; import jcg.demo.util.DataUtils; import jcg.demo.util.InputData; public class MessageConsumerApp { public static void main(String[] args) { InputData brokerTestData = DataUtils.readTestData(); if (brokerTestData == null) { System.out.println("Wrong input"); } else { QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN, DataUtils.ADMIN); queueMsgListener.setDestinationName(brokerTestData.getQueueName()); try { queueMsgListener.run(); } catch (JMSException e) { e.printStackTrace(); } } } }
- Line 16: Test AMQ broker URL
- Line 18: Test queue name
4.2 QueueMessageConsumer
Create QueueMessageConsumer.
QueueMessageConsumer.java
package jcg.demo.activemq; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTextMessage; /** * A simple message consumer which consumes the message from ActiveMQ Broker * * @author Mary.Zheng * */ public class QueueMessageConsumer implements MessageListener { private String activeMqBrokerUri; private String username; private String password; private String destinationName; public QueueMessageConsumer(String activeMqBrokerUri, String username, String password) { super(); this.activeMqBrokerUri = activeMqBrokerUri; this.username = username; this.password = password; } public void run() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri); Connection connection = factory.createConnection(); connection.setClientID(getClientId()); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); System.out.println(String.format("QueueMessageConsumer Waiting for messages at queue='%s' broker='%s'", destinationName, this.activeMqBrokerUri)); } @Override public void onMessage(Message message) { if (message instanceof ActiveMQTextMessage) { ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message; try { String msg = String.format("QueueMessageConsumer Received message [ %s ]", amqMessage.getText()); System.out.println(msg); } catch (JMSException e) { e.printStackTrace(); } } else { System.out.println("QueueMessageConsumer Received non-text message"); } } public String getDestinationName() { return destinationName; } public void setDestinationName(String destinationName) { this.destinationName = destinationName; } private String getClientId() { return "MzhengClient_" + destinationName + "_" + activeMqBrokerUri.replace("tcp://localhost:", ""); } }
- Line 37: Set connection
clientID
- Line 74: Set client Id from the queue name and the broker port.
4.3 Common Utils
Create DataUtils
.
DataUtils.java
package jcg.demo.util; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Scanner; import org.springframework.util.StringUtils; /** * The data utility used in this Demo * * @author Mary.Zheng * */ public final class DataUtils { private static final String INPUT_PROMPT_1 = "Enter Broker URL(tcp://$host:$port): "; private static final String INPUT_PROMPT_2 = "Enter Queue Name: "; public static final int MESSAGE_SIZE = 10; public static final String ADMIN = "admin"; public static String buildDummyMessage(int value) { DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); LocalDateTime now = LocalDateTime.now(); return "dummy message [" + value + "], created at " + dtf.format(now); } public static InputData readTestData() { InputData testData = null; try (Scanner scanIn = new Scanner(System.in)) { System.out.println(INPUT_PROMPT_1); String brokerUrl = scanIn.nextLine(); System.out.println(INPUT_PROMPT_2); String queueName = scanIn.nextLine(); if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(brokerUrl)) { return testData; } testData = new InputData( brokerUrl, queueName); } return testData; } }
- Line 26: Include the message born time in the message body for demonstrating purpose
Create InputData
to hold the testing data.
InputData.java
package jcg.demo.util; /** * The input data for this demo. * * @author Mary.Zheng * */ public class InputData { private String brokerUrl; private String queueName; public InputData(String brokerUrl, String queueName) { super(); this.brokerUrl = brokerUrl.trim(); this.queueName = queueName.trim(); } public String getBrokerUrl() { return brokerUrl; } public void setBrokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } }
4.4 Export MessageConsumerApp as a Jar
Export MessageConsumerApp as activemq-msgConsumerApp.jar
5. Distributed Queue in a Static Network of Brokers
In this example, Producer-1
sends messages to Queue.1
at Broker-1.
Consumer-1
receives the messages from Queue.1
at Broker-3
. Queue.1
is the distributed queue. It’s useful when the producer and consumer applications cannot be in the same AMQ server.
Image below shows a distribute queue (queue.1
) in Brokers-1
and Broker-3
.
5.1 Configure a Static Network of Brokers
Configure a network of Broker-1
and Broker-3
:
Broker name | Home Path | Openwire Port | Web Port | Data Path |
---|---|---|---|---|
broker-1 | ..\cluster\broker-1 | 61816 | 8861 | ..\data |
broker-3 | ..\cluster\broker-3 | 61516 | 5161 | \broker-3\data |
Click here for the configuration details.
5.2 Verify the AMQ Brokers – Part I
Start both Broker-1
and Broker-3
.
Navigate to the AMQ web console to view the connections details.
Image 5.2.1 Broker-1(8861) Connections
Note: Broker-1 connection client name is defined at step 4.2.
Image 5.2.2 Broker-3 (5161) Connections
Note: Broker-3 has a network connector to Broker-1.
5.3 Execute the Consumer Application
Enter java -jar activemq-msgConsumerApp.jar
to start MessageConsumerApp
.
MessageConsumerApp Output
C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar Enter Broker URL(tcp://$host:$port): tcp://localhost:61816 Enter Queue Name: queue.1 QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61816'
- Line 3: Enter Broker-1 URL
- Line 5: Enter queue name
queue.1
5.4 Execute the Publisher Application
While MessageConsumerApp
is running, enter java -jar activemq-msgproducerApp
to start MessageProducerApp
.
MessageProducerApp output
C:\Users\shu.shan\Desktop>java -jar activemq-msgproducerApp.jar Enter Broker URL(tcp://$host:$port): tcp://localhost:61516 Enter Queue Name: queue.1 QueueMessageProducer started tcp://localhost:61516
- Line 3: Enter Broker-3 URL
- Line 5: Enter queue name
queue.1
Image below shows both applications are running.
5.5 Verify the AMQ Brokers – Part II
Navigate to Broker-1 web console, click queues to see queue.1
, lastly, click on its active consumers link.
Image Below shows queue.1
‘s active consumer – Mzhengclient-queue.1_61816
at broker-1
.
Image Below shows queue.1
‘s active consumer – nc:61516-61816_broker-1_inbound_broker-3
at broker-3
.
Note: queue.1
is the distributed queue via the broker’s connect connector.
6. Distributed Queue in a Dynamic Network of Brokers
In this example, Producer-1
sends messages to queue.1
at Dynamic-Broker1
, Producer-2
also sends messages to queue.1
at Dynamic-Broker2
, Consumer-1
receives the messages from Queue.1
at Dynamic-Broker3
. Queue.1
is the distributed queue. It’s useful to share the load among multiple producers and support in-order delivery when processing the messages.
Diagram below shows a distributed queue(Queue.1
) among three brokers.
6.1 Configure a Dynamic Network of Brokers
Configure a dynamic network of brokers with three brokers:
Broker name | Home Path | Openwire Port | Web Port | Data Path |
---|---|---|---|---|
dynamic-broker1 | ..\cluster\dynamic-broker1 | 61626 | 8166 | ..\dynamic-broker1\data |
dynamic-broker2 | ..\cluster\dynamic-broker2 | 61636 | 8164 | ..\dynamic-broker2\data |
dynamic-broker3 | ..\cluster\dynamic-broker3 | 61646 | 8165 | ..\dynamic-broker3\data |
Click here for the configuration details.
6.2 Verify the AMQ Brokers – Part I
Start all three dynamic-brokers. Navigate to the AMQ web console to view the connections details.
Image below shows Dynamic-broker1
(8166) connections.
6.3 Execute the Consumer Application
Enter java -jar activemq-msgConsumerApp.jar
to start MessageConsumerApp
at Dynnamic-broker2
.
MessageConsumerApp Output
C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar Enter Broker URL(tcp://$host:$port): tcp://localhost:61636 Enter Queue Name: queue.1 QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61636'
6.4 Execute the Publisher Application
While MessageConsumerApp
is running, enter java -jar activemq-msgproducerApp
to start MessageProducerApp
twice, one for Dynamic-broker1
, the other for Dynamic-broker3
.
Note: The consumer listens to queue.1
atDynamic-Broker2
while two publishers publish the messages to queue.1
at Dynamic-Broker1
and Dynamic-Broker3
. The consumer processed the messages based on the message’s born time.
6.5 Verify the AMQ Brokers – Part II
Navigate to Dynamic-Broker2
web console, click queues to see queue.1
, lastly, click on its active consumers link.
Image below shows queue.1
‘s active consumer – Mzhengclient-queue.1_61636
at broker-3
.
Image below shows queue.1
at Dynamic-broker3
has two active consumers via the network of brokers.
Note: queue.1
is the distributed queue via the broker’s connect connector.
7. Summary
In this article, I demonstrated two cases of a distributed queue by utilizing AMQ with a network of brokers. AMQ network of brokers also provides high availability to the client. Click here for more details on high availability.
Distributed Queue provides support for deliveries where subscribers receives messages in the same order they have been published. Besides Apache ActiveMQ, IBM MQ, RabbitMQ, HornetQ, and Apache Kafka also support Distributed Queues.
8. References
- https://examples.javacodegeeks.com/enterprise-java/jms/apache-activemq-best-practices-tutorial/
- https://examples.javacodegeeks.com/enterprise-java/jms/apache-activemq-failover-example/
- http://srinathsview.blogspot.com/2012/05/scaling-distributed-queues-short-survay.html
- https://docs.oracle.com/cd/E11035_01/wls100/jms/dds.html
- http://activemq.apache.org/how-do-i-configure-distributed-queues-or-topics.html
- https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_8.0.0/com.ibm.mq.pro.doc/q002660_.htm
9. Download the Source Code
This example builds two java applications to send and receive messages via the AMQ broker.
You can download the full source code of this example here: Apache ActiveMQ Distributed Queue Tutorial