jms

JMS MessageListener Example

A message can be sent either through point-to-point or publish-and-subscribe messaging styles. In both the ways, the producer and consumer work in a decoupled manner. The message is sent to a named destination. There are two ways a client can receive messages from a message consumer:

  1. The message consumer used one of its receive() methods to call the blocking receive() method with no parameters or poll the consumer by calling either the receive(timeOut) method with a time out.
  2. The other way is to register a message listener object with a message consumer by calling the setMessageListener() method and passing in the message listener as a parameter.

In this article, we will see what is a MessageListener and how we can use it to receive messages.

1. Dependencies

In order to send and receive JMS messages to and from a JMS message broker, we need to include the message service library. In this example we are using activeMq so our pom.xml will have dependencies related to spring as well as activeMQ.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javacodegeeks.jms</groupId>
	<artifactId>springJmsQueue</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.12.0</version>
		</dependency>
	</dependencies>
	
</project>

2. What is a Message Listener?

A message listener is an object that implements the MessageListener interface shown below:

public interface MessageListener {
    void onMessage(Message message);
}

As soon as messages arrives at the destination, the message consumer delivers them by calling the message listener’s onMessage( method. Registering a message listener allows clients to
asynchronously receive messages without having to block/poll the message consumer.

3. Registering a MessageListener

Messages are received by a message consumer. Let’s look at MessageConsumer interface:

public interface MessageConsumer {
    String getMessageSelector() throws JMSException;
    MessageListener getMessageListener() throws JMSException;
    void setMessageListener(MessageListener listener) throws JMSException;
    Message receive() throws JMSException;
    Message receive(long timeout) throws JMSException;
    Message receiveNoWait() throws JMSException;
    void close() throws JMSException;
}

Instead of waiting/polling the message consumer for messages, a client can register a message listener with a message consumer using setMessageListener(). Once a message arrives at the destination, registered message listener’s onMessage() will be called. Method getMessageListener() will return the registered message listener.

4. MessageListener throwing RuntimeException

As messages arrives to the destination, the JMS provider delivers them by calling the listener’s onMessage() callback method. Note that the onMessage() method is declared as not throwing any exceptions i.e. it does not have a throws clause. It is still possible for a listener to throw a RuntimeException. One should try not throw a RuntimeException and instead catch and deal with the exceptions.

5. Message Listener Queue Example

We will first implement MessageListener. In onMessage(Message) method, we will retrieve the payload from the message and simply print the text.

ConsumerMessageListener:

package com.javacodegeeks.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
	private String consumerName;

	public ConsumerMessageListener(String consumerName) {
		this.consumerName = consumerName;
	}

	public void onMessage(Message message) {
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println(consumerName + " received "
					+ textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

In the below example, we create a message, send it to a destination and then consume the message from destination queue using a message listener.

  1. We first create a bridge.
    BrokerService broker = BrokerFactory.createBroker(new URI(
    				"broker:(tcp://localhost:61616)"));
    
  2. Start the bridge.
    broker.start();
    
  3. Create connection factory.
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    					"tcp://localhost:61616");
    
  4. Create connection.
    connection = connectionFactory.createConnection();
    
  5. Create a session.
    Session session = connection.createSession(false,
    					Session.AUTO_ACKNOWLEDGE);
    
  6. Create a queue.
    Queue queue = session.createQueue("customerQueue");
    
  7. Create a producer for the queue.
    MessageProducer producer = session.createProducer(queue);
  8. Create a message.
    Message msg = session.createTextMessage(payload);
    
  9. Use the producer to send it to the destination.
    producer.send(msg);
    
  10. Create a consumer to receive the message from the queue.
    MessageConsumer consumer = session.createConsumer(queue);
    
  11. Register a message listener to receive the messages from the destination.
    consumer.setMessageListener(new ConsumerMessageListener("Consumer"));
    

JmsMessageListenerExample:

package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsMessageListenerExample {
	public static void main(String[] args) throws URISyntaxException, Exception {
		BrokerService broker = BrokerFactory.createBroker(new URI(
				"broker:(tcp://localhost:61616)"));
		broker.start();
		Connection connection = null;
		try {
			// Producer
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					"tcp://localhost:61616");
			connection = connectionFactory.createConnection();
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Queue queue = session.createQueue("customerQueue");
			String payload = "Important Task";
			Message msg = session.createTextMessage(payload);
			MessageProducer producer = session.createProducer(queue);
			System.out.println("Sending text '" + payload + "'");
			producer.send(msg);

			// Consumer
			MessageConsumer consumer = session.createConsumer(queue);
			consumer.setMessageListener(new ConsumerMessageListener("Consumer"));
			connection.start();
			Thread.sleep(1000);
			session.close();
		} finally {
			if (connection != null) {
				connection.close();
			}
			broker.stop();
		}
	}

}

Output:

 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB]
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:303724
 INFO | Recovery replayed 1 operations from the journal in 0.013 seconds.
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-55835-1449298775816-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61616
 INFO | Connector tcp://127.0.0.1:61616 started
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-55835-1449298775816-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB only has 30135 mb of usable space - resetting to maximum available disk space: 30136 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage only has 30135 mb of usable space - resetting to maximum available 30135 mb.
Sending text 'Important Task'
Consumer received Important Task
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-55835-1449298775816-0:1) is shutting down
 INFO | Connector tcp://127.0.0.1:61616 stopped
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] stopped
 INFO | Stopping async queue tasks
 INFO | Stopping async topic tasks
 INFO | Stopped KahaDB
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-55835-1449298775816-0:1) uptime 1.956 seconds
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-55835-1449298775816-0:1) is shutdown

 6. Message Listener Topic Example

