jms

Apache ActiveMQ Advisory Example

In this example, we will be discussing about Apache ActiveMQ Advisory. But before we start with our example, it is expected that we have a basic understanding of JMS concepts, ActiveMQ and Java/J2EE. JMS stands for Java Messaging API and ActiveMQ is a java based implementation of JMS. ActiveMQ acts as a message broker. It is open source and helps in asynchronous message transfer between producer and consumer using queues/topics.

So, whenever some activity occurs on ActiveMQ , there is a course of action that can be taken to receive notifications using ActiveMQ Advisory messages. For example, we can setup notifications on producer, consumer or destination.

1. Introduction

ActiveMQ Advisory acts as an administrative channel to monitor our system via JMS messages. We can get information on what is happening with our producers, consumers and destination topics/queues. In this tutorial we will see how a consumer gets notified whenever a message is consumed. We will be using Java v8 and Active MQ v 5.14.5.

2. Configuration for ActiveMQ Advisory

  1. Download ActiveMQ from ActiveMQ download link.
  2. Extract the ActiveMQ downloaded zip file to any location in your computer.
  3. Goto conf directory of extracted activemq folder.
  4. Open activemq.xml and search for xml tag policyEntry . It will be like this:
    policyEntry topic=">"
  5. Change topic to queue (as we will be using queue in our example) and add advisoryForDelivery tag to setup advisory messages. The ‘>’ matches all queues.
    policyEntry queue=">" advisoryForDelivery="true"
  6. Now start activemq as described here Say Hello To ActiveMQ.

3. Using ActiveMQ Advisory

3.1 Example 1 – Using ActiveMQ Advisory in a simple Java project (example using Eclipse)

  1. In this example, we will see how ActiveMQ advisory works in a simple way. The listener method onMessage() gets triggerred whenever a message is received by the Receiver .
  2. Let us now create a dynamic web project in eclipse and create our Sender and Receiver classes to see how a message is exchanged using ActiveMQ. A notification will be received when a consumer consumes a message.
  3. The Sender and Receiver classes are created as separate threads and implements the Runnable interface.
  4. The Receiver class implements MessageListener and hence override the onMessage() method.
  5. The main method starts the Receiver thread, sleep for some time and then starts the Sender thread. So the moment Sender starts sending messages, the onMessage() method in Receiver class gets invoked.
  6. Please refer the code snippet below for our Sender and Receiver classes.

Sender.java

package com.activemq.advisory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender implements Runnable{

	//URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
	private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
		
	// default broker URL is : tcp://localhost:61616"
	private static String subject = "JCG_QUEUE"; // Queue Name.You can create any/many queue names as per your requirement.	
		
	@Override
	public void run() {
		try {
			// Getting JMS connection from the server and starting it
			ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
			Connection connection = activeMQConnectionFactory.createConnection();
			connection.start();

			//Creating a non transactional session to send/receive JMS message.
			Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

			//Destination represents here our queue 'JCG_QUEUE' on the JMS server. 
			//The queue will be created automatically on the server.
			Destination destination = session.createQueue(subject);

			// MessageProducer is used for sending messages to the queue.
			MessageProducer producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			// We will be sending text messages
			for (int i = 0; i < 100; i++) {
				String text = "JCG Message Count " + i;
				TextMessage message = session.createTextMessage(text);
				// Here we are sending our message!
				producer.send(message);
				System.out.println("JCG printing Sent message: " + message);				
			}
			session.close();
			connection.close();
		} catch (Exception e) {
			System.out.println("Caught Exception: " + e);
			e.printStackTrace();
		}
	}
}

Receiver.java

package com.activemq.advisory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

public class Receiver implements Runnable, ExceptionListener, MessageListener {

	// URL of the JMS server
	private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	// default broker URL is : tcp://localhost:61616"

	// Name of the queue we will receive messages from
	private static String subject = "JCG_QUEUE";

	public Receiver() {

		try {
			// Getting JMS connection from the server
			ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
					url);
			Connection connection = activeMQConnectionFactory
					.createConnection();
			connection.start();

			connection.setExceptionListener(this);

			// Creating session for receiving messages
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);

			// Getting the queue 'JCG_QUEUE'
			Destination destination = session.createQueue(subject);

