JMS Client Example
The term “JMS client” refers to Java components or applications that use the JMS API and a JMS provider to send and receive messages.
JMS supports two styles of messaging: the point−to−point and publis−and−subscribe messaging styles. Before a client can use a JMS provider to send and receive messages, the client must decide which messaging style it wants to use.
A client can consume a message synchronously or asynchronously.
In this article, we will see several examples of JMS Clients.
1. Dependencies
In order to send and receive JMS messages to and from a JMS message broker, we need to include the message service library. In this example we are using activeMq so our pom.xml will have dependencies related to spring as well as activeMq.
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javacodegeeks.jms</groupId> <artifactId>springJmsQueue</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.12.0</version> </dependency> </dependencies> </project>
2. Starting the JMS Provider
JMS is a specification and not an actual product. A JMS provider such as ActiveMQ, IBM, Progress Software, or even Sun provides a messaging product that implements the specification. In our examples, we will be using ActiveMQ as JMS Provider. Getting started with ActiveMQ isn’t difficult. You simply need to start up the broker and make sure that it’s capable of accepting connections and sending messages.
In the below example, the broker is started as a server listening on port 61616. The JMS Clients willing to connect to the broker will be using the TCP protocol (tcp://localhost:61616). Since the broker and JMS clients are running in the same machine, we have used localhost.
BrokerLauncher:
package com.javacodegeeks.jms; import java.net.URI; import java.net.URISyntaxException; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; public class BrokerLauncher { public static void main(String[] args) throws URISyntaxException, Exception { BrokerService broker = BrokerFactory.createBroker(new URI( "broker:(tcp://localhost:61616)")); broker.start(); } }
Output:
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi INFO | PListStore:[C:\javacodegeeks_ws\jmsClientExample\activemq-data\localhost\tmp_storage] started INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsClientExample\activemq-data\localhost\KahaDB] INFO | KahaDB is version 6 INFO | Recovering from the journal @1:15633 INFO | Recovery replayed 62 operations from the journal in 0.016 seconds. INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-57384-1447857883439-0:1) is starting INFO | Listening for connections at: tcp://127.0.0.1:61616 INFO | Connector tcp://127.0.0.1:61616 started INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-57384-1447857883439-0:1) started INFO | For help or more information please see: http://activemq.apache.org WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsClientExample\activemq-data\localhost\KahaDB only has 37428 mb of usable space - resetting to maximum available disk space: 37428 mb WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsClientExample\activemq-data\localhost\tmp_storage only has 37428 mb of usable space - resetting to maximum available 37428 mb.
3. ConnectionFactory
For a client to interact with the JMS provider to needs to get hold of a connection to the broker and a connection represents a logical connection to the JMS provider. In order to obtain this connection each JMS provider provides a connection factory. There are two types of connection factories: one for point−to−point and another for publish−and−subscribe. Based on the desired messaging style, the client obtains the appropriate connection factory and connects to the JMS provider.
In case of ActiveMQ, it provides one ConnectionFactory and internally it implements both QueueConnectionFactory
and TopicConnectionFactory
.
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616");
4. JMS Producer Client
In our next sections, we will show how to create a connection factory, create a new connection and session, create message producers and consumers which we will then use to send and receive messages. First let’s look at a message producer client. We will use point−to−point messaging style.
We will first obtain a connection factory, which we will then use to create a connection.
// Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection();
Next, use the connection object to create a queue session.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("customerQueue");
The session object obtained is used to create a producer that will be used to send a message. When the producer is created, it is told which queue to send the messages to.
MessageProducer producer = session.createProducer(queue);
Next, we create messages and send in a loop.
for (int i = 0; i < 10; i++) { String payload = task + i; Message msg = session.createTextMessage(payload); System.out.println("Sending text '" + payload + "'"); producer.send(msg); }
Finally, we send message ‘END’ to indicate the client that we have sent the last message.
producer.send(session.createTextMessage("END"));
Finally, close the session and the connections.
try { ... session.close(); } finally { if (connection != null) { connection.close(); } }
JmsProducerClient:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsProducerQueueClient { public static void main(String[] args) throws URISyntaxException, Exception { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("customerQueue"); MessageProducer producer = session.createProducer(queue); String task = "Task"; for (int i = 0; i < 10; i++) { String payload = task + i; Message msg = session.createTextMessage(payload); System.out.println("Sending text '" + payload + "'"); producer.send(msg); } producer.send(session.createTextMessage("END")); session.close(); } finally { if (connection != null) { connection.close(); } } } }
Output:
Sending text 'Task0' Sending text 'Task1' Sending text 'Task2' Sending text 'Task3' Sending text 'Task4' Sending text 'Task5' Sending text 'Task6' Sending text 'Task7' Sending text 'Task8' Sending text 'Task9'
5. JMS Consumer Client
The consumer is very similar to the producer client. It also needs a connection factory, the connection, the session, and the same queue. In this client program, however, the session is used to create a consumer instead of a producer. This consumer is told which queue to consume messages from when it is created.
// Consumer MessageConsumer consumer = session.createConsumer(queue); connection.start();
To actually receive a message, the client calls the receive method as follows:
while (true) { TextMessage textMsg = (TextMessage) consumer.receive(); System.out.println(textMsg); System.out.println("Received: " + textMsg.getText()); if (textMsg.getText().equals("END")) { break; } }
As you can see the consumer receives in an infinite loop. If it receives ‘END’, it comes out of the loop. The receive method can be used in several ways to perform a synchronous receive. If you specify no arguments or an argument of 0, the method blocks indefinitely until a message arrives.
JmsSyncReceiveClientExample:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsSyncReceiveClientExample { public static void main(String[] args) throws URISyntaxException, Exception { Connection connection = null; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { Queue queue = session.createQueue("customerQueue"); // Consumer MessageConsumer consumer = session.createConsumer(queue); connection.start(); while (true) { TextMessage textMsg = (TextMessage) consumer.receive(); System.out.println(textMsg); System.out.println("Received: " + textMsg.getText()); if (textMsg.getText().equals("END")) { break; } } } finally { if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } }
Output:
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931210, arrival = 0, brokerInTime = 1447857931212, brokerOutTime = 1447857977960, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6e3c1e69, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task0} Received: Task0 ActiveMQTextMessage {commandId = 6, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931222, arrival = 0, brokerInTime = 1447857931222, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1888ff2c, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task1} Received: Task1 ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931224, arrival = 0, brokerInTime = 1447857931225, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@35851384, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task2} Received: Task2 ActiveMQTextMessage {commandId = 8, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931227, arrival = 0, brokerInTime = 1447857931227, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@649d209a, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task3} Received: Task3 ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931230, arrival = 0, brokerInTime = 1447857931230, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6adca536, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task4} Received: Task4 ActiveMQTextMessage {commandId = 10, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:6, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931232, arrival = 0, brokerInTime = 1447857931233, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@357246de, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task5} Received: Task5 ActiveMQTextMessage {commandId = 11, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:7, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931235, arrival = 0, brokerInTime = 1447857931235, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@28f67ac7, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task6} Received: Task6 ActiveMQTextMessage {commandId = 12, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:8, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931238, arrival = 0, brokerInTime = 1447857931238, brokerOutTime = 1447857977961, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@256216b3, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task7} Received: Task7 ActiveMQTextMessage {commandId = 13, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:9, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931240, arrival = 0, brokerInTime = 1447857931241, brokerOutTime = 1447857977962, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2a18f23c, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task8} Received: Task8 ActiveMQTextMessage {commandId = 14, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:10, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931243, arrival = 0, brokerInTime = 1447857931243, brokerOutTime = 1447857977962, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@d7b1517, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task9} Received: Task9 ActiveMQTextMessage {commandId = 15, responseRequired = true, messageId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1:11, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-57438-1447857931037-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447857931246, arrival = 0, brokerInTime = 1447857931246, brokerOutTime = 1447857977962, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@16c0663d, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = END} Received: END
6. JMS Asynchronous Client Example
This section describes how to consume messages asynchronously. It uses a message listener in order to consume messages asynchronously.
In order to make sure the asynchronous consumer doesn’t run indefinitely, it calls countDown()
on latch when the message received is ‘END’.
ConsumerMessageListener:
package com.javacodegeeks.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener { private String consumerName; private JmsAsyncReceiveQueueClientExample asyncReceiveQueueClientExample; public ConsumerMessageListener(String consumerName) { this.consumerName = consumerName; } public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(consumerName + " received " + textMessage.getText()); if ("END".equals(textMessage.getText())) { asyncReceiveQueueClientExample.latchCountDown(); } } catch (JMSException e) { e.printStackTrace(); } } public void setAsyncReceiveQueueClientExample( JmsAsyncReceiveQueueClientExample asyncReceiveQueueClientExample) { this.asyncReceiveQueueClientExample = asyncReceiveQueueClientExample; } }
Just like the synchronous client, it also creates a Connection
, a Session
, a MessageConsumer
and creates an instance of the MessageListener
class and then registers it as the message listener for the MessageConsumer
.
MessageConsumer consumer = session.createConsumer(queue); ConsumerMessageListener consumerListener = new ConsumerMessageListener( "Customer"); consumer.setMessageListener(consumerListener);
JmsAsyncReceiveQueueClientExample:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsAsyncReceiveQueueClientExample { private CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws URISyntaxException, Exception { JmsAsyncReceiveQueueClientExample asyncReceiveClient = new JmsAsyncReceiveQueueClientExample(); asyncReceiveClient.receiveMessages(); } public void receiveMessages() throws JMSException, InterruptedException { Connection connection = null; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { Queue queue = session.createQueue("customerQueue"); // Consumer MessageConsumer consumer = session.createConsumer(queue); ConsumerMessageListener consumerListener = new ConsumerMessageListener( "Customer"); consumer.setMessageListener(consumerListener); consumerListener.setAsyncReceiveQueueClientExample(this); connection.start(); latch.await(); } finally { if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public void latchCountDown() { latch.countDown(); } }
Output:
Customer received Task0 Customer received Task1 Customer received Task2 Customer received Task3 Customer received Task4 Customer received Task5 Customer received Task6 Customer received Task7 Customer received Task8 Customer received Task9 Customer received END
7. JMS Subscriber Client
Let’s now look into a client that uses publish−and−subscribe message style. It’s not very different than using the point−to−point style. It also needs a connection factory, connection and a session.
When a publisher sends a message, there may be more than one customer interested in such messages. Publisher broadcasts the message to JMS destination called ‘topic’. There may be more than one consumer subscribed to the topic. All the active clients subscribed to the topic will receive message and there is no need for the subscriber to poll for the messages. Every active subscriber receives its own copy of each message published to the topic. In this example, we will look into durable subscriber.
So what is a durable subscriber?
When a publisher publishes messages for an inactive subscriber, the messages are persisted and delivered when the subscriber reconnects. For durable subscribers to a topic, each consumer gets a copy of the message. While a durable subscriber is disconnected from the JMS server, it is the responsibility of the server to store messages the subscriber misses. When the durable subscriber reconnects, the message server sends it all the unexpired messages that accumulated.
The createDurableSubscriber()
method takes two parameters: a topic name, and a subscription name. A durable subscription’s uniqueness is defined by the client ID and the subscription name.
JmsSubscriberClientExample:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsSubscriberClientExample { public static void main(String[] args) throws URISyntaxException, Exception { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection(); connection.setClientID("DurabilityTest"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("customerTopic"); // Publish String payload = "Task"; TextMessage msg = session.createTextMessage(payload); MessageProducer publisher = session.createProducer(topic); System.out.println("Sending text '" + payload + "'"); publisher.send(msg, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); // Consumer1 subscribes to customerTopic MessageConsumer consumer1 = session.createDurableSubscriber(topic, "consumer1", "", false); // Consumer2 subscribes to customerTopic MessageConsumer consumer2 = session.createDurableSubscriber(topic, "consumer2", "", false); connection.start(); msg = (TextMessage) consumer1.receive(); System.out.println("Consumer1 receives " + msg.getText()); msg = (TextMessage) consumer2.receive(); System.out.println("Consumer2 receives " + msg.getText()); session.close(); } finally { if (connection != null) { connection.close(); } } } }
Output:
Sending text 'Task' Consumer1 receives Task Consumer2 receives Task
8. Client Program for the Queue Browser
We have seen examples of synchronous and asynchronous client. We have also seen clients using point-to-point and publisher-subscriber style of messaging. Let’s now see how to view the messages sent to a consumer without actually consuming them.
In this example, we will show you how to browse through the pending messages in the queue using QueueBrowser
object. Since we can browse through the messages without actually consuming them, this is very unique and important feature to point-to-point messaging. We create the QueueBrowser object using the below statement on session object.
QueueBrowser browser = session.createBrowser(queue);
As you can see createBrowser()
takes the Queue object that we are interested to browse. In order to enumerate through the messages, we will call QueueBrowser.getEnumeration()
. When we are done with the browser we should close it using <codeQueueBrowser.close().
In the below example, we create a producer and post a bunch of messages to a queue. Next we create a consumer. In order to browse, we create a QueueBrowser object and navigate through the messages. Finally, we call consumer.receive()
to receive one of the messages from queue.
JmsBrowseQueueClientExample:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsBrowseQueueClientExample { public static void main(String[] args) throws URISyntaxException, Exception { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("customerQueue"); MessageConsumer consumer = session.createConsumer(queue); connection.start(); System.out.println("Browse through the elements in queue"); QueueBrowser browser = session.createBrowser(queue); Enumeration e = browser.getEnumeration(); while (e.hasMoreElements()) { TextMessage message = (TextMessage) e.nextElement(); System.out.println("Get [" + message.getText() + "]"); } System.out.println("Done"); browser.close(); TextMessage textMsg = (TextMessage) consumer.receive(); System.out.println(textMsg); System.out.println("Received: " + textMsg.getText()); session.close(); } finally { if (connection != null) { connection.close(); } } } }
Output:
Browse through the elements in queue Get [Task0] Get [Task1] Get [Task2] Get [Task3] Get [Task4] Get [Task5] Get [Task6] Get [Task7] Get [Task8] Get [Task9] Get [END] Done ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:INMAA1-L1005-58212-1447859579333-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-58212-1447859579333-1:1:1:1, destination = queue://customerQueue, transactionId = null, expiration = 0, timestamp = 1447859579480, arrival = 0, brokerInTime = 1447859579481, brokerOutTime = 1447859586255, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@28864e92, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Task0} Received: Task0
9. JMS Client Acknowledge
Suppose each message that a JMS provider delivers to a consumer need not be acknowledged and is left up to the client consuming the messages to decide when to acknowledge, then we need to create as session by passing in Session.CLIENT_ACKNOWLEDGE
as the second parameter.
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
If a message is not acknowledged it may be redelivered to the consumer by the JMS provider. The client acknowledges a message by calling the acknowledge method on it.
Message.acknowledge();
Acknowledging one message actually acknowledges all messages that the session has consumed.
JmsClientAckExample:
package com.javacodegeeks.jms; import java.net.URISyntaxException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsClientAckExample { public static void main(String[] args) throws URISyntaxException, Exception { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost?broker.persistent=false"); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("SomeQueue"); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Hello")); MessageConsumer consumer = session.createConsumer(queue); TextMessage msg = (TextMessage) consumer.receive(1000); System.out.println("Consume: " + msg.getText()); // Reset the session. session.close(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Attempt to Consume the message... consumer = session.createConsumer(queue); msg = (TextMessage) consumer.receive(1000); System.out.println("Attempt to consume again. Is message received? " + (msg != null)); //acknowledge msg.acknowledge(); // Reset the session. session.close(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Attempt to Consume the message... consumer = session.createConsumer(queue); msg = (TextMessage) consumer.receive(1000); System.out.println("Attempt to consume again. Is message received? " + (msg != null)); } finally { if (connection != null) { connection.close(); } } } }
Output:
INFO | Using Persistence Adapter: MemoryPersistenceAdapter WARN | Failed to start JMX connector Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is java.rmi.AlreadyBoundException: jmxrmi]. Will restart management to re-create JMX connector, trying to remedy this issue. INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53531-1447910509729-0:1) is starting WARN | Failed to start JMX connector Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is java.rmi.AlreadyBoundException: jmxrmi]. Will restart management to re-create JMX connector, trying to remedy this issue. INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53531-1447910509729-0:1) started INFO | For help or more information please see: http://activemq.apache.org WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsClientExample\activemq-data\localhost\tmp_storage only has 37178 mb of usable space - resetting to maximum available 37178 mb. INFO | Connector vm://localhost started Consume: Hello Attempt to consume again. Is message received? true Attempt to consume again. Is message received? false INFO | Connector vm://localhost stopped INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53531-1447910509729-0:1) is shutting down INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53531-1447910509729-0:1) uptime 1.471 seconds INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53531-1447910509729-0:1) is shutdown
10. Download the Eclipse Project
This was an example about JMS Clients.
You can download the full source code of this example here: jmsClientExample.zip