Message listener can also be used in case of topic where multiple customers are subscribed to receive the messages. In the below example, we have more than one consumer listening on the same topic.

// Consumer1 subscribes to customerTopic
MessageConsumer consumer1 = session.createConsumer(topic);
consumer1.setMessageListener(new ConsumerMessageListener("Consumer1"));
			
// Consumer2 subscribes to customerTopic
MessageConsumer consumer2 = session.createConsumer(topic);
consumer2.setMessageListener(new ConsumerMessageListener("Consumer2"));

We create individual message listeners for both the consumers and register. Moment the topic receives a message, it is consumed by both the consumers.

JmsTopicMessageListenerExample:

package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsTopicMessageListenerExample {
	public static void main(String[] args) throws URISyntaxException, Exception {
		BrokerService broker = BrokerFactory.createBroker(new URI(
				"broker:(tcp://localhost:61616)"));
		broker.start();
		Connection clientConnection = null;
		try {
			// Producer
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					"tcp://localhost:61616");
			clientConnection = connectionFactory.createConnection();
			clientConnection.setClientID("TempTopicTest");
			Session session = clientConnection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Topic topic = session.createTemporaryTopic();	
			
			// Consumer1 subscribes to customerTopic
			MessageConsumer consumer1 = session.createConsumer(topic);
			consumer1.setMessageListener(new ConsumerMessageListener("Consumer1"));
			
			// Consumer2 subscribes to customerTopic
		    MessageConsumer consumer2 = session.createConsumer(topic);
		    consumer2.setMessageListener(new ConsumerMessageListener("Consumer2"));
		    
			clientConnection.start();		
			
			// Publish
			String payload = "Important Task";
			Message msg = session.createTextMessage(payload);
			MessageProducer producer = session.createProducer(topic);
			System.out.println("Sending text '" + payload + "'");
			producer.send(msg);
			
			Thread.sleep(3000);
			session.close();
		} finally {
			if (clientConnection != null) {
				clientConnection.close();
			}
			broker.stop();
		}
	}
}

You can see from the output both consumer1 and consumer2 receives the message.

Output:

 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB]
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:894464
 INFO | Recovery replayed 1 operations from the journal in 0.017 seconds.
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-60035-1449369035758-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61616
 INFO | Connector tcp://127.0.0.1:61616 started
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-60035-1449369035758-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB only has 29486 mb of usable space - resetting to maximum available disk space: 29486 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage only has 29486 mb of usable space - resetting to maximum available 29486 mb.
Sending text 'Important Task'
Consumer1 received Important Task
Consumer2 received Important Task
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-60035-1449369035758-0:1) is shutting down
 INFO | Connector tcp://127.0.0.1:61616 stopped
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] stopped
 INFO | Stopping async queue tasks
 INFO | Stopping async topic tasks
 INFO | Stopped KahaDB
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-60035-1449369035758-0:1) uptime 3.905 seconds
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-60035-1449369035758-0:1) is shutdown