			// MessageConsumer is used for receiving (consuming) messages
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(this);//Setting message listener
		} catch (Exception e) {
			System.out.println("Caught exception: " + e);
			e.printStackTrace();
		}
	}

	public void run() {
		// Make Consumer a Daemon thread.
		while (true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public synchronized void onException(JMSException ex) {
		System.out.println("JCG ActiveMQ JMS Exception occured.  Shutting down client.");
	}

	public void onMessage(Message msg) {
		ActiveMQTextMessage tm = (ActiveMQTextMessage) msg;
		try {
			System.out.println(" JCG inside onMessage:: Received Message:::" + tm.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		(new Thread(new Receiver())).start();
		Thread.sleep(10000);
		(new Thread(new Sender())).start();
	}

}

Output:

We will be running our Receiver class which starts both Sender and Receiver threads written above to check how the onMessage() method  is invoked.

Please follow the steps below:

  1. In the eclipse Right Click on Receiver.java -> Run As->Java Application, to see if our message is sent to the queue and received by the Receiver class.
  2. Check the eclipse console output for the sent and received message. Whenever a message is received, the onMessage() method is invoked.

Fig 1: Eclipse console showing onMessage() being called

3. We can also check our ActiveMQ console->Queues tab, to see the number of Pending/Enqueued/Dequeued messages in our queue after running the program.

Fig 2: ActiveMQ Console

3.2 Example 2 – Monitoring ActiveMQ for a number of events (example using Eclipse)

  1. In this example we will use two classes AdvisorySrc and AdvisoryConsumer.
  2. Using AdvisorySrc, the broker will produce Advisory messages.
  3. AdvisoryConsumer demonstrates how we can implement those broker advisories.
  4. The ActiveMQ broker generates advisory messages for a number of different events that occur on the broker.
  5. The client applications can subscribe to special topics where the events are sent in order to monitor activity on the broker.
  6. The advisory messages are nothing but simple JMS message objects that have some properties to provide event related information.
  7. The AdvisoryConsumer class that we have written listens for events related to MessageProducer and MessageConsumer, being added and removed from the broker for a particular destination.
  8. In this example, we will be watching a queue named JCG_QUEUE.
  9. Let us now create a dynamic web project in eclipse and create our AdvisorySrc and AdvisoryConsumer classes. Please refer the code below.

AdvisorySrc.java

package com.activemq.advisory;

import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
 
import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
 
import org.apache.activemq.ActiveMQConnectionFactory; 
 
public class AdvisorySrc implements Runnable { 
 
    private final String connectionUri = "tcp://localhost:61616"; 
    private ActiveMQConnectionFactory connectionFactory; 
    private Connection connection; 
    private Session session; 
    private Destination destination; 
    private final Random rand = new Random(); 
 
    public void run() { 
        try { 
            connectionFactory = new ActiveMQConnectionFactory(connectionUri); 
            connection = connectionFactory.createConnection(); 
            connection.start(); 
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
            destination = session.createQueue("JCG_QUEUE"); 
 
            TimeUnit.SECONDS.sleep(rand.nextInt(10)); 
 
            MessageProducer producer = session.createProducer(destination); 
            producer.send(session.createTextMessage()); 
 
            TimeUnit.SECONDS.sleep(rand.nextInt(10)); 
 
            MessageConsumer consumer = session.createConsumer(destination); 
            consumer.receive(); 
 
            TimeUnit.SECONDS.sleep(rand.nextInt(30)); 
 
            System.out.print("."); 
 
            if (connection != null) { 
                connection.close(); 
            } 
 
        } catch (Exception ex) {} 
    } 
 
    public static void main(String[] args) { 
        System.out.println("Starting Advisory Message Source !!!!"); 
        try {  
            ExecutorService service = Executors.newFixedThreadPool(10); 
            for (int i = 0; i < 20; ++i) { 
                service.execute(new AdvisorySrc()); 
            }  
            service.shutdown(); 
            service.awaitTermination(5, TimeUnit.MINUTES); 
            System.out.println(); 
 
        } catch (Exception e) { 
            System.out.println(e.getMessage()); 
        } 
        System.out.println("Finished running the Advisory Message Source"); 
    } 
}

AdvisoryConsumer.java

package com.activemq.advisory;

import java.util.concurrent.TimeUnit; 
 
import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.advisory.AdvisorySupport; 
 
public class AdvisoryConsumer implements MessageListener { 
 
    private final String connectionUri = "tcp://localhost:61616"; 
    private ActiveMQConnectionFactory connectionFactory; 
    private Connection connection; 
    private Session session; 
    private Destination destination; 
    private MessageConsumer advisoryConsumer; 
    private Destination monitored; 
 
    public void before() throws Exception { 
        connectionFactory = new ActiveMQConnectionFactory(connectionUri); 
        connection = connectionFactory.createConnection(); 
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        monitored = session.createQueue("JCG_QUEUE"); 
        destination = session.createTopic( 
            AdvisorySupport.getConsumerAdvisoryTopic(monitored).getPhysicalName() + "," + 
            AdvisorySupport.getProducerAdvisoryTopic(monitored).getPhysicalName()); 
        advisoryConsumer = session.createConsumer(destination); 
        advisoryConsumer.setMessageListener(this); 
        connection.start(); 
    } 
 
    public void after() throws Exception { 
        if (connection != null) { 
            connection.close(); 
        } 
    } 
 
    public void onMessage(Message message) { 
        try { 
            Destination source = message.getJMSDestination(); 
            if (source.equals(AdvisorySupport.getConsumerAdvisoryTopic(monitored))) { 
                int consumerCount = message.getIntProperty("consumerCount"); 
                System.out.println("New Consumer Advisory, Consumer Count: " + consumerCount); 
            } else if (source.equals(AdvisorySupport.getProducerAdvisoryTopic(monitored))) { 
                int producerCount = message.getIntProperty("producerCount"); 
                System.out.println("New Producer Advisory, Producer Count: " + producerCount); 
            } 
        } catch (JMSException e) { 
        } 
    } 
 
    public void run() throws Exception { 
        TimeUnit.MINUTES.sleep(10); 
    } 
 
    public static void main(String[] args) { 
        AdvisoryConsumer example = new AdvisoryConsumer(); 
        System.out.println("Starting Advisory Consumer example now..."); 
        try { 
            example.before(); 
            example.run(); 
            example.after(); 
        } catch (Exception e) { 
            System.out.println("Caught an exception during the example: " + e.getMessage()); 
        } 
        System.out.println("Finished running the Advisory Consumer example."); 
    } 
}

Output:

ActiveMQ must be running. Next, we will be running our AdvisorySrc and AdvisoryConsumer classes to see how the onMessage() method is invoked.

Please follow the steps below:

  1. In the eclipse Right Click on AdvisoryConsumer.java -> Run As->Java Application. We can see the main method printing the message.

Fig 3: Eclipse output console

2. In the eclipse Right Click on AdvisorySrc.java -> Run As->Java Application. When you run AdvisorySrc, check eclipse console for both AdvisorySrc and AdvisoryConsumer classes.We can see output like the following that indicates the application is receiving advisory messages from the broker.

Fig 4: Eclipse console showing appln receiving msgs from broker

 

Fig 5: Eclipse console for AdvisorySrc.java

As you see the AdvisoryConsumer class, we are subscribing to two different topics on the broker (ActiveMQ). The topics for subscribing are ConsumerAdvisoryTopic and ProducerAdvisoryTopic. ActiveMQ client library provides a convenience class AdvisorySupport for fetching the various advisory topics.

3. We can also check our ActiveMQ console->Queues tab and ActiveMQ console->Topics tab, to see the number of Pending/Enqueued/Dequeued messages in our queue and the topics our client has subscribed to, after running the program. Refer the screenshots below.

Fig 6: ActiveMQ Queues tab

 

Fig 7: ActiveMQ Topics subscribed

4. Conclusion

Through this example, we have learned how to configure ActiveMQ Advisory messages and use those messages as some kind of notification when a message is consumed by the consumer. We have also seen how a client application can subscribe to different topics on the broker.

Neha Goel

Neha holds a Bachelors degree in Computer Science and Engineering. Currently she is working as a Sr. Programmer Analyst for a client in USA and has a total of 9+ years of Java/J2EE experience.Her expertise includes participation in all stages of SDLC. She has experience with multiple web based and enterprise based applications. She has a very impressive understanding in Object oriented architecture, analysis, design and software development using latest technologies like Java, J2EE , Restful Services, Spring, Hibernate, JDBC, JSP, Servlets, GWT, ATG etc.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button