Apache ActiveMQ Load Balancing Example
1. Introduction
Apache ActiveMQ (AMQ) is a message broker which transfers the message from the sender to the receiver. Load Balancing is the process of distributing data across services for better performance.
In this example, we will demonstrate how to build a load-balanced AMQ client application.
2. The Component Diagram
In this example, we will demonstrate two forms of load balancing outlined in the diagram:
- A message producer sends messages to multiple AMQ brokers
- Messages in a queue are consumed by multiple competing consumers
3. Technologies used
The example code in this article was built and run using:
- Java 1.8.101 (1.8.x will do fine)
- Maven 3.3.9 (3.3.x will do fine)
- Apache ActiveMQ 5.8.0 and 5.15.0 (others will do fine)
- Spring JMS 4.1.5.RELEASE (others will do fine)
- Eclipse Neon (Any Java IDE would work)
4. Start two ActiveMQ Brokers
4.1 Configure ActiveMQ with Non-Default Port
4.1.1 Update activemq.xml
Navigate to the ..\apache-activemq-5.8.0\conf
directory. Update the activemq.xml
file at the transportConnector
element.
activemq.xml transportConnectors
<transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61716?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> </transportConnectors>
4.1.2 Update jetty.xml
Go to the ..\apache-activemq-5.8.0\conf
directory. Update the jetty.xml
file at the bean
element .
jetty.xml port
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> <property name="port" value="8761" /> </bean>
4.2 Start ActiveMQ Brokers
In this example, we will start two AMQ instances:
- Broker 1 – AMQ 5.15.0 at default port 61616/8161
- Broker 2 – AMQ 5.8.0 at port 61716/8761
Go to the ..\apache-activemq-5.x.0\bin
directory. Then click the activemq.bat
file.
If you can go to http://localhost:8161/admin/index.jsp
, then the broker 1 is started fine.
You do the same for broker 2 at http://localhost:8761/admin/index.jsp
.
5. Producer Load Balancing Example
In this example, we will demonstrate how to build MessageSender
which uses the Round-robin method to send messages to two AMQ brokers.
5.1 Dependency
Add dependency to Maven pom.xml.
pom.xml
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.0</version> </dependency> </dependencies>
5.2 Constants
There are five constants value used in this example.
DemoConstants
package jcg.demo.util; import java.util.Random; /** * The constant data used in this Demo * @author Mary.Zheng * */ public final class DemoConstants{ public static final int MESSAGE_SIZE = 100; public static final String PRODUCER_DESTINATION = "test.queue.lb.producer"; public static final String CONSUMER_DESTINATION = "test.queue.lb.consumer"; public static String BROKER_1_URI = "tcp://localhost:61616"; public static String BROKER_2_URI = "tcp://localhost:61716"; public static String buildDummyMessage() { Random rand = new Random(); int value = rand.nextInt(MESSAGE_SIZE); return "dummy message " + value; } }
5.3 Spring configuration
Add the JMS Spring configuration.
JmsConfig
package jcg.demo.spring.config; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import jcg.demo.spring.jms.component.JmsExceptionListener; import jcg.demo.util.DemoConstants; @Configuration @EnableJms @ComponentScan(basePackages = "jcg.demo.spring.jms.component") public class JmsConfig { @Bean @Autowired public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) { return createJmsConnectionFactory(DemoConstants.BROKER_1_URI, jmsExceptionListener); } @Bean @Autowired public ConnectionFactory jmsConnectionFactory_2(JmsExceptionListener jmsExceptionListener) { return createJmsConnectionFactory(DemoConstants.BROKER_2_URI, jmsExceptionListener); } private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI); activeMQConnectionFactory.setExceptionListener(jmsExceptionListener); CachingConnectionFactory pooledConnection = new CachingConnectionFactory(activeMQConnectionFactory); return pooledConnection; } @Bean(name = "jmsQueueTemplate_1") @Autowired public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) { return new JmsTemplate(jmsConnectionFactory); } @Bean(name = "jmsQueueTemplate_2") @Autowired public JmsTemplate createJmsQueueTemplate_2(ConnectionFactory jmsConnectionFactory_2) { return new JmsTemplate(jmsConnectionFactory_2); } }
- line 25: Create connection factory to broker 1
- line 31: Create connection factory to broker 2
- line 42: Create
JmsTemplate
to broker 1 - line 48: Create
JmsTemplate
to broker 2
5.4 MessageSender
Create MessageSender Spring component to send messages.
MessageSender
package jcg.demo.spring.jms.component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * This is Spring component which finds the load-balanced JmsTemplate via * Round-Robin from the list of available JmsQueueTemplates to send the message * * @author Mary.Zheng * */ @Component public class MessageSender { @Autowired private List jmsQueueTemplates = new ArrayList(); private AtomicInteger current = new AtomicInteger(0); private JmsTemplate findJmsTemplate_LB() { int cur = current.getAndIncrement(); int index = cur % jmsQueueTemplates.size(); System.out.println("\tFind Load balanced JmsTemplate[ " + index + " ]"); return jmsQueueTemplates.get(index); } public void postToQueue(final String queueName, final String message) { System.out.println("MessageSender postToQueue started"); this.findJmsTemplate_LB().send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
- line 25: Spring dependency injection will add both broker’s
JmsTemplate
tojmsQueueTemplates
- line 30-36: Use Round-robin logic to find the
JmsTemplate
- line 40: Send the message with load-balanced
JmsTemplate
5.5 MessageProducerApp
Create MessageProducer application.
MessageProducerApp
package jcg.demo.activemqlb.producer; import java.util.Scanner; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Configuration; import jcg.demo.spring.config.JmsConfig; import jcg.demo.spring.jms.component.MessageSender; import jcg.demo.util.DemoConstants; @Configuration public class MessageProducerApp { public static void main(String[] args) throws Exception { try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class)) { context.register(MessageProducerApp.class); String queueName = readDestination(); MessageSender springJmsProducer = (MessageSender) context.getBean("messageSender"); for (int i = 0; i < DemoConstants.MESSAGE_SIZE; i++) { springJmsProducer.postToQueue(queueName, DemoConstants.buildDummyMessage()); } } } private static String readDestination() { System.out.println("Enter Destination: P - Producer, C - Consumer : "); try (Scanner scanIn = new Scanner(System.in)) { String inputString = scanIn.nextLine(); scanIn.close(); if (inputString.equalsIgnoreCase("P")) { return DemoConstants.PRODUCER_DESTINATION; } return DemoConstants.CONSUMER_DESTINATION; } } }
- line 16: Start Spring context from
JmsConfig
- line 20: Get
messageSender
Spring bean
5.6 Execute MessageProducerApp
Below is the application output when you input P
on the prompt. Make sure both brokers are running.
Execution Output
Enter Destination: P - Producer, C - Consumer : P MessageSender postToQueue started Find Load balanced JmsTemplate[ 0 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 1 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 0 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 1 ] ......
As you see here, two JmsTemplates
take turns to send a total of 100 messages to their connected broker.
Go to http://localhost:8161/admin/queues.jsp
for broker 1 and http://localhost:8761/admin/queues.jsp
for broker 2. You should see that each broker has 50 pending messages at test.queue.lb.producer
.
6. Consumer Load Balancing Example
In this example, we will demonstrate how to build the MessageConsumerApp
which consumes the messages from a queue. We also show how to run two of them concurrently.
6.1 MessageConsumerWithPrefetch
AMQ brokers set default prefetch size 1000, so we have to set the prefetch size to 1 to allow two consumers consume messages concurrently.
MessageConsumerWithPrefetch
package jcg.demo.activemqlb.consumer; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * A simple message consumer which consumes the message from ActiveMQ Broker * with pre-fetch size set to 1 instead of default 1000. * * @author Mary.Zheng * */ public class MessageConsumerWithPrefetch implements MessageListener { private static final String JMS_PREFETCH_POLICY_ALL_1 = "?jms.prefetchPolicy.all=1"; private String activeMqBrokerUri; private String username; private String password; private String destinationName; public MessageConsumerWithPrefetch(String activeMqBrokerUri, String username, String password) { super(); this.activeMqBrokerUri = activeMqBrokerUri + JMS_PREFETCH_POLICY_ALL_1; this.username = username; this.password = password; } public void run() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri); Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); setComsumer(session); connection.start(); System.out.println(String.format("MessageConsumerWithPrefetch Waiting for messages at %s from %s", destinationName, this.activeMqBrokerUri)); } private void setComsumer(Session session) throws JMSException { Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); } @Override public void onMessage(Message message) { String msg; try { msg = String.format("MessageConsumerWithPrefetch Received message [ %s ]", ((TextMessage) message).getText()); Thread.sleep(10000);// sleep for 10 seconds System.out.println(msg); } catch (JMSException | InterruptedException e) { e.printStackTrace(); } } public String getDestinationName() { return destinationName; } public void setDestinationName(String destinationName) { this.destinationName = destinationName; } }
- line 23, 31 : Set the AMQ
prefetchPolicy
6.2 MessageConsumerApp
Create MessageConsumerApp which consumes from the consumer queue based on the selected broker.
MessageConsumerApp
package jcg.demo.activemqlb.consumer; import java.util.Scanner; import javax.jms.JMSException; import jcg.demo.util.DemoConstants; public class MessageConsumerApp { public static void main(String[] args) { String brokerUri = readBrokerInstance(); consume_queue_with_prefetchsize(brokerUri); } private static void consume_queue_with_prefetchsize(String brokerUri) { MessageConsumerWithPrefetch queueMsgListener = new MessageConsumerWithPrefetch(brokerUri, "admin", "admin"); queueMsgListener.setDestinationName(DemoConstants.CONSUMER_DESTINATION); try { queueMsgListener.run(); } catch (JMSException e) { e.printStackTrace(); } } private static String readBrokerInstance() { System.out.println("MessageConsumerApp listens at Broker Instance ( 1 or 2 ): "); try (Scanner scanIn = new Scanner(System.in)) { String inputString = scanIn.nextLine(); scanIn.close(); if (inputString.equalsIgnoreCase("1")) { return DemoConstants.BROKER_1_URI; } return DemoConstants.BROKER_2_URI; } } }
6.3 Execute MessageConsumerApp in Eclipse
Starts the MessageConsumerApp
via Eclipse.
MessageConsumerApp Output
MessageConsumerApp listens at Broker Instance ( 1 or 2 ): 1 MessageConsumerWithPrefetch Waiting for messages at test.queue.lb.consumer from tcp://localhost:61616?jms.prefetchPolicy.all=1
6.4 Execute MessageConsumerApp via Jar command
First, export the MessageConsumerApp
as a jar: activemq-lb.jar
. Open the command prompt and enter the command java -jar activemq-lb.jar
.
MessageConsumerApp Output
C:\JDK8_CTLSS\Java Code Geek Examples>java -jar activemq-lb.jar MessageConsumerApp listens at Broker Instance ( 1 or 2 ): 1 MessageConsumerWithPrefetch Waiting for messages at test.queue.lb.consumer from tcp://localhost:61616?jms.prefetchPolicy.all=1
6.5 Summary
There are two consumer applications listening at test.queue.lb.consumer
after the steps 6.3 and 6.4,
Monitoring both output while executing the MessageProducerApp
built at step 5.5 to send 100 messages to test.queue.lb.consumer
. You should see both consumers are receiving the messages. The screenshot below shows both consumers consumed 25 messages from test.queue.lb.consumer
after all messages are processed.
6.6 Things to consider
The AMQ message is dispatched based on the First-In, First-Out(FIFO) algorithm. If the messages must be processed based on the order entered, then running the consumer concurrently must be planned accordingly to avoid an error. Please check out ActiveMQ tutorial for detail.
7. Conclusion
In this example, we built two Java AMQ client applications:
MessageProducerApp
sends the message to two AMQ brokers via Round-robin algorithm to reduce the data load at each AMQ Broker- Two
MessageConsumerApps
consume the messages from the same queue to reduce the data load at the AMQ queue
8. Download the Source Code
This example built two Java AMQ client applications (producer and consumer) to achieve the load balancing requirement.
You can download the full source code of this example here: Apache ActiveMQ Load Balancing Example