Apache ActiveMQ File Transfer Example
1. Introduction
Apache ActiveMQ (AMQ) is a message broker which transfers messages from sender to receiver.
In this example, I will build two simple AMQ applications which will transfer files from one location to another:
- A producer sends a file via BytesMessage or BlobMessage
- A consumer receives the BytesMessage or BlobMessage and saves it as a file
2. JMS Message Type
JMS defines six different message types.
AMQ has deprecated StreamMessage and added BlobMessage.
Message types:
Message Type | Contents | Purpose |
---|---|---|
TextMessage | A java.lang.String object | Exchanges simple text messages, such as XML and JSON. |
MapMessage | A set of name-value pairs, with names as String objects and values as primitive types in the Java programming language. | Exchanges key-value data |
ObjectMessage | A Serializable object in the Java programming language. | Exchanges Java objects. |
StreamMessage | A stream of primitive values in the Java programming language, filled and read sequentially. | Deprecated within AMQ. |
BytesMessage | A stream of uninterpreted bytes. This message type is for literally encoding a body to match an existing message format. | Exchanges data in a format that is native to the application, and when JMS is used as a transport between two systems, where the JMS client does not know the message payload type. |
BlobMessage | Binary Large Object (BLOB). | Added by AMQ. |
In this example, I will demonstrate how to transfer a file via BytesMessage and BlobMessage.
3. Business Use Case
Businesses exchange information by transferring files from one location to another.
This example transfers nine files from C:\temp\input to C:\temp\output. The nine files cover commonly used file types.
The image below shows file details as well as the output directory details.
4. File Transfer Application
4.1. Technologies Used
The example code in this article was built and run using:
- Java 1.8.101 (1.8.x will do fine)
- Maven 3.3.9 (3.3.x will do fine)
- Apache ActiveMQ 5.15.0 (others will do fine)
- Eclipse Neon (Any Java IDE would work)
4.2. Dependency
Add the dependencies to the Maven pom.xml.
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.0</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
</dependencies>
4.3. Constants
This example uses eight constants.
Constants.java
package jcg.demo;

/**
 * The constants for this demo.
 *
 * @author Mary.Zheng
 *
 */
public class Constants {
    public static final String FILE_INPUT_DIRECTORY = "C:\\temp\\input";
    public static final String FILE_NAME = "fileName";
    public static final String FILE_OUTPUT_BYTE_DIRECTORY = "C:\\temp\\output\\bytes\\";
    public static final String FILE_OUTPUT_BLOB_DIRECTORY = "C:\\temp\\output\\blob\\";

    public static final String TEST_QUEUE = "test.queue";
    public static final String TEST_BROKER_URL = "tcp://localhost:61716";
    public static final String ADMIN = "admin";
    public static final String BLOB_FILESERVER = "?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8761/fileserver/";
}
- BLOB_FILESERVER: AMQ BlobMessage requires a file server, so this option sets the upload URL on the broker connection
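As a minimal sketch of how the BLOB_FILESERVER option ends up on the broker URL, the snippet below appends it to TEST_BROKER_URL and parses the result with java.net.URI. The class name BlobBrokerUrlSketch and the helper withBlobUploadUrl are hypothetical names used only for illustration; the two string values are copied from Constants.java.

```java
import java.net.URI;

public class BlobBrokerUrlSketch {
    // Values copied from Constants.java in this article.
    static final String TEST_BROKER_URL = "tcp://localhost:61716";
    static final String BLOB_FILESERVER =
            "?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8761/fileserver/";

    /** Appends the blob upload option to a broker URL that has no options yet (hypothetical helper). */
    static String withBlobUploadUrl(String brokerUrl) {
        if (brokerUrl.contains("?")) {
            // The constant starts with '?', so it only fits a URL without a query string.
            throw new IllegalArgumentException("Broker URL already has options: " + brokerUrl);
        }
        return brokerUrl + BLOB_FILESERVER;
    }

    public static void main(String[] args) {
        String url = withBlobUploadUrl(TEST_BROKER_URL);
        URI uri = URI.create(url);
        System.out.println(url);
        System.out.println("host=" + uri.getHost() + ", port=" + uri.getPort());
        System.out.println("options=" + uri.getQuery());
    }
}
```

Because the constant begins with `?`, it can only be appended once; the guard in the helper makes that assumption explicit.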
4.4. File Manager
Create a file manager to read and write a file as a byte array.
FileAsByteArrayManager.java
package jcg.demo.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileAsByteArrayManager {

    // Reads the whole file into memory; only suitable for files smaller than 2 GB
    public byte[] readfileAsBytes(File file) throws IOException {
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "r")) {
            byte[] bytes = new byte[(int) accessFile.length()];
            accessFile.readFully(bytes);
            return bytes;
        }
    }

    public void writeFile(byte[] bytes, String fileName) throws IOException {
        File file = new File(fileName);
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
            // Truncate first so a shorter payload does not leave stale bytes from an older file
            accessFile.setLength(0);
            accessFile.write(bytes);
        }
    }
}
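The same read and write steps can also be expressed with java.nio.file.Files. This is a sketch of an alternative, not the code used in the demo; the class name NioFileBytesSketch and its methods are hypothetical. One difference worth knowing: Files.write replaces an existing file's content by default, whereas a RandomAccessFile opened in "rw" mode keeps any bytes beyond what is written unless setLength is called.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class NioFileBytesSketch {

    /** Reads the whole file into memory, like readfileAsBytes in the article. */
    static byte[] readFileAsBytes(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates the file or replaces its content (Files.write truncates by default). */
    static void writeFile(byte[] bytes, Path file) {
        try {
            Files.write(file, bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes two payloads to one temp file and checks that only the second one remains. */
    static boolean roundTrip(byte[] first, byte[] second) {
        try {
            Path tmp = Files.createTempFile("amq-demo", ".bin");
            try {
                writeFile(first, tmp);
                writeFile(second, tmp);
                return Arrays.equals(second, readFileAsBytes(tmp));
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean ok = roundTrip(new byte[] {1, 2, 3, 4, 5}, new byte[] {9, 8});
        System.out.println("second payload fully replaced the first: " + ok);
    }
}
```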
4.5. Save File Application
Create a Java application which receives the messages and saves them under C:\temp\output\ with their original file names.
ConsumeFileApp.java
package jcg.demo;

import javax.jms.JMSException;

import jcg.demo.activemq.QueueMessageConsumer;

public class ConsumeFileApp {

    public static void main(String[] args) {

        QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(Constants.TEST_BROKER_URL, Constants.ADMIN,
                Constants.ADMIN);
        queueMsgListener.setDestinationName(Constants.TEST_QUEUE);

        try {
            queueMsgListener.run();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
4.6. Message Consumer
Create the class QueueMessageConsumer.
QueueMessageConsumer.java
package jcg.demo.activemq;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.io.IOUtils;

import jcg.demo.Constants;
import jcg.demo.file.FileAsByteArrayManager;

/**
 * A message consumer which consumes the message from ActiveMQ Broker
 *
 * @author Mary.Zheng
 *
 */
public class QueueMessageConsumer implements MessageListener {

    private String activeMqBrokerUri;
    private String username;
    private String password;
    private String destinationName;
    private FileAsByteArrayManager fileManager = new FileAsByteArrayManager();

    public QueueMessageConsumer(String activeMqBrokerUri, String username, String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(destinationName);

        // The connection stays open so the listener keeps receiving messages
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(this);

        System.out.println(String.format("QueueMessageConsumer Waiting for messages at queue='%s' broker='%s'",
                destinationName, this.activeMqBrokerUri));
    }

    @Override
    public void onMessage(Message message) {
        try {
            // The producer stores the original file name in a message property
            String filename = message.getStringProperty(Constants.FILE_NAME);

            Instant start = Instant.now();

            if (message instanceof ActiveMQTextMessage) {
                handleTextMessage((ActiveMQTextMessage) message);
            } else if (message instanceof ActiveMQBlobMessage) {
                handleBlobMessage((ActiveMQBlobMessage) message, filename);
            } else if (message instanceof ActiveMQBytesMessage) {
                handleBytesMessage((ActiveMQBytesMessage) message, filename);
            } else {
                System.out.println("Unsupported message type: " + message.getClass().getName());
            }

            Instant end = Instant.now();
            System.out
                    .println("Consumed message with filename [" + filename + "], took " + Duration.between(start, end));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void handleBytesMessage(ActiveMQBytesMessage bytesMessage, String filename)
            throws IOException, JMSException {
        String outputfileName = Constants.FILE_OUTPUT_BYTE_DIRECTORY + filename;
        // Read the body through the standard JMS API rather than the internal ByteSequence buffer
        byte[] data = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(data);
        fileManager.writeFile(data, outputfileName);
        System.out.println("Received ActiveMQBytesMessage message");
    }

    private void handleBlobMessage(ActiveMQBlobMessage blobMessage, String filename)
            throws IOException, JMSException {
        // for 1mb or bigger message
        String outputfileName = Constants.FILE_OUTPUT_BLOB_DIRECTORY + filename;
        // Close the download stream once the content has been copied
        try (InputStream in = blobMessage.getInputStream()) {
            fileManager.writeFile(IOUtils.toByteArray(in), outputfileName);
        }
        System.out.println("Received ActiveMQBlobMessage message");
    }

    private void handleTextMessage(ActiveMQTextMessage txtMessage) throws JMSException {
        String msg = String.format("Received ActiveMQTextMessage [ %s ]", txtMessage.getText());
        System.out.println(msg);
    }

    public String getDestinationName() {
        return destinationName;
    }

    public void setDestinationName(String destinationName) {
        this.destinationName = destinationName;
    }
}
- In onMessage, the ActiveMQBlobMessage branch receives the file content as a BlobMessage
- In onMessage, the ActiveMQBytesMessage branch receives the file content as a BytesMessage
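The consumer builds each output path by joining an output directory with the fileName message property. The sketch below shows one way that step could be hardened, assuming you want to reject a missing property or a name that points outside the output directory. It uses only the JDK; the class OutputPathSketch and the method resolveOutputFile are hypothetical and not part of the demo source.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class OutputPathSketch {

    /**
     * Resolves fileName inside outputDir and rejects names that are empty
     * or that would escape the directory (for example "../other.txt").
     */
    static Path resolveOutputFile(Path outputDir, String fileName) {
        if (fileName == null || fileName.isEmpty()) {
            throw new IllegalArgumentException("Message has no fileName property");
        }
        Path base = outputDir.toAbsolutePath().normalize();
        Path target = base.resolve(fileName).normalize();
        if (target.equals(base) || !target.startsWith(base)) {
            throw new IllegalArgumentException("File name escapes the output directory: " + fileName);
        }
        return target;
    }

    public static void main(String[] args) {
        // A temp-based directory keeps the sketch runnable on any operating system.
        Path outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "output", "bytes");
        System.out.println(resolveOutputFile(outputDir, "Tulips.jpg"));
        try {
            resolveOutputFile(outputDir, "../Tulips.jpg");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the demo both applications are under your control, so plain string concatenation works; a check like this matters more once the producer is a different system.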
4.7. Send File Application
Create a Java application to send the nine files in C:\temp\input to AMQ as either BytesMessage or BlobMessage.
SendFileApp.java
package jcg.demo;

import java.util.Scanner;

import jcg.demo.activemq.QueueMessageProducer;

public class SendFileApp {

    public static void main(String[] args) {
        try {

            QueueMessageProducer queProducer = new QueueMessageProducer(Constants.TEST_BROKER_URL, Constants.ADMIN,
                    Constants.ADMIN);

            System.out.println("Enter message type for transferring file:"
                    + "\n\t1 - File as BytesMessage \n\t2 - File as BlobMessage");
            try (Scanner scanIn = new Scanner(System.in)) {
                String inputFileType = scanIn.nextLine();
                switch (inputFileType) {
                case "1":
                    queProducer.sendBytesMessages(Constants.TEST_QUEUE);
                    break;
                case "2":
                    queProducer.sendBlobMessages(Constants.TEST_QUEUE);
                    break;
                default:
                    System.out.println("Wrong input");
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.8. Message Producer
Create a class QueueMessageProducer.
QueueMessageProducer.java
package jcg.demo.activemq;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

import jcg.demo.Constants;
import jcg.demo.file.FileAsByteArrayManager;

/**
 * A message producer which sends the file message to ActiveMQ Broker
 *
 * @author Mary.Zheng
 *
 */
public class QueueMessageProducer {

    private String activeMqBrokerUri;
    private String username;
    private String password;

    private ActiveMQSession session;
    private MessageProducer msgProducer;
    private ConnectionFactory connFactory;
    private Connection connection;

    private FileAsByteArrayManager fileManager = new FileAsByteArrayManager();

    public QueueMessageProducer(String activeMqBrokerUri, String username, String password) {
        super();
        this.activeMqBrokerUri = activeMqBrokerUri;
        this.username = username;
        this.password = password;
    }

    private void setup() throws JMSException {
        connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
        connection = connFactory.createConnection();
        connection.start();
        session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    private void close() {
        try {
            if (msgProducer != null) {
                msgProducer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable ignore) {
            // Best-effort cleanup; nothing useful can be done if closing fails
        }
    }

    private File[] listInputFiles() {
        // listFiles() returns null when the directory is missing or unreadable
        File[] files = new File(Constants.FILE_INPUT_DIRECTORY).listFiles(File::isFile);
        return files == null ? new File[0] : files;
    }

    public void sendBytesMessages(String queueName) throws JMSException, IOException {
        try {
            setup();
            msgProducer = session.createProducer(session.createQueue(queueName));
            for (File file : listInputFiles()) {
                sendFileAsBytesMessage(file);
            }
        } finally {
            // Release the connection even when a send fails
            close();
        }
    }

    public void sendBlobMessages(String queueName) throws JMSException {
        // BlobMessage needs a file server; the upload URL is passed as a broker URL option
        this.activeMqBrokerUri = activeMqBrokerUri + Constants.BLOB_FILESERVER;
        try {
            setup();
            msgProducer = session.createProducer(session.createQueue(queueName));
            for (File file : listInputFiles()) {
                sendFileAsBlobMessage(file);
            }
        } finally {
            close();
        }
    }

    private void sendFileAsBlobMessage(File file) throws JMSException {
        Instant start = Instant.now();
        BlobMessage blobMessage = session.createBlobMessage(file);
        blobMessage.setStringProperty(Constants.FILE_NAME, file.getName());
        msgProducer.send(blobMessage);
        Instant end = Instant.now();
        System.out.println("sendFileAsBlobMessage for [" + file.getName() + "], took " + Duration.between(start, end));
    }

    private void sendFileAsBytesMessage(File file) throws JMSException, IOException {
        Instant start = Instant.now();
        BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.setStringProperty(Constants.FILE_NAME, file.getName());
        bytesMessage.writeBytes(fileManager.readfileAsBytes(file));
        msgProducer.send(bytesMessage);
        Instant end = Instant.now();
        System.out.println("sendFileAsBytesMessage for [" + file.getName() + "], took " + Duration.between(start, end));
    }
}
- sendBlobMessages appends Constants.BLOB_FILESERVER to the broker URL because AMQ BlobMessage requires a file server
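The producer picks the files to send by listing the input directory and keeping only regular files. The sketch below isolates that step with plain JDK calls. It assumes that a missing directory should yield an empty list, since File.listFiles() returns null in that case; the class InputFilesSketch and the method regularFiles are hypothetical names.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InputFilesSketch {

    /** Returns the regular files directly inside the directory, sorted by path. */
    static List<File> regularFiles(File directory) {
        File[] entries = directory.listFiles();
        List<File> files = new ArrayList<>();
        if (entries == null) {
            // Not a directory, or an I/O error occurred while listing it
            return files;
        }
        Arrays.sort(entries); // listFiles() gives no ordering guarantee
        for (File entry : entries) {
            if (entry.isFile()) { // skip sub-directories
                files.add(entry);
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Defaults to the input directory used in this article
        File inputDir = new File(args.length > 0 ? args[0] : "C:\\temp\\input");
        for (File file : regularFiles(inputDir)) {
            System.out.println(file.getName() + " (" + file.length() + " bytes)");
        }
    }
}
```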
5. Demo Time
5.1. Start ConsumeFileApp
Start the ConsumeFileApp.
ConsumeFileApp Output
QueueMessageConsumer Waiting for messages at queue='test.queue' broker='tcp://localhost:61716'
5.2. Start SendFileApp
Run SendFileApp with BytesMessage.
Send BytesMessage Output
Enter message type for transferring file:
	1 - File as BytesMessage
	2 - File as BlobMessage
1
sendFileAsBytesMessage for [10-18 Year Swim Lessons- SUMMER.docx], took PT0.02S
sendFileAsBytesMessage for [2017_18 _Schedule_chess.pdf], took PT0.009S
sendFileAsBytesMessage for [activemq-monitor-demo.zip], took PT0.008S
sendFileAsBytesMessage for [activeMQ.vsd], took PT0.01S
sendFileAsBytesMessage for [JVM_memory.PNG], took PT0.008S
sendFileAsBytesMessage for [site_cag.txt], took PT0.006S
sendFileAsBytesMessage for [test.xlsx], took PT0.009S
sendFileAsBytesMessage for [test2.ppt], took PT0.008S
sendFileAsBytesMessage for [Tulips.jpg], took PT0.018S
The ConsumeFileApp output:
ConsumeFileApp Output
QueueMessageConsumer Waiting for messages at queue='test.queue' broker='tcp://localhost:61716'
Received ActiveMQBytesMessage message
Consumed message with filename [10-18 Year Swim Lessons- SUMMER.docx], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [2017_18 _Schedule_chess.pdf], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [activemq-monitor-demo.zip], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [activeMQ.vsd], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [JVM_memory.PNG], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [site_cag.txt], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [test.xlsx], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [test2.ppt], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [Tulips.jpg], took PT0.004S
Run SendFileApp with BlobMessage.
Enter message type for transferring file:
	1 - File as BytesMessage
	2 - File as BlobMessage
2
sendFileAsBlobMessage for [10-18 Year Swim Lessons- SUMMER.docx], took PT0.048S
sendFileAsBlobMessage for [2017_18 _Schedule_chess.pdf], took PT0.021S
sendFileAsBlobMessage for [activemq-monitor-demo.zip], took PT0.01S
sendFileAsBlobMessage for [activeMQ.vsd], took PT0.02S
sendFileAsBlobMessage for [JVM_memory.PNG], took PT0.012S
sendFileAsBlobMessage for [site_cag.txt], took PT0.011S
sendFileAsBlobMessage for [test.xlsx], took PT0.015S
sendFileAsBlobMessage for [test2.ppt], took PT0.012S
sendFileAsBlobMessage for [Tulips.jpg], took PT0.029S
- First file: BlobMessage took 28 ms longer than BytesMessage to send 10-18 Year Swim Lessons- SUMMER.docx (PT0.048S versus PT0.02S)
The ConsumeFileApp output:
ConsumeFileApp Output
Received ActiveMQBlobMessage message
Consumed message with filename [10-18 Year Swim Lessons- SUMMER.docx], took PT0.044S
Received ActiveMQBlobMessage message
Consumed message with filename [2017_18 _Schedule_chess.pdf], took PT0.011S
Received ActiveMQBlobMessage message
Consumed message with filename [activemq-monitor-demo.zip], took PT0.007S
Received ActiveMQBlobMessage message
Consumed message with filename [activeMQ.vsd], took PT0.01S
Received ActiveMQBlobMessage message
Consumed message with filename [JVM_memory.PNG], took PT0.006S
Received ActiveMQBlobMessage message
Consumed message with filename [site_cag.txt], took PT0.005S
Received ActiveMQBlobMessage message
Consumed message with filename [test.xlsx], took PT0.006S
Received ActiveMQBlobMessage message
Consumed message with filename [test2.ppt], took PT0.005S
Received ActiveMQBlobMessage message
Consumed message with filename [Tulips.jpg], took PT0.021S
- First file: BlobMessage took 42 ms longer than BytesMessage to save 10-18 Year Swim Lessons- SUMMER.docx (PT0.044S versus PT0.002S)
5.3. Verify the Transferred Files
Check the files in the output directories. The files in C:\temp\output\blob\ and C:\temp\output\bytes\ are the same as the ones in the C:\temp\input directory.
The image below shows the saved files at blob directory. Click to open and compare to the input files.
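The comparison can also be automated. The following is a minimal sketch, assuming a byte-by-byte comparison of same-named files is an acceptable check; the class VerifyTransferSketch and its methods are hypothetical and not part of the downloadable source. The default paths in main are the input and blob output directories from Constants.java.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class VerifyTransferSketch {

    /** Returns the names of input files that are missing from, or differ in, the output directory. */
    static List<String> mismatches(Path inputDir, Path outputDir) {
        File[] inputs = inputDir.toFile().listFiles(File::isFile);
        if (inputs == null) {
            throw new IllegalArgumentException("Not a readable directory: " + inputDir);
        }
        List<String> bad = new ArrayList<>();
        try {
            for (File in : inputs) {
                Path out = outputDir.resolve(in.getName());
                if (!Files.isRegularFile(out)
                        || !Arrays.equals(Files.readAllBytes(in.toPath()), Files.readAllBytes(out))) {
                    bad.add(in.getName());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(bad);
        return bad;
    }

    /** Self-check on temp directories: one identical, one altered, one missing file. */
    static boolean selfCheck() {
        try {
            Path in = Files.createTempDirectory("amq-in");
            Path out = Files.createTempDirectory("amq-out");
            Files.write(in.resolve("same.txt"), new byte[] {1, 2, 3});
            Files.write(out.resolve("same.txt"), new byte[] {1, 2, 3});
            Files.write(in.resolve("changed.txt"), new byte[] {4, 5});
            Files.write(out.resolve("changed.txt"), new byte[] {4});
            Files.write(in.resolve("missing.txt"), new byte[] {6});
            return mismatches(in, out).equals(Arrays.asList("changed.txt", "missing.txt"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path in = Paths.get(args.length > 0 ? args[0] : "C:\\temp\\input");
        Path out = Paths.get(args.length > 1 ? args[1] : "C:\\temp\\output\\blob");
        if (!Files.isDirectory(in)) {
            System.out.println("Input directory not found: " + in);
            return;
        }
        List<String> bad = mismatches(in, out);
        System.out.println(bad.isEmpty() ? "All files match" : "Missing or different: " + bad);
    }
}
```

Reading both files fully into memory is fine for the small files in this demo; for large files a streaming checksum would be a better fit.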
6. Summary
In this example, I built two Java AMQ client applications:
- SendFileApp sends the files to AMQ via BytesMessage or BlobMessage
- ConsumeFileApp receives the messages from AMQ and saves them to a different location
I compared the total time to send and receive the two message types and found that, for these nine files in this test run, BytesMessage was faster than BlobMessage.
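The totals behind that comparison can be reproduced from the console output in section 5.2. The sketch below parses the ISO-8601 durations printed by both applications with java.time.Duration and sums them. The class TransferTimeSketch is a hypothetical helper; the duration strings are copied from the logs above.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class TransferTimeSketch {

    // Per-file timings copied from the console output in section 5.2
    static final List<String> BYTES_SEND = Arrays.asList("PT0.02S", "PT0.009S", "PT0.008S",
            "PT0.01S", "PT0.008S", "PT0.006S", "PT0.009S", "PT0.008S", "PT0.018S");
    static final List<String> BLOB_SEND = Arrays.asList("PT0.048S", "PT0.021S", "PT0.01S",
            "PT0.02S", "PT0.012S", "PT0.011S", "PT0.015S", "PT0.012S", "PT0.029S");
    static final List<String> BYTES_CONSUME = Arrays.asList("PT0.002S", "PT0.002S", "PT0.001S",
            "PT0.001S", "PT0.002S", "PT0.001S", "PT0.001S", "PT0.001S", "PT0.004S");
    static final List<String> BLOB_CONSUME = Arrays.asList("PT0.044S", "PT0.011S", "PT0.007S",
            "PT0.01S", "PT0.006S", "PT0.005S", "PT0.006S", "PT0.005S", "PT0.021S");

    /** Sums a list of ISO-8601 duration strings such as "PT0.048S". */
    static Duration total(List<String> isoDurations) {
        Duration sum = Duration.ZERO;
        for (String text : isoDurations) {
            sum = sum.plus(Duration.parse(text));
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("BytesMessage send:    " + total(BYTES_SEND).toMillis() + " ms");
        System.out.println("BlobMessage send:     " + total(BLOB_SEND).toMillis() + " ms");
        System.out.println("BytesMessage consume: " + total(BYTES_CONSUME).toMillis() + " ms");
        System.out.println("BlobMessage consume:  " + total(BLOB_CONSUME).toMillis() + " ms");
    }
}
```

For this run the totals are 96 ms versus 178 ms on the sending side and 15 ms versus 115 ms on the consuming side. These are single measurements on small files, so they indicate a trend rather than a benchmark.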
7. Download the Source Code
This example consists of two applications to send and receive file data based on the BytesMessage and BlobMessage.
You can download the full source code of this example here: Apache ActiveMQ File Transfer Example