About Mary Zheng

Mary graduated from the Mechanical Engineering department at ShangHai JiaoTong University. She also holds a Master's degree in Computer Science from Webster University. During her studies she was involved in a large number of projects ranging from programming to software engineering. She works as a senior Software Engineer in the telecommunications sector, where she acts as a leader and works with others to design, implement, and monitor software solutions.

Apache ActiveMQ Distributed Queue Tutorial

Apache ActiveMQ (AMQ) is an open source messaging server written in Java that implements the JMS 1.1 specification. In this article, I will demonstrate how to use a distributed queue within a group of AMQ brokers.

1. Introduction

Apache ActiveMQ (AMQ) is a message broker which transfers the messages from the sender to the receiver.

A distributed queue is a set of Java Message Service (JMS) queues that a client accesses as a single, logical queue. The members of the set are usually distributed across multiple servers within a cluster, with each queue member belonging to a separate JMS server.

AMQ provides network connectors to connect AMQ servers as a cluster. In a network of AMQ servers, the messages in a queue at Broker A can be consumed by a client from a different broker.
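The forwarding idea can be pictured with a toy in-memory model. This is only a sketch under simplifying assumptions: it does not use the ActiveMQ API, and ToyBroker, bridgeTo, subscribe, and pending are hypothetical names made up for illustration. It shows a message sent to one broker being handed to a consumer attached to another broker, which is roughly what a network connector makes possible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model only: ToyBroker is a hypothetical class, not part of the ActiveMQ API.
class ToyBroker {
    private final String name;
    // Messages kept on this broker because no consumer was reachable
    private final Map<String, Queue<String>> stored = new HashMap<>();
    // Messages handed to local consumers, keyed by queue name
    private final Map<String, List<String>> consumers = new HashMap<>();
    // Remote brokers this broker can forward to (stand-in for network connectors)
    private final List<ToyBroker> peers = new ArrayList<>();

    ToyBroker(String name) {
        this.name = name;
    }

    void bridgeTo(ToyBroker peer) {
        peers.add(peer);
    }

    // Attach a local consumer; the returned list collects what it receives
    List<String> subscribe(String queueName) {
        return consumers.computeIfAbsent(queueName, k -> new ArrayList<>());
    }

    boolean hasConsumer(String queueName) {
        return consumers.containsKey(queueName);
    }

    void send(String queueName, String body) {
        if (hasConsumer(queueName)) {
            consumers.get(queueName).add(body); // deliver locally
            return;
        }
        for (ToyBroker peer : peers) {
            if (peer.hasConsumer(queueName)) {
                peer.send(queueName, body); // forward to the broker that has a consumer
                return;
            }
        }
        // No consumer anywhere: keep the message here (the toy never redelivers it)
        stored.computeIfAbsent(queueName, k -> new ArrayDeque<>()).add(body);
    }

    int pending(String queueName) {
        Queue<String> queue = stored.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        ToyBroker broker1 = new ToyBroker("broker-1");
        ToyBroker broker3 = new ToyBroker("broker-3");
        broker3.bridgeTo(broker1); // broker-3 can forward to broker-1

        List<String> received = broker1.subscribe("queue.1");
        broker3.send("queue.1", "dummy message [0]");
        System.out.println(broker1.name + " consumer received " + received);
    }
}
```

A real broker network also handles persistence, acknowledgements, and consumers that come and go, none of which this toy attempts.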

In this example, I will demonstrate how a distributed queue works in AMQ brokers.

2. Apache ActiveMQ Server Installation

Follow these instructions to install an AMQ server. Then use the AMQ admin command: activemq-admin create ${brokerName} to create a server instance.

Click here for details.

3. Producer Java Application

3.1 MessageProducerApp

Create MessageProducerApp.

MessageProducerApp.java

package jcg.demo.activemq;

import jcg.demo.util.DataUtils;
import jcg.demo.util.InputData;

public class MessageProducerApp {

	public static void main(String[] args) {
		InputData brokerTestData = DataUtils.readTestData();

		if (brokerTestData == null) {
			System.out.println("Wrong input");
		} else {
			QueueMessageProducer queProducer = new QueueMessageProducer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN,
					DataUtils.ADMIN);
			queProducer.sendDummyMessages(brokerTestData.getQueueName());
		}
	}
}

3.2 QueueMessageProducer

Create QueueMessageProducer.

QueueMessageProducer.java

package jcg.demo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import jcg.demo.util.DataUtils;

/**
 * A simple message producer which sends the message to ActiveMQ Broker 
 * 
 * @author Mary.Zheng
 *
 */
public class QueueMessageProducer {

	private String activeMqBrokerUri;
	private String username;
	private String password;

	public QueueMessageProducer(String activeMqBrokerUri, String username, String password) {
		super();
		this.activeMqBrokerUri = activeMqBrokerUri;
		this.username = username;
		this.password = password;
	}

	public void sendDummyMessages(String queueName) {
		System.out.println("QueueMessageProducer started " + this.activeMqBrokerUri);
		ConnectionFactory connFactory = null;
		Connection connection = null;
		Session session = null;
		MessageProducer msgProducer = null;
		try {
			connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
			connection = connFactory.createConnection();		
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			msgProducer = session.createProducer(session.createQueue(queueName));
			
			for (int i = 0; i < DataUtils.MESSAGE_SIZE; i++) {
				TextMessage textMessage = session.createTextMessage(DataUtils.buildDummyMessage(i));
				msgProducer.send(textMessage);	
				Thread.sleep(30000);
			}
			System.out.println("QueueMessageProducer completed");
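		} catch (JMSException e) {
			System.out.println("Caught exception: " + e.getMessage());
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can see the interruption
			Thread.currentThread().interrupt();
			System.out.println("Caught exception: " + e.getMessage());
		} finally {
			// Always release the JMS resources, even if sending failed
			try {
				if (msgProducer != null) {
					msgProducer.close();
				}
				if (session != null) {
					session.close();
				}
				if (connection != null) {
					connection.close();
				}
			} catch (JMSException ignore) {
				// Nothing more can be done if closing fails
			}
		}
	}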
}
  • Thread.sleep(30000): Sleep 30 seconds after sending each message to simulate a slow producer

3.3 Export MessageProducerApp as a Jar

Export MessageProducerApp as activemq-msgproducerApp.jar

4. Consumer Java Application

4.1 MessageConsumerApp

Create MessageConsumerApp.

MessageConsumerApp.java

package jcg.demo.activemq;

import javax.jms.JMSException;

import jcg.demo.util.DataUtils;
import jcg.demo.util.InputData;

public class MessageConsumerApp {

	public static void main(String[] args) {

		InputData brokerTestData = DataUtils.readTestData();
		if (brokerTestData == null) {
			System.out.println("Wrong input");
		} else {
			QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(brokerTestData.getBrokerUrl(), DataUtils.ADMIN,
					DataUtils.ADMIN);
			queueMsgListener.setDestinationName(brokerTestData.getQueueName());

			try {
				queueMsgListener.run();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}
  • brokerTestData.getBrokerUrl(): The URL of the AMQ broker under test
  • brokerTestData.getQueueName(): The name of the queue under test

4.2 QueueMessageConsumer

Create QueueMessageConsumer.

QueueMessageConsumer.java

package jcg.demo.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

/**
 * A simple message consumer which consumes the message from ActiveMQ Broker
 * 
 * @author Mary.Zheng
 *
 */
public class QueueMessageConsumer implements MessageListener {

	private String activeMqBrokerUri;
	private String username;
	private String password;
	private String destinationName;

	public QueueMessageConsumer(String activeMqBrokerUri, String username, String password) {
		super();
		this.activeMqBrokerUri = activeMqBrokerUri;
		this.username = username;
		this.password = password;
	}

	public void run() throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
		Connection connection = factory.createConnection();
		connection.setClientID(getClientId());
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		Destination destination = session.createQueue(destinationName);

		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(this);

		System.out.println(String.format("QueueMessageConsumer Waiting for messages at queue='%s' broker='%s'",
				destinationName, this.activeMqBrokerUri));
	}

	@Override
	public void onMessage(Message message) {
		if (message instanceof ActiveMQTextMessage) {
			ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message;
			try {
				String msg = String.format("QueueMessageConsumer Received message [ %s ]", amqMessage.getText());
				System.out.println(msg);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		} else {
			System.out.println("QueueMessageConsumer Received non-text message");
		}
	}

	public String getDestinationName() {
		return destinationName;
	}

	public void setDestinationName(String destinationName) {
		this.destinationName = destinationName;
	}

	private String getClientId() {
		return "MzhengClient_" + destinationName + "_" + activeMqBrokerUri.replace("tcp://localhost:", "");
	}
}
  • connection.setClientID(getClientId()): Set the connection client ID
  • getClientId(): Build the client ID from the queue name and the broker port
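Note that getClientId() strips only the literal prefix tcp://localhost:, so a broker URL with any other host would leave the whole URL in the client ID. The following is a minimal sketch of one alternative, assuming the broker URL always has the form tcp://host:port. The ClientIds class and its forBroker method are hypothetical helpers, not part of the tutorial's source code.

```java
import java.net.URI;

// Hypothetical helper: derives the client ID from the port of any tcp://host:port URL.
class ClientIds {

    static String forBroker(String queueName, String brokerUri) {
        URI uri = URI.create(brokerUri.trim());
        int port = uri.getPort(); // -1 when the URL carries no port
        if (port == -1) {
            throw new IllegalArgumentException("Broker URL has no port: " + brokerUri);
        }
        // Same shape as the tutorial's ID: MzhengClient_<queue>_<port>
        return "MzhengClient_" + queueName + "_" + port;
    }

    public static void main(String[] args) {
        System.out.println(forBroker("queue.1", "tcp://localhost:61816"));
    }
}
```

For tcp://localhost:61816 and queue.1 this yields the same ID as the original method, MzhengClient_queue.1_61816.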

4.3 Common Utils

Create DataUtils.

DataUtils.java

package jcg.demo.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Scanner;

import org.springframework.util.StringUtils;

/**
 * The data utility used in this Demo
 * 
 * @author Mary.Zheng
 *
 */
public final class DataUtils {

	private static final String INPUT_PROMPT_1 = "Enter Broker URL(tcp://$host:$port): ";
	private static final String INPUT_PROMPT_2 = "Enter Queue Name: ";
	public static final int MESSAGE_SIZE = 10;
	public static final String ADMIN = "admin";

	public static String buildDummyMessage(int value) {
		DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
		LocalDateTime now = LocalDateTime.now();		
		
		return "dummy message [" + value + "], created at " + dtf.format(now);
	}

	public static InputData readTestData() {
		InputData testData = null;

		try (Scanner scanIn = new Scanner(System.in)) {		
			System.out.println(INPUT_PROMPT_1);
			String brokerUrl = scanIn.nextLine();

			System.out.println(INPUT_PROMPT_2);
			String queueName = scanIn.nextLine();
			
			if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(brokerUrl)) {
				return testData;
			}
			testData = new InputData( brokerUrl, queueName);
		}

		return testData;
	}
}
  • buildDummyMessage: Include the message creation time in the message body for demonstration purposes
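readTestData() rejects only empty input, so a mistyped broker URL is not noticed until the connection attempt fails. A stricter check could look like the sketch below. BrokerUrlCheck and isValidTcpUrl are hypothetical names, and the rule that the URL must be tcp://host:port is an assumption taken from the input prompt; ActiveMQ itself accepts other transport schemes.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: checks that the input looks like tcp://host:port.
class BrokerUrlCheck {

    static boolean isValidTcpUrl(String input) {
        if (input == null) {
            return false;
        }
        try {
            URI uri = new URI(input.trim());
            return "tcp".equalsIgnoreCase(uri.getScheme())
                    && uri.getHost() != null
                    && uri.getPort() > 0;
        } catch (URISyntaxException e) {
            // Not parseable as a URI at all
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidTcpUrl("tcp://localhost:61816"));
        System.out.println(isValidTcpUrl("localhost:61816"));
    }
}
```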

Create InputData to hold the testing data.

InputData.java

package jcg.demo.util;

/**
 * The input data for this demo.
 * 
 * @author Mary.Zheng
 *
 */
public class InputData {

	private String brokerUrl;
	private String queueName;

	public InputData(String brokerUrl, String queueName) {
		super();
		this.brokerUrl = brokerUrl.trim();
		this.queueName = queueName.trim();
	}

	public String getBrokerUrl() {
		return brokerUrl;
	}

	public void setBrokerUrl(String brokerUrl) {
		this.brokerUrl = brokerUrl;
	}

	public String getQueueName() {
		return queueName;
	}

	public void setQueueName(String queueName) {
		this.queueName = queueName;
	}
}

4.4 Export MessageConsumerApp as a Jar

Export MessageConsumerApp as activemq-msgConsumerApp.jar

5. Distributed Queue in a Static Network of Brokers

In this example, Producer-1 sends messages to Queue.1 at Broker-1. Consumer-1 receives the messages from Queue.1 at Broker-3. Queue.1 is the distributed queue. This setup is useful when the producer and consumer applications cannot connect to the same AMQ server.

The image below shows a distributed queue (queue.1) in Broker-1 and Broker-3.

Figure 5 distributed queue 1

5.1 Configure a Static Network of Brokers

Configure a network of Broker-1 and Broker-3:

Broker name  Home Path            Openwire Port  Web Port  Data Path
broker-1     ..\cluster\broker-1  61816          8861      ..\data
broker-3     ..\cluster\broker-3  61516          5161      \broker-3\data

Click here for the configuration details.
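The configuration itself is not reproduced in this article. As a rough illustration, ActiveMQ network connectors that use static discovery take a URI of the form static:(tcp://host:port,...). The sketch below only builds such a string from the Openwire ports in the table. StaticNetworkUri is a hypothetical helper, and the idea that Broker-3's connector points at Broker-1's Openwire port is an assumption based on the note in section 5.2.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a static discovery URI for a network connector.
class StaticNetworkUri {

    static String of(String host, List<Integer> openwirePorts) {
        if (openwirePorts.isEmpty()) {
            throw new IllegalArgumentException("At least one remote broker port is required");
        }
        // Joins the remote broker URLs as static:(tcp://host:port,tcp://host:port)
        return openwirePorts.stream()
                .map(port -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(",", "static:(", ")"));
    }

    public static void main(String[] args) {
        // Assumed setup: Broker-3 connects to Broker-1 on its Openwire port 61816
        System.out.println(of("localhost", Arrays.asList(61816)));
    }
}
```

The resulting string would typically go into the uri attribute of a networkConnector element in the connecting broker's activemq.xml.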

5.2 Verify the AMQ Brokers – Part I

Start both Broker-1 and Broker-3.

Navigate to the AMQ web console to view the connection details.

Image 5.2.1 Broker-1(8861) Connections

Figure 5.2.1 Broker-1 connection

Note: The client name of the Broker-1 connection is defined in step 4.2.

Image 5.2.2 Broker-3 (5161) Connections

Figure 5.2.2 Broker-3 connection

Note: Broker-3 has a network connector to Broker-1.

5.3 Execute the Consumer Application

Enter java -jar activemq-msgConsumerApp.jar to start MessageConsumerApp.

MessageConsumerApp Output

C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar
Enter Broker URL(tcp://$host:$port):
tcp://localhost:61816
Enter Queue Name:
queue.1
QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61816'
  • Line 3: Enter Broker-1 URL
  • Line 5: Enter queue name queue.1

5.4 Execute the Publisher Application

While MessageConsumerApp is running, enter java -jar activemq-msgproducerApp.jar to start MessageProducerApp.

MessageProducerApp output

C:\Users\shu.shan\Desktop>java -jar activemq-msgproducerApp.jar
Enter Broker URL(tcp://$host:$port):
tcp://localhost:61516
Enter Queue Name:
queue.1
QueueMessageProducer started tcp://localhost:61516
  • Line 3: Enter Broker-3 URL
  • Line 5: Enter queue name queue.1

The image below shows both applications running.

Figure 5.5 Execution of Application

5.5 Verify the AMQ Brokers – Part II

Navigate to the Broker-1 web console, click Queues to see queue.1, and then click its active consumers link.

The image below shows queue.1's active consumer at Broker-1: MzhengClient_queue.1_61816.

Figure 5.5.1. Broker-1 queue.1 consumer

The image below shows queue.1's active consumer at Broker-3: nc:61516-61816_broker-1_inbound_broker-3.

Figure 5.5.2 Broker-3 Consumer

Note: queue.1 is the distributed queue via the broker's network connector.

6. Distributed Queue in a Dynamic Network of Brokers

In this example, Producer-1 sends messages to queue.1 at Dynamic-Broker1, Producer-2 also sends messages to queue.1 at Dynamic-Broker2, and Consumer-1 receives the messages from Queue.1 at Dynamic-Broker3. Queue.1 is the distributed queue. This setup is useful for sharing the load among multiple producers and for supporting in-order delivery when processing the messages.

The diagram below shows a distributed queue (Queue.1) among three brokers.

Figure 6 distributed queue

6.1 Configure a Dynamic Network of Brokers

Configure a dynamic network of brokers with three brokers:

Broker name      Home Path                   Openwire Port  Web Port  Data Path
dynamic-broker1  ..\cluster\dynamic-broker1  61626          8166      ..\dynamic-broker1\data
dynamic-broker2  ..\cluster\dynamic-broker2  61636          8164      ..\dynamic-broker2\data
dynamic-broker3  ..\cluster\dynamic-broker3  61646          8165      ..\dynamic-broker3\data

Click here for the configuration details.

6.2 Verify the AMQ Brokers – Part I

Start all three dynamic brokers. Navigate to the AMQ web console to view the connection details.

The image below shows the Dynamic-Broker1 (8166) connections.

Figure 6.2 Dynamic-Broker1 Connections

6.3 Execute the Consumer Application

Enter java -jar activemq-msgConsumerApp.jar to start MessageConsumerApp at Dynamic-Broker2.

MessageConsumerApp Output

C:\Users\shu.shan\Desktop>java -jar activemq-msgConsumerApp.jar
Enter Broker URL(tcp://$host:$port):
tcp://localhost:61636
Enter Queue Name:
queue.1
QueueMessageConsumer Waiting for messages at queue='queue.1' broker='tcp://localhost:61636'

6.4 Execute the Publisher Application

While MessageConsumerApp is running, enter java -jar activemq-msgproducerApp.jar twice to start two instances of MessageProducerApp, one for Dynamic-Broker1 and the other for Dynamic-Broker3.

Figure 6.4 Application Execution Output

Note: The consumer listens to queue.1 at Dynamic-Broker2 while two publishers publish messages to queue.1 at Dynamic-Broker1 and Dynamic-Broker3. The consumer processed the messages in order of their creation time.
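One way to check this observation against the consumer's console output is to parse the creation time that buildDummyMessage writes into each message body and confirm that it never goes backwards. The sketch below assumes the body format shown in DataUtils ("dummy message [n], created at yyyy/MM/dd HH:mm:ss"). BornTimeCheck and its methods are hypothetical names, and the sample timestamps are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: checks that messages arrived in order of their creation time.
class BornTimeCheck {
    // Same pattern that DataUtils.buildDummyMessage uses
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    private static final String MARKER = "created at ";

    // Extracts the creation time that follows "created at " in the message body
    static LocalDateTime bornTime(String body) {
        int index = body.lastIndexOf(MARKER);
        if (index < 0) {
            throw new IllegalArgumentException("No creation time in: " + body);
        }
        return LocalDateTime.parse(body.substring(index + MARKER.length()).trim(), FORMAT);
    }

    // Returns true when no message was created before the one received just ahead of it
    static boolean arrivedInBornOrder(List<String> bodies) {
        LocalDateTime previous = null;
        for (String body : bodies) {
            LocalDateTime current = bornTime(body);
            if (previous != null && current.isBefore(previous)) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    public static void main(String[] args) {
        // Made-up sample bodies from two producers, in the order the consumer printed them
        List<String> received = Arrays.asList(
                "dummy message [0], created at 2018/01/15 10:00:00",
                "dummy message [0], created at 2018/01/15 10:00:05",
                "dummy message [1], created at 2018/01/15 10:00:30");
        System.out.println(arrivedInBornOrder(received));
    }
}
```

Because the timestamps have one-second resolution and come from each producer's own clock, this check is only a coarse sanity test.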

6.5 Verify the AMQ Brokers – Part II

Navigate to the Dynamic-Broker2 web console, click Queues to see queue.1, and then click its active consumers link.

The image below shows queue.1's active consumer at Dynamic-Broker2: MzhengClient_queue.1_61636.

Figure 6.5.1 Dynamic-Broker2 Consumer

The image below shows that queue.1 at Dynamic-Broker3 has two active consumers via the network of brokers.

Figure 6.5.2 Dynamic-Broker3 consumers

Note: queue.1 is the distributed queue via the broker's network connector.

7. Summary

In this article, I demonstrated two cases of a distributed queue by using AMQ with a network of brokers. An AMQ network of brokers also provides high availability to the client. Click here for more details on high availability.

A distributed queue supports deliveries where subscribers receive messages in the same order in which they were published. Besides Apache ActiveMQ, IBM MQ, RabbitMQ, HornetQ, and Apache Kafka also support distributed queues.

8. Download the Source Code

This example builds two Java applications to send and receive messages via the AMQ broker.

Download
You can download the full source code of this example here: Apache ActiveMQ Distributed Queue Tutorial