JMS Topic Example
When a publisher sends a message, more than one consumer may be interested in it. The publisher broadcasts the message to a JMS destination called a ‘topic’, and any number of consumers can subscribe to that topic.
All active clients subscribed to the topic receive the message, and subscribers do not need to poll for it. Every active subscriber receives its own copy of each message published to the topic.
In this article, we will see some examples of JMS Topic.
1. Dependencies
To send and receive JMS messages to and from a JMS message broker, we need to include the message service library. In this example we are using ActiveMQ, so our pom.xml declares the activemq-all dependency.
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javacodegeeks.jms</groupId>
    <artifactId>springJmsQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.12.0</version>
        </dependency>
    </dependencies>
</project>
2. Creating a Topic
Our first example consists of a publisher sending a message to a topic. Two consumers are subscribed to the topic, and each has registered a listener so that it consumes messages from the topic asynchronously.
Note that the consumers subscribe to the topic before the publisher publishes the message. This is because the consumers are not durable, so they must be online when the message is published.
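To make the ordering requirement concrete, here is a minimal in-memory sketch of non-durable topic delivery. It is a simplified model, not the JMS API: the class InMemoryTopic and its methods are hypothetical names introduced only for illustration. Subscribers registered before the publish each get a copy, while one that subscribes afterwards gets nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of a non-durable topic; this is not the JMS API.
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // A subscriber only sees messages published after it has subscribed.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Every currently subscribed consumer gets its own copy of the message.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Runs the scenario and returns what each consumer received,
    // in the order: consumer1, consumer2, lateConsumer.
    static List<List<String>> demo() {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> consumer1 = new ArrayList<>();
        List<String> consumer2 = new ArrayList<>();
        List<String> lateConsumer = new ArrayList<>();

        topic.subscribe(consumer1::add);
        topic.subscribe(consumer2::add);
        topic.publish("Important Task");
        // Subscribes after the publish, so it misses the message.
        topic.subscribe(lateConsumer::add);

        List<List<String>> received = new ArrayList<>();
        received.add(consumer1);
        received.add(consumer2);
        received.add(lateConsumer);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The real broker does considerably more (acknowledgement, threading, network transport), but the delivery rule is the same: a non-durable subscriber that is not registered at publish time never sees the message.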
JmsTopicExample:
package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsTopicExample {
    public static void main(String[] args) throws URISyntaxException, Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("customerTopic");

            // Consumer1 subscribes to customerTopic
            MessageConsumer consumer1 = session.createConsumer(topic);
            consumer1.setMessageListener(new ConsumerMessageListener("Consumer1"));

            // Consumer2 subscribes to customerTopic
            MessageConsumer consumer2 = session.createConsumer(topic);
            consumer2.setMessageListener(new ConsumerMessageListener("Consumer2"));

            connection.start();

            // Publish
            String payload = "Important Task";
            Message msg = session.createTextMessage(payload);
            MessageProducer producer = session.createProducer(topic);
            System.out.println("Sending text '" + payload + "'");
            producer.send(msg);

            Thread.sleep(3000);
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
            broker.stop();
        }
    }
}
3. Consuming messages from Topic using MessageListener
The message listener implements MessageListener. The constructor takes the consumer name so we know which consumer is consuming the message.
ConsumerMessageListener:
package com.javacodegeeks.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
    private String consumerName;

    public ConsumerMessageListener(String consumerName) {
        this.consumerName = consumerName;
    }

    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(consumerName + " received " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Now let’s run our example with the above message listener.
Output:
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] started
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB]
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:295312
INFO | Recovery replayed 1 operations from the journal in 0.013 seconds.
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-56907-1447237470288-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector tcp://127.0.0.1:61616 started
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-56907-1447237470288-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB only has 29578 mb of usable space - resetting to maximum available disk space: 29579 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage only has 29578 mb of usable space - resetting to maximum available 29578 mb.
Sending text 'Important Task'
Consumer1 received Important Task
Consumer2 received Important Task
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-56907-1447237470288-0:1) is shutting down
INFO | Connector tcp://127.0.0.1:61616 stopped
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] stopped
INFO | Stopping async queue tasks
INFO | Stopping async topic tasks
INFO | Stopped KahaDB
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-56907-1447237470288-0:1) uptime 3.917 seconds
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-56907-1447237470288-0:1) is shutdown
The same example can be re-written using topic specific objects.
4. Topic Specific APIs
In this example, we use the following topic-specific objects:
- TopicConnectionFactory
- TopicConnection
- TopicSession
- TopicPublisher
JmsTopicConnectionExample:
package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsTopicConnectionExample {
    public static void main(String[] args) throws URISyntaxException, Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.setPersistent(true);
        broker.start();
        TopicConnection topicConnection = null;
        try {
            // Producer
            TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            topicConnection = connectionFactory.createTopicConnection();
            topicConnection.setClientID("JMSTOPIC");

            TopicSession topicConsumerSession = topicConnection.createTopicSession(
                    false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = topicConsumerSession.createTopic("customerTopic");

            // Consumer1 subscribes to customerTopic
            MessageConsumer consumer1 = topicConsumerSession.createSubscriber(topic);
            consumer1.setMessageListener(new ConsumerMessageListener(
                    "Consumer1"));

            // Consumer2 subscribes to customerTopic
            MessageConsumer consumer2 = topicConsumerSession.createSubscriber(topic);
            consumer2.setMessageListener(new ConsumerMessageListener(
                    "Consumer2"));

            topicConnection.start();

            // Publish
            TopicSession topicPublisherSession = topicConnection.createTopicSession(
                    false, Session.AUTO_ACKNOWLEDGE);
            String payload = "Important Task";
            Message msg = topicPublisherSession.createTextMessage(payload);
            TopicPublisher publisher = topicPublisherSession.createPublisher(topic);
            System.out.println("Sending text '" + payload + "'");
            publisher.publish(msg);

            Thread.sleep(3000);
            topicPublisherSession.close();
            topicConsumerSession.close();
        } finally {
            if (topicConnection != null) {
                topicConnection.close();
            }
            broker.stop();
        }
    }
}
Output:
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] started
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB]
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:344608
INFO | Recovery replayed 1 operations from the journal in 0.014 seconds.
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-61121-1447245012885-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector tcp://127.0.0.1:61616 started
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-61121-1447245012885-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB only has 29598 mb of usable space - resetting to maximum available disk space: 29598 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage only has 29598 mb of usable space - resetting to maximum available 29598 mb.
Sending text 'Important Task'
Consumer1 received Important Task
Consumer2 received Important Task
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-61121-1447245012885-0:1) is shutting down
INFO | Connector tcp://127.0.0.1:61616 stopped
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] stopped
INFO | Stopping async queue tasks
INFO | Stopping async topic tasks
INFO | Stopped KahaDB
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-61121-1447245012885-0:1) uptime 3.915 seconds
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-61121-1447245012885-0:1) is shutdown
5. Durable Subscriber
Durable subscribers can receive already published messages when they are back online. A durable subscription saves messages for an inactive subscriber and delivers these saved messages when the subscriber reconnects. For durable subscribers to a topic, each consumer gets a copy of the message.
While a durable subscriber is disconnected from the JMS server, it is the responsibility of the server to store messages the subscriber misses. When the durable subscriber reconnects, the message server sends it all the unexpired messages that accumulated.
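The createDurableSubscriber() method takes two parameters: a topic and a subscription name. The example below uses the four-argument overload, which also takes a message selector and a noLocal flag. A durable subscription’s uniqueness is defined by the client ID and the subscription name.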
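The store-and-forward behaviour described above can be sketched with a small in-memory model. This is an illustration only, not the JMS API: DurableTopicModel, subscribe, publish, and drain are hypothetical names, and message expiration is left out. The model keeps one backlog per subscription, keyed by client ID plus subscription name, and only subscriptions that already exist at publish time accumulate messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of durable subscriptions; this is not the JMS API.
class DurableTopicModel {
    // One backlog per subscription, keyed by "clientId:subscriptionName".
    private final Map<String, Deque<String>> backlogs = new LinkedHashMap<>();

    // Registers the subscription if it is new; an existing one keeps its backlog.
    void subscribe(String clientId, String subscriptionName) {
        backlogs.computeIfAbsent(clientId + ":" + subscriptionName,
                key -> new ArrayDeque<>());
    }

    // Stores a copy for every registered subscription, whether online or not.
    void publish(String message) {
        for (Deque<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    // Called when the subscriber reconnects: hands over everything it missed.
    List<String> drain(String clientId, String subscriptionName) {
        Deque<String> backlog = backlogs.get(clientId + ":" + subscriptionName);
        List<String> delivered = new ArrayList<>();
        if (backlog != null) {
            delivered.addAll(backlog);
            backlog.clear();
        }
        return delivered;
    }

    // The subscription exists before the publish, so the message is retained.
    static List<String> demo() {
        DurableTopicModel topic = new DurableTopicModel();
        topic.subscribe("DurabilityTest", "consumer1"); // registered, then offline
        topic.publish("Important Task");                // published while offline
        topic.subscribe("DurabilityTest", "consumer1"); // reconnects, same identity
        return topic.drain("DurabilityTest", "consumer1");
    }

    // The subscription is created only after the publish, so nothing was retained.
    static List<String> lateSubscriberDemo() {
        DurableTopicModel topic = new DurableTopicModel();
        topic.publish("Important Task");
        topic.subscribe("DurabilityTest", "consumer1");
        return topic.drain("DurabilityTest", "consumer1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(lateSubscriberDemo());
    }
}
```

The second scenario is the one to watch for in practice: a durable subscription retains messages only from the moment it is first registered, so a message published before that point is not delivered to it.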
JmsDurableSubscriberExample:
package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsDurableSubscriberExample {
    public static void main(String[] args) throws URISyntaxException, Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.setClientID("DurabilityTest");
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("customerTopic");

            // Register both durable subscriptions, then close the subscribers so
            // they are offline when the message is published. Without this step,
            // a first run against an empty store would block forever in receive(),
            // because no subscription would exist yet to retain the message.
            session.createDurableSubscriber(topic, "consumer1", "", false).close();
            session.createDurableSubscriber(topic, "consumer2", "", false).close();

            // Publish
            String payload = "Important Task";
            TextMessage msg = session.createTextMessage(payload);
            MessageProducer publisher = session.createProducer(topic);
            System.out.println("Sending text '" + payload + "'");
            publisher.send(msg, javax.jms.DeliveryMode.PERSISTENT,
                    javax.jms.Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            // Consumer1 comes back online on its durable subscription
            MessageConsumer consumer1 = session.createDurableSubscriber(topic,
                    "consumer1", "", false);

            // Consumer2 comes back online on its durable subscription
            MessageConsumer consumer2 = session.createDurableSubscriber(topic,
                    "consumer2", "", false);

            connection.start();

            msg = (TextMessage) consumer1.receive();
            System.out.println("Consumer1 receives " + msg.getText());

            msg = (TextMessage) consumer2.receive();
            System.out.println("Consumer2 receives " + msg.getText());

            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
            broker.stop();
        }
    }
}
Output:
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] started
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB]
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:385534
INFO | Recovery replayed 1 operations from the journal in 0.012 seconds.
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53592-1447268104050-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector tcp://127.0.0.1:61616 started
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53592-1447268104050-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB only has 29545 mb of usable space - resetting to maximum available disk space: 29546 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage only has 29545 mb of usable space - resetting to maximum available 29545 mb.
Sending text 'Important Task'
Consumer1 receives Important Task
Consumer2 receives Important Task
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53592-1447268104050-0:1) is shutting down
INFO | Connector tcp://127.0.0.1:61616 stopped
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] stopped
INFO | Stopping async queue tasks
INFO | Stopping async topic tasks
INFO | Stopped KahaDB
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53592-1447268104050-0:1) uptime 0.913 seconds
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-53592-1447268104050-0:1) is shutdown
6. Publishing Message Using Persistent Mode
In the above example, the publisher publishes the message using the persistent delivery mode:
publisher.send(msg, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
Note the use of the overloaded send() method, with parameters that specify the delivery mode, the priority, and the time to live (which determines message expiration).
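As a rough illustration of how the time-to-live argument relates to expiration, the sketch below mirrors the rule from the JMS specification: the expiration time is the send time plus the time to live, and a time to live of zero means the message never expires. The class ExpirationSketch and its methods are hypothetical helpers written for this illustration; the two constants simply copy the values of Message.DEFAULT_PRIORITY and Message.DEFAULT_TIME_TO_LIVE.

```java
// Hypothetical helper illustrating the JMS expiration rule; this is not the JMS API.
class ExpirationSketch {
    // Same values as javax.jms.Message.DEFAULT_PRIORITY and DEFAULT_TIME_TO_LIVE.
    static final int DEFAULT_PRIORITY = 4;
    static final long DEFAULT_TIME_TO_LIVE = 0L;

    // Expiration is the send time plus the time to live; a time to live of
    // zero yields an expiration of zero, meaning "never expires".
    static long expiration(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // A message with expiration zero never expires; otherwise it is expired
    // once the current time is later than its expiration time.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        System.out.println("default TTL -> expiration "
                + expiration(sentAt, DEFAULT_TIME_TO_LIVE));
        System.out.println("5 second TTL -> expiration "
                + expiration(sentAt, 5000));
    }
}
```

With the defaults used in the example above, the published message therefore carries priority 4 and an expiration of 0, so a durable subscription keeps it until the subscriber comes back.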
7. MessageConsumer.receive()
We receive the message using the MessageConsumer.receive() method rather than passively receiving it through the onMessage() callback. By default, the receive() method blocks program execution until a message is retrieved from the message server, although you can specify a timeout. The receive() method follows a “pull” model, whereas onMessage() follows a “push” model. From the client’s perspective, you can think of this as a polling mechanism.
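The difference between the blocking and timed variants can be sketched with a plain BlockingQueue standing in for the consumer's inbox. This is an analogy rather than the JMS API: PullReceiveSketch and deliver are hypothetical names, while the three receive methods are modelled on receive(), receive(long timeout), and receiveNoWait() from MessageConsumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer's inbox; this is not the JMS API.
class PullReceiveSketch {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Simulates the broker handing a message to this consumer.
    void deliver(String message) {
        inbox.add(message);
    }

    // Like receive(): blocks until a message is available.
    String receive() throws InterruptedException {
        return inbox.take();
    }

    // Like receive(timeout): waits up to the timeout and returns null if
    // nothing arrives. As in JMS, a timeout of zero blocks indefinitely.
    String receive(long timeoutMillis) throws InterruptedException {
        if (timeoutMillis == 0) {
            return inbox.take();
        }
        return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait(): returns immediately, null if the inbox is empty.
    String receiveNoWait() {
        return inbox.poll();
    }

    // Returns { result of a timed receive on an empty inbox, result after delivery }.
    static String[] demo() {
        try {
            PullReceiveSketch consumer = new PullReceiveSketch();
            String beforeDelivery = consumer.receive(50); // nothing there yet
            consumer.deliver("Important Task");
            String afterDelivery = consumer.receive();    // returns at once
            return new String[] { beforeDelivery, afterDelivery };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println(results[0] + ", " + results[1]);
    }
}
```

Because a timed receive can return null, client code that uses a timeout should check the result before casting it or reading its contents.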
8. Temporary Topics
Temporary topics are a mechanism for JMS clients to create topics dynamically. A temporary topic’s lifetime is that of the Connection unless it is deleted earlier.
A temporary topic is a topic that is dynamically created by the JMS provider, using the createTemporaryTopic() method of the TopicSession object. A temporary topic is associated with the connection that belongs to the TopicSession that created it. It is only active for the duration of the connection, and it is guaranteed to be unique across all connections. Since it is temporary, it can’t be durable: it lasts only as long as its associated client connection is active. The topic identity is transferred using the JMSReplyTo header.
9. Request and Reply
In the example below, the server connection listens for queries on ‘SomeTopic’. The client sends a request to ‘SomeTopic’ and expects the answer on a temporary topic created dynamically just for this connection. It sets the temporary topic in the JMSReplyTo header. The server receives the message and sends its response to the topic found in JMSReplyTo. The client then receives the response using MessageConsumer.receive(5000).
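Before the full ActiveMQ example, here is a stripped-down sketch of how the headers flow in this pattern. It is an in-memory illustration only, not the JMS API: RequestReplySketch, Msg, and reply are hypothetical names, and the IDs and destination names are made up. The server addresses its reply to the request's reply-to destination and copies the request's message ID into the correlation ID, which lets the client match replies to requests even when they arrive out of order.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory sketch of request/reply headers; this is not the JMS API.
class RequestReplySketch {

    // Only the fields this pattern relies on.
    static final class Msg {
        final String id;
        final String destination;
        final String replyTo;
        final String correlationId;
        final String text;

        Msg(String id, String destination, String replyTo,
                String correlationId, String text) {
            this.id = id;
            this.destination = destination;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.text = text;
        }
    }

    // Server side: the reply goes to the request's replyTo destination and
    // carries the request's message ID as its correlation ID.
    static Msg reply(Msg request) {
        return new Msg("reply-to-" + request.id, request.replyTo, null,
                request.id, "Server: This is my answer to " + request.text);
    }

    // Client side: match each reply to its pending request by correlation ID.
    static Map<String, String> demo() {
        Map<String, String> pending = new HashMap<>(); // request ID -> request text
        Msg first = new Msg("ID:1", "SomeTopic", "temp-topic-1", null, "first query");
        Msg second = new Msg("ID:2", "SomeTopic", "temp-topic-1", null, "second query");
        pending.put(first.id, first.text);
        pending.put(second.id, second.text);

        // Replies arrive in the opposite order to the requests.
        Msg[] replies = { reply(second), reply(first) };

        Map<String, String> answers = new LinkedHashMap<>(); // request text -> answer
        for (Msg received : replies) {
            String question = pending.remove(received.correlationId);
            answers.put(question, received.text);
        }
        return answers;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a single outstanding request, as in the example that follows, the correlation ID is not strictly needed, but setting it keeps the pattern safe once several requests share one reply topic.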
JmsTopicExample:
package com.javacodegeeks.jms;

import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

public class JmsTopicExample implements MessageListener {
    private Session serverSession;
    private MessageProducer replyProducer;

    public static void main(String[] args) throws URISyntaxException, Exception {
        JmsTopicExample jmsTopicExample = new JmsTopicExample();
        jmsTopicExample.sendReqOnTempTopic();
    }

    public void sendReqOnTempTopic() throws URISyntaxException, Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
        Connection serverConnection = null;
        Connection clientConnection = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");

            serverConnection = connectionFactory.createConnection();
            serverConnection.setClientID("serverTempTopic");
            serverSession = serverConnection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // Producer without a fixed destination; the destination is
            // supplied per message when the reply is sent
            replyProducer = serverSession.createProducer(null);
            Topic requestDestination = serverSession.createTopic("SomeTopic");

            // Server is listening for queries
            final MessageConsumer requestConsumer = serverSession
                    .createConsumer(requestDestination);
            requestConsumer.setMessageListener(this);
            serverConnection.start();

            // Client sends a query to topic 'SomeTopic'
            clientConnection = connectionFactory.createConnection();
            clientConnection.setClientID("clientTempTopic");
            Session clientSession = clientConnection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            clientConnection.start();
            Destination replyDestination = clientSession.createTemporaryTopic();

            MessageProducer requestProducer = clientSession.createProducer(requestDestination);
            // Create a client consumer on the temporary topic
            MessageConsumer replyConsumer = clientSession.createConsumer(replyDestination);

            TextMessage requestMessage = clientSession.createTextMessage("Client: Important Query");
            // Server is going to send the reply to the temporary topic
            requestMessage.setJMSReplyTo(replyDestination);
            requestProducer.send(requestMessage);

            System.out.println("Sent request " + requestMessage.toString());

            // Read the answer from the temporary topic
            Message msg = replyConsumer.receive(5000);
            TextMessage replyMessage = (TextMessage) msg;
            System.out.println("Received reply " + replyMessage.toString());
            System.out.println("Received answer: " + replyMessage.getText());

            replyConsumer.close();

            clientSession.close();
            serverSession.close();
        } finally {
            if (clientConnection != null) {
                clientConnection.close();
            }
            if (serverConnection != null) {
                serverConnection.close();
            }
            broker.stop();
        }
    }

    // Server receives a query and sends reply to temporary topic set in JMSReplyTo
    public void onMessage(Message message) {
        try {
            TextMessage requestMessage = (TextMessage) message;
            System.out.println("Received request." + requestMessage.toString());

            Destination replyDestination = requestMessage.getJMSReplyTo();
            TextMessage replyMessage = serverSession.createTextMessage(
                    "Server: This is my answer to " + requestMessage.getText());

            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());

            // Reuse the destination-less producer instead of creating a new
            // producer for every request
            replyProducer.send(replyDestination, replyMessage);
            System.out.println("Sent reply.");
            System.out.println(replyMessage.toString());
        } catch (JMSException e) {
            System.out.println(e);
        }
    }
}
Output:
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] started
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB]
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:379942
INFO | Recovery replayed 1 operations from the journal in 0.015 seconds.
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-51557-1447266194175-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector tcp://127.0.0.1:61616 started
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-51557-1447266194175-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\KahaDB only has 29549 mb of usable space - resetting to maximum available disk space: 29550 mb
WARN | Temporary Store limit is 51200 mb, whilst the temporary data directory: C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage only has 29549 mb of usable space - resetting to maximum available 29549 mb.
Sent request ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:INMAA1-L1005-51557-1447266194175-3:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://SomeTopic, transactionId = null, expiration = 0, timestamp = 1447266194427, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = temp-topic://ID:INMAA1-L1005-51557-1447266194175-3:2:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Client: Important Query}
Received request.ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:INMAA1-L1005-51557-1447266194175-3:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-51557-1447266194175-3:2:1:1, destination = topic://SomeTopic, transactionId = null, expiration = 0, timestamp = 1447266194427, arrival = 0, brokerInTime = 1447266194427, brokerOutTime = 1447266194427, correlationId = null, replyTo = temp-topic://ID:INMAA1-L1005-51557-1447266194175-3:2:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5a2f3f16, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Client: Important Query}
Sent reply.
ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:INMAA1-L1005-51557-1447266194175-3:1:1:2:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = temp-topic://ID:INMAA1-L1005-51557-1447266194175-3:2:1, transactionId = null, expiration = 0, timestamp = 1447266194430, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = ID:INMAA1-L1005-51557-1447266194175-3:2:1:1:1, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Server: This is my answer to Client: Important Query}
Received reply ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:INMAA1-L1005-51557-1447266194175-3:1:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:INMAA1-L1005-51557-1447266194175-3:1:1:2, destination = temp-topic://ID:INMAA1-L1005-51557-1447266194175-3:2:1, transactionId = null, expiration = 0, timestamp = 1447266194430, arrival = 0, brokerInTime = 1447266194430, brokerOutTime = 1447266194430, correlationId = ID:INMAA1-L1005-51557-1447266194175-3:2:1:1:1, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@491666ad, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Server: This is my answer to Client: Important Query}
Received answer: Server: This is my answer to Client: Important Query
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-51557-1447266194175-0:1) is shutting down
INFO | Connector tcp://127.0.0.1:61616 stopped
INFO | PListStore:[C:\javacodegeeks_ws\jmsTopicExample\activemq-data\localhost\tmp_storage] stopped
INFO | Stopping async queue tasks
INFO | Stopping async topic tasks
INFO | Stopped KahaDB
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-51557-1447266194175-0:1) uptime 0.971 seconds
INFO | Apache ActiveMQ 5.12.0 (localhost, ID:INMAA1-L1005-51557-1447266194175-0:1) is shutdown
10. Download the Eclipse Project
This was an example about JMS Topic.
You can download the full source code of this example here: jmsTopicExample.zip