Apache ActiveMQ Advisory Example
In this example, we will discuss Apache ActiveMQ Advisory. Before we start, it is expected that we have a basic understanding of JMS concepts, ActiveMQ and Java/J2EE. JMS stands for Java Message Service, and ActiveMQ is a Java-based implementation of JMS. ActiveMQ acts as a message broker. It is open source and supports asynchronous message transfer between producers and consumers using queues/topics.
Whenever some activity occurs on ActiveMQ, we can receive a notification about it through ActiveMQ Advisory messages. For example, we can set up notifications on producers, consumers or destinations.
1. Introduction
ActiveMQ Advisory acts as an administrative channel for monitoring our system via JMS messages. We can get information on what is happening with our producers, consumers and destinations (topics/queues). In this tutorial we will see how a consumer gets notified whenever a message is consumed. We will be using Java 8 and ActiveMQ 5.14.5.
2. Configuration for ActiveMQ Advisory
- Download ActiveMQ from the ActiveMQ download page.
- Extract the downloaded ActiveMQ zip file to any location on your computer.
- Go to the conf directory of the extracted ActiveMQ folder.
- Open activemq.xml and search for the XML tag policyEntry. It will look like this: <policyEntry topic=">" >
- Change topic to queue (as we will be using a queue in our example) and add the advisoryForDelivery attribute to set up advisory messages. The ‘>’ wildcard matches all queues: <policyEntry queue=">" advisoryForDelivery="true">
- Now start ActiveMQ as described in Say Hello To ActiveMQ.
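To make the ‘>’ wildcard in the policyEntry more concrete, here is a minimal, broker-free sketch of how ActiveMQ destination wildcards are documented to behave: "." separates path segments, "*" matches exactly one segment, and ">" matches everything from that point on. The class name DestinationWildcardSketch and the sample queue names are hypothetical, and this is a simplified illustration, not the broker's own matching code.

```java
// Simplified sketch of ActiveMQ destination wildcards; NOT the broker's implementation.
// Class and queue names here are assumptions made for illustration only.
class DestinationWildcardSketch {

    // Returns true if the dot-separated destination name matches the pattern.
    // "*" matches exactly one path segment; ">" matches all remaining segments.
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // everything from this segment onwards is accepted
            }
            if (i >= d.length) {
                return false; // pattern has more segments than the destination
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // a literal segment differs
            }
        }
        return p.length == d.length; // no unmatched trailing segments allowed
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "JCG_QUEUE"));         // true
        System.out.println(matches("JCG.*", "JCG.ORDERS"));    // true
        System.out.println(matches("JCG.*", "JCG.ORDERS.EU")); // false
    }
}
```

With queue=">" the pattern consists of a single ">" segment, so the policy (and with it advisoryForDelivery) applies to every queue, including JCG_QUEUE.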
3. Using ActiveMQ Advisory
3.1 Example 1 – Using ActiveMQ Advisory in a simple Java project (example using Eclipse)
- In this example, we will see how ActiveMQ advisory works in a simple way. The listener method onMessage() gets triggered whenever a message is received by the Receiver.
- Let us now create a dynamic web project in Eclipse and create our Sender and Receiver classes to see how a message is exchanged using ActiveMQ. A notification will be received when a consumer consumes a message.
- The Sender and Receiver classes are created as separate threads and implement the Runnable interface.
- The Receiver class implements MessageListener and hence overrides the onMessage() method.
- The main method starts the Receiver thread, sleeps for some time and then starts the Sender thread. So the moment Sender starts sending messages, the onMessage() method in the Receiver class gets invoked.
- Please refer to the code snippet below for our Sender and Receiver classes.
Sender.java
package com.activemq.advisory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender implements Runnable {

    // URL of the JMS server. DEFAULT_BROKER_URL means the JMS server is on localhost (port 61616).
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Queue name. You can create any/many queue names as per your requirement.
    private static String subject = "JCG_QUEUE";

    @Override
    public void run() {
        try {
            // Getting JMS connection from the server and starting it
            ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();

            // Creating a non-transactional session to send/receive JMS messages.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Destination represents here our queue 'JCG_QUEUE' on the JMS server.
            // The queue will be created automatically on the server.
            Destination destination = session.createQueue(subject);

            // MessageProducer is used for sending messages to the queue.
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // We will be sending text messages
            for (int i = 0; i < 100; i++) {
                String text = "JCG Message Count " + i;
                TextMessage message = session.createTextMessage(text);
                // Here we are sending our message!
                producer.send(message);
                System.out.println("JCG printing Sent message: " + message);
            }
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught Exception: " + e);
            e.printStackTrace();
        }
    }
}
Receiver.java
package com.activemq.advisory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver implements Runnable, ExceptionListener, MessageListener {

    // URL of the JMS server; by default a broker on localhost (port 61616)
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue we will receive messages from
    private static String subject = "JCG_QUEUE";

    public Receiver() {
        try {
            // Getting JMS connection from the server
            ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            connection.setExceptionListener(this);

            // Creating session for receiving messages
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Getting the queue 'JCG_QUEUE'
            Destination destination = session.createQueue(subject);

            // MessageConsumer is used for receiving (consuming) messages
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(this); // Setting message listener
        } catch (Exception e) {
            System.out.println("Caught exception: " + e);
            e.printStackTrace();
        }
    }

    public void run() {
        // Keep this thread alive while the listener receives messages.
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop looping
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("JCG ActiveMQ JMS Exception occurred. Shutting down client.");
    }

    public void onMessage(Message msg) {
        // Only text messages are expected on this queue
        if (!(msg instanceof TextMessage)) {
            System.out.println(" JCG inside onMessage:: Ignoring non-text message: " + msg);
            return;
        }
        try {
            System.out.println(" JCG inside onMessage:: Received Message:::" + ((TextMessage) msg).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        (new Thread(new Receiver())).start();
        Thread.sleep(10000);
        (new Thread(new Sender())).start();
    }
}
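The ordering used in main() (register the listener first, then start sending) can also be tried without a broker. The following is only an in-memory stand-in, assuming a BlockingQueue in place of JCG_QUEUE and a plain callback in place of onMessage(); the class name ListenerFlowSketch and the STOP marker are hypothetical and no JMS API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// In-memory stand-in for the Receiver/Sender flow; no JMS and no broker involved.
class ListenerFlowSketch {

    static final String STOP = "__STOP__"; // hypothetical end-of-stream marker

    // Starts the receiver thread first, then the sender, and returns what the callback saw.
    static List<String> run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();
        Consumer<String> onMessage = received::add; // plays the role of onMessage()

        Thread receiver = new Thread(() -> {
            try {
                // Block until a message arrives, then hand it to the callback
                for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                    onMessage.accept(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread sender = new Thread(() -> {
            for (int i = 0; i < messageCount; i++) {
                queue.add("JCG Message Count " + i);
            }
            queue.add(STOP);
        });

        receiver.start(); // listener is waiting before anything is sent
        sender.start();
        try {
            sender.join();
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

As in the real example, the callback runs once per message and in the order the messages were sent, because a single consumer drains a single queue.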
Output:
We will run our Receiver class, which starts both the Sender and Receiver threads written above, to check how the onMessage() method is invoked.
Please follow the steps below:
1. In Eclipse, right-click Receiver.java -> Run As -> Java Application, to see if our message is sent to the queue and received by the Receiver class.
2. Check the Eclipse console output for the sent and received messages. Whenever a message is received, the onMessage() method is invoked.
3. We can also check the ActiveMQ console -> Queues tab to see the number of Pending/Enqueued/Dequeued messages in our queue after running the program.
3.2 Example 2 – Monitoring ActiveMQ for a number of events (example using Eclipse)
- In this example we will use two classes, AdvisorySrc and AdvisoryConsumer.
- AdvisorySrc creates producers and consumers, which causes the broker to produce advisory messages. AdvisoryConsumer demonstrates how we can consume those broker advisories.
- The ActiveMQ broker generates advisory messages for a number of different events that occur on the broker.
- Client applications can subscribe to special topics where the events are sent in order to monitor activity on the broker.
- The advisory messages are simple JMS message objects that carry properties with event-related information.
- The AdvisoryConsumer class that we have written listens for events related to MessageProducer and MessageConsumer instances being added to and removed from the broker for a particular destination.
- In this example, we will be watching a queue named JCG_QUEUE.
- Let us now create a dynamic web project in Eclipse and create our AdvisorySrc and AdvisoryConsumer classes. Please refer to the code below.
AdvisorySrc.java
package com.activemq.advisory;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AdvisorySrc implements Runnable {

    private final String connectionUri = "tcp://localhost:61616";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private final Random rand = new Random();

    public void run() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(connectionUri);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("JCG_QUEUE");

            // Adding a producer triggers a producer advisory
            TimeUnit.SECONDS.sleep(rand.nextInt(10));
            MessageProducer producer = session.createProducer(destination);
            producer.send(session.createTextMessage());

            // Adding a consumer triggers a consumer advisory
            TimeUnit.SECONDS.sleep(rand.nextInt(10));
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.receive();

            TimeUnit.SECONDS.sleep(rand.nextInt(30));
            System.out.print(".");
        } catch (Exception ex) {
            // Report the failure instead of silently swallowing it
            System.out.println("Caught exception: " + ex);
        } finally {
            // Closing the connection removes the producer and consumer again
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                    System.out.println("Could not close connection: " + ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Starting Advisory Message Source !!!!");
        try {
            ExecutorService service = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 20; ++i) {
                service.execute(new AdvisorySrc());
            }
            service.shutdown();
            service.awaitTermination(5, TimeUnit.MINUTES);
            System.out.println();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Finished running the Advisory Message Source");
    }
}
AdvisoryConsumer.java
package com.activemq.advisory;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;

public class AdvisoryConsumer implements MessageListener {

    private final String connectionUri = "tcp://localhost:61616";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageConsumer advisoryConsumer;
    private Destination monitored;

    public void before() throws Exception {
        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        monitored = session.createQueue("JCG_QUEUE");

        // Subscribe to the consumer and producer advisory topics of the monitored queue
        // through one composite (comma-separated) destination.
        destination = session.createTopic(
                AdvisorySupport.getConsumerAdvisoryTopic(monitored).getPhysicalName() + ","
                + AdvisorySupport.getProducerAdvisoryTopic(monitored).getPhysicalName());
        advisoryConsumer = session.createConsumer(destination);
        advisoryConsumer.setMessageListener(this);
        connection.start();
    }

    public void after() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    public void onMessage(Message message) {
        try {
            // The advisory topic the message arrived on tells us which event occurred
            Destination source = message.getJMSDestination();
            if (source.equals(AdvisorySupport.getConsumerAdvisoryTopic(monitored))) {
                int consumerCount = message.getIntProperty("consumerCount");
                System.out.println("New Consumer Advisory, Consumer Count: " + consumerCount);
            } else if (source.equals(AdvisorySupport.getProducerAdvisoryTopic(monitored))) {
                int producerCount = message.getIntProperty("producerCount");
                System.out.println("New Producer Advisory, Producer Count: " + producerCount);
            }
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it
            System.out.println("Could not read advisory message: " + e);
        }
    }

    public void run() throws Exception {
        // Keep listening for advisories for ten minutes
        TimeUnit.MINUTES.sleep(10);
    }

    public static void main(String[] args) {
        AdvisoryConsumer example = new AdvisoryConsumer();
        System.out.println("Starting Advisory Consumer example now...");
        try {
            example.before();
            example.run();
            example.after();
        } catch (Exception e) {
            System.out.println("Caught an exception during the example: " + e.getMessage());
        }
        System.out.println("Finished running the Advisory Consumer example.");
    }
}
Output:
ActiveMQ must be running. Next, we will run our AdvisorySrc and AdvisoryConsumer classes to see how the onMessage() method is invoked.
Please follow the steps below:
1. In Eclipse, right-click AdvisoryConsumer.java -> Run As -> Java Application. We can see the main method printing its start-up message.
2. In Eclipse, right-click AdvisorySrc.java -> Run As -> Java Application. When you run AdvisorySrc, check the Eclipse console for both the AdvisorySrc and AdvisoryConsumer classes. We can see output like the following, which indicates that the application is receiving advisory messages from the broker.
As you can see in the AdvisoryConsumer class, we are subscribing to two different topics on the broker (ActiveMQ). The topics for subscribing are ConsumerAdvisoryTopic and ProducerAdvisoryTopic. The ActiveMQ client library provides a convenience class, AdvisorySupport, for fetching the various advisory topics.
3. We can also check the ActiveMQ console -> Queues tab and ActiveMQ console -> Topics tab to see the number of Pending/Enqueued/Dequeued messages in our queue and the topics our client has subscribed to after running the program. Refer to the screenshots below.
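The advisory topics that show up in the Topics tab follow a naming scheme, and it can help to see it spelled out. The sketch below rebuilds the names with plain strings, assuming the documented "ActiveMQ.Advisory.<Event>.Queue.<queue name>" pattern; the class AdvisoryTopicNamesSketch and its methods are hypothetical helpers written for this illustration. In real code, AdvisorySupport should be used to obtain the topics, as AdvisoryConsumer does.

```java
// Plain-string sketch of advisory topic naming; hypothetical helper, not part of ActiveMQ.
// Assumes the documented pattern ActiveMQ.Advisory.<Event>.Queue.<queue name>.
class AdvisoryTopicNamesSketch {

    static final String PREFIX = "ActiveMQ.Advisory.";

    // Builds the advisory topic name for an event type (e.g. "Consumer") on a queue.
    static String advisoryTopic(String event, String queueName) {
        return PREFIX + event + ".Queue." + queueName;
    }

    // Joins several destination names into one comma-separated composite destination,
    // the same shape AdvisoryConsumer passes to session.createTopic(...).
    static String composite(String... names) {
        return String.join(",", names);
    }

    public static void main(String[] args) {
        String consumer = advisoryTopic("Consumer", "JCG_QUEUE");
        String producer = advisoryTopic("Producer", "JCG_QUEUE");
        System.out.println(composite(consumer, producer));
        // Topic expected to carry the advisories enabled by advisoryForDelivery="true"
        System.out.println(advisoryTopic("MessageDelivered", "JCG_QUEUE"));
    }
}
```

Subscribing through one composite destination lets a single MessageConsumer receive both kinds of advisories, which is why onMessage() in AdvisoryConsumer has to check getJMSDestination() to tell them apart.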
4. Conclusion
Through this example, we have learned how to configure ActiveMQ Advisory messages and use them as notifications when a message is consumed by a consumer. We have also seen how a client application can subscribe to the advisory topics on the broker.