7. MessageListener Acknowledging Receive

If the consumer has chosen CLIENT_ACKNOWLEDGE then the JMS client has to explicitly acknowledge each message it receives by calling Message.acknowledge() method. If a message is not acknowledged, the JMS provider may redeliver it to the consumer.

Our listener contains a boolean which if true, client will send a message acknowledgement.

AckMessageListener:

package com.javacodegeeks.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class AckMessageListener implements MessageListener {
	private boolean acknowledge;

	public AckMessageListener(boolean acknowledge) {
		this.acknowledge = acknowledge;
	}

	public void onMessage(Message message) {
		if (acknowledge) {
			try {
				message.acknowledge();
			} catch (JMSException e1) {
				e1.printStackTrace();
			}
		}
		System.out.println(message);
	}
}

In our example, a session is created with CLIENT_ACKNOWLEDGE flag. We will have two different message listeners. One which doesn’t acknowledges and the other which acknowledges. We do this using a flag in message listener. Next we send messages twice. Once for the message listener which doesn’t acknowledge and the second time for a message listener which acknowledges after receiving the message.

JmsMessageListenerAckExample:

package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsMessageListenerAckExample {
	public static void main(String[] args) throws URISyntaxException, Exception {
		BrokerService broker = BrokerFactory.createBroker(new URI(
				"broker:(tcp://localhost:61616)"));
		broker.start();
		Connection connection = null;
		try {
			// Producer
			ConnectionFactory factory = new ActiveMQConnectionFactory(
					"vm://localhost?broker.persistent=false&marshal=true");
			connection = factory.createConnection();
			Session session = connection.createSession(false,
					Session.CLIENT_ACKNOWLEDGE);
			Queue queue = session.createQueue("customerQueue");
			String payload = "Important Task";
			Message msg = session.createTextMessage(payload);
			MessageProducer producer = session.createProducer(queue);

			System.out.println("Sending text '" + payload + "'");
			producer.send(msg);
			
			// Consumer
			MessageConsumer consumer = session.createConsumer(queue);
			consumer.setMessageListener(
					new AckMessageListener(false));
			connection.start();		
			
			Thread.sleep(1000);
			
			System.out.println("Change the message listener to acknowledge");
			System.out.println("Sending text '" + payload + "'");
			producer.send(msg);
			consumer.setMessageListener(
					new AckMessageListener(true));
			
			Thread.sleep(1000);
			session.close();
		} finally {
			if (connection != null) {
				connection.close();
			}
			broker.stop();
		}
	}

}

In the first case, the listener doesn’t acknowledge. Since the session acknowledgement flag is set to CLIENT_ACKNOWLEDGE, we keep receiving the messages. In the second case, the listener acknowledges and the listener doesn’t receive any extra messages.

Output:

 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB]
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:856810
 INFO | Recovery replayed 1 operations from the journal in 0.014 seconds.
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59393-1449368400227-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61616
 INFO | Connector tcp://127.0.0.1:61616 started
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59393-1449368400227-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB only has 29487 mb of usable space - resetting to maximum available disk space: 29488 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage only has 29487 mb of usable space - resetting to maximum available 29487 mb.
 INFO | Connector vm://localhost started
Sending text 'Important Task'
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-59298-1449368307831-3:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-59298-1449368307831-3:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1449368308060, arrival = 0, brokerInTime = 1449368308061, brokerOutTime = 1449368400459, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@7ae541be, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Important Task}
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-59311-1449368320599-3:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-59311-1449368320599-3:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1449368320812, arrival = 0, brokerInTime = 1449368320813, brokerOutTime = 1449368400459, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@47066e51, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Important Task}
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-59393-1449368400227-3:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-59393-1449368400227-3:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1449368400438, arrival = 0, brokerInTime = 1449368400439, brokerOutTime = 1449368400459, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@d4fb4f9, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Important Task}
Change the message listener to acknowledge
Sending text 'Important Task'
ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:INMAA1-L1005-59393-1449368400227-3:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-59393-1449368400227-3:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1449368401458, arrival = 0, brokerInTime = 1449368401458, brokerOutTime = 1449368401471, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@25fd3290, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Important Task}
 INFO | Connector vm://localhost stopped
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59393-1449368400227-0:1) is shutting down
 INFO | Connector tcp://127.0.0.1:61616 stopped
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] stopped
 INFO | Stopping async queue tasks
 INFO | Stopping async topic tasks
 INFO | Stopped KahaDB
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59393-1449368400227-0:1) uptime 2.901 seconds
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59393-1449368400227-0:1) is shutdown

 8. Message Listener Receiving Multiple messages

In this example, we first create multiple consumers with message listener registered. Next we create a producer to sends multiple messages to a queue. Since it is a queue, the messages will be consumed only once by the consumer client in a round robin way.

JmsMultipleCustomersMessageListenerExample:

package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsMultipleCustomersMessageListenerExample {
	public static void main(String[] args) throws URISyntaxException, Exception {
		BrokerService broker = BrokerFactory.createBroker(new URI(
				"broker:(tcp://localhost:61616)"));
		broker.start();
		Connection connection = null;
		try {
			// Producer
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					"tcp://localhost:61616");
			connection = connectionFactory.createConnection();
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Queue queue = session.createQueue("customerQueue");

			// Consumer
			for (int i = 0; i < 4; i++) {
				MessageConsumer consumer = session.createConsumer(queue);
				consumer.setMessageListener(new ConsumerMessageListener(
						"Consumer " + i));
			}
			connection.start();

			String basePayload = "Important Task";
			MessageProducer producer = session.createProducer(queue);
			for (int i = 0; i < 10; i++) {
				String payload = basePayload + i;
				Message msg = session.createTextMessage(payload);
				System.out.println("Sending text '" + payload + "'");
				producer.send(msg);
			}

			Thread.sleep(1000);
			session.close();
		} finally {
			if (connection != null) {
				connection.close();
			}
			broker.stop();
		}
	}

}

You can see from below the messages sent to the queue and the order in which consumers receive messages.

Output:

 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB]
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:868386
 INFO | Recovery replayed 1 operations from the journal in 0.014 seconds.
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59678-1449368684497-0:1) is starting
 INFO | Listening for connections at: tcp://127.0.0.1:61616
 INFO | Connector tcp://127.0.0.1:61616 started
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59678-1449368684497-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\KahaDB only has 29487 mb of usable space - resetting to maximum available disk space: 29488 mb
 WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage only has 29487 mb of usable space - resetting to maximum available 29487 mb.
Sending text 'Important Task0'
Consumer 0 received Important Task0
Sending text 'Important Task1'
Consumer 1 received Important Task1
Sending text 'Important Task2'
Consumer 2 received Important Task2
Sending text 'Important Task3'
Consumer 3 received Important Task3
Sending text 'Important Task4'
Consumer 0 received Important Task4
Sending text 'Important Task5'
Consumer 1 received Important Task5
Sending text 'Important Task6'
Consumer 2 received Important Task6
Sending text 'Important Task7'
Consumer 3 received Important Task7
Sending text 'Important Task8'
Consumer 0 received Important Task8
Sending text 'Important Task9'
Consumer 1 received Important Task9
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59678-1449368684497-0:1) is shutting down
 INFO | Connector tcp://127.0.0.1:61616 stopped
 INFO | PListStore:[C:\javacodegeeks_ws\jmsMessageListenerExample\activemq-data\localhost\tmp_storage] stopped
 INFO | Stopping async queue tasks
 INFO | Stopping async topic tasks
 INFO | Stopped KahaDB
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59678-1449368684497-0:1) uptime 1.924 seconds
 INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-59678-1449368684497-0:1) is shutdown

9. Download the Eclipse Project

This was an example about JMS MessageListener.

Download
You can download the full source code of this example here: jmsMessageListenerExample.zip

Ram Mokkapaty

Ram holds a master's degree in Machine Design from IT B.H.U. His expertise lies in test driven development and re-factoring. He is passionate about open source technologies and actively blogs on various java and open-source technologies like spring. He works as a principal Engineer in the logistics domain.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

2 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Natasha
Natasha
6 years ago

Nice and concise tutorial!!

Samuel
Samuel
4 years ago

Thank you

Back to top button