jms

Apache ActiveMQ File Transfer Example

1. Introduction

Apache ActiveMQ (AMQ) is a message broker which transfers messages from sender to receiver.

In this example, I will build two simple AMQ applications which will transfer files from one location to another:

  • A producer sends a file via BytesMessage or BlobMessage
  • A consumer receives the BytesMessage or BlobMessage and saves it as a file

 
 

2. JMS Message Type

JMS defines six different message types.

AMQ has deprecated SteamMessage and added BlobMessage.
Message types:

Message TypeContentsPurpose
TextMessageA java.lang.String objectExchanges simple text messages. such as XML and Json
MapMessageA set of name-value pairs, with names as String objects and values as primitive types in the Java programming language.Exchanges key-value data
ObjectMessageA Serializable object in the Java programming language.Exchanges Java objects.
StreamMessageA stream of primitive values in the Java programming language, filled and read sequentially.Deprecated within AMQ.
BytesMessageA stream of uninterpreted bytes. This message type is for literally encoding a body to match an existing message format.Exchanges data in a format that is native to the application, and when JMS is used as a transport between two systems, where the JMS client does not know the message payload type.
BlobMessageBinary Large Object (BLOB).Added by AMQ.

In this example, I will demonstrate how to transfer a file via BytesMessage and BlobMessage.

3. Business Use Case

Businesses exchange information by transferring files from one location to another.
In this example, it transfers nine files from C:\temp\input to C:\temp\output. These nine files belong to commonly used file types.

The image below shows file details as well as the output directory details.

Figure 1 Input files

4. File Transfer Application

4.1. Technologies Used

The example code in this article was built and run using:

  • Java 1.8.101 (1.8.x will do fine)
  • Maven 3.3.9 (3.3.x will do fine)
  • Apache ActiveMQ 5.15.0 (others will do fine)
  • Eclipse Neon (Any Java IDE would work)

4.2. Dependency

Add dependency to Maven pom.xml.

pom.xml

<dependencies>
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.15.0</version>
	</dependency>

	<dependency>
		<groupId>commons-io</groupId>
		<artifactId>commons-io</artifactId>
		<version>2.5</version>
	</dependency>

</dependencies>

4.3. Constants

There are eight constants value used in this example.

Constants.java

package jcg.demo;

/**
 * The constants for this demo.
 * 
 * @author Mary.Zheng
 *
 */
public class Constants {
	public static final String FILE_INPUT_DIRECTORY = "C:\\temp\\input";
	public static final String FILE_NAME = "fileName";

	public static final String FILE_OUTPUT_BYTE_DIRECTORY = "C:\\temp\\output\\bytes\\";
	public static final String FILE_OUTPUT_BLOB_DIRECTORY = "C:\\temp\\output\\blob\\";

	public static final String TEST_QUEUE = "test.queue";
	public static final String TEST_BROKER_URL = "tcp://localhost:61716";
	public static final String ADMIN = "admin";
	
	public static final String BLOB_FILESERVER = "?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8761/fileserver/";

}
  • Line 20: AMQ BlobMessage requires a file server

4.4. File Manager

Create a file manager to read and write a file via bytes array.

FileAsByteArrayManager.java

package jcg.demo.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileAsByteArrayManager {

	public byte[] readfileAsBytes(File file) throws IOException {	 
		try (RandomAccessFile accessFile = new RandomAccessFile(file, "r")) {
			byte[] bytes = new byte[(int) accessFile.length()];
			accessFile.readFully(bytes);
			return bytes;
		}
	}

	public void writeFile(byte[] bytes, String fileName) throws IOException {
		File file = new File(fileName);
		try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
			accessFile.write(bytes);
		}
	}
}

4.5. Save File Application

Create a Java application which receives the messages and save them to c:\temp\output\ with the same file name as before.

ConsumeFileApp.java

package jcg.demo;

import javax.jms.JMSException;

import jcg.demo.activemq.QueueMessageConsumer;

public class ConsumeFileApp {

	public static void main(String[] args) {

		QueueMessageConsumer queueMsgListener = new QueueMessageConsumer(Constants.TEST_BROKER_URL, Constants.ADMIN,
				Constants.ADMIN);
		queueMsgListener.setDestinationName(Constants.TEST_QUEUE);

		try {
			queueMsgListener.run();

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

4.6. Message Consumer

Create the class QueueMessageConsumer.

QueueMessageConsumer.java

package jcg.demo.activemq;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.io.IOUtils;

import jcg.demo.Constants;
import jcg.demo.file.FileAsByteArrayManager;

/**
 * A message consumer which consumes the message from ActiveMQ Broker
 * 
 * @author Mary.Zheng
 *
 */
public class QueueMessageConsumer implements MessageListener {

	private String activeMqBrokerUri;
	private String username;
	private String password;
	private String destinationName;
	private FileAsByteArrayManager fileManager = new FileAsByteArrayManager();

	public QueueMessageConsumer(String activeMqBrokerUri, String username, String password) {
		super();
		this.activeMqBrokerUri = activeMqBrokerUri;
		this.username = username;
		this.password = password;
	}

	public void run() throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		Destination destination = session.createQueue(destinationName);

		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(this);

		System.out.println(String.format("QueueMessageConsumer Waiting for messages at queue='%s' broker='%s'",
				destinationName, this.activeMqBrokerUri));
	}

	@Override
	public void onMessage(Message message) {

		try {
			String filename = message.getStringProperty(Constants.FILE_NAME);

			Instant start = Instant.now();

			if (message instanceof ActiveMQTextMessage) {
				handleTextMessage((ActiveMQTextMessage) message);
			} else if (message instanceof ActiveMQBlobMessage) {
				handleBlobMessage((ActiveMQBlobMessage) message, filename);
			} else if (message instanceof ActiveMQBytesMessage) {
				handleBytesMessage((ActiveMQBytesMessage) message, filename);
			} else {
				System.out.println("test");
			}

			Instant end = Instant.now();
			System.out
					.println("Consumed message with filename [" + filename + "], took " + Duration.between(start, end));

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void handleBytesMessage(ActiveMQBytesMessage bytesMessage, String filename)
			throws IOException, JMSException {
		String outputfileName = Constants.FILE_OUTPUT_BYTE_DIRECTORY + filename;
		fileManager.writeFile(bytesMessage.getContent().getData(), outputfileName);
		System.out.println("Received ActiveMQBytesMessage message");
	}

	private void handleBlobMessage(ActiveMQBlobMessage blobMessage, String filename)
			throws FileNotFoundException, IOException, JMSException {
		// for 1mb or bigger message
		String outputfileName = Constants.FILE_OUTPUT_BLOB_DIRECTORY + filename;
		InputStream in = blobMessage.getInputStream();
		fileManager.writeFile(IOUtils.toByteArray(in), outputfileName);
		System.out.println("Received ActiveMQBlobMessage message");
	}

	private void handleTextMessage(ActiveMQTextMessage txtMessage) throws JMSException {
		String msg = String.format("Received ActiveMQTextMessage [ %s ]", txtMessage.getText());
		System.out.println(msg);
	}

	public String getDestinationName() {
		return destinationName;
	}

	public void setDestinationName(String destinationName) {
		this.destinationName = destinationName;
	}
}
  • Line 73: Receives the file content as BlobMessage
  • Line 75: Receives the file content as BytesMessage

4.7. Send File Application

Create a Java application to send nine files at C:\temp\input as either ByteMessage or BlobMessage to AMQ.

SendFileApp.java

package jcg.demo;

import java.util.Scanner;

import jcg.demo.activemq.QueueMessageProducer;

public class SendFileApp {

	public static void main(String[] args) {
		try {

			QueueMessageProducer queProducer = new QueueMessageProducer(Constants.TEST_BROKER_URL, Constants.ADMIN,
					Constants.ADMIN);

			System.out.println("Enter message type for transferring file:"
					+ "\n\t1 - File as BytesMessage \n\t2 - File as BlobMessage");
			try (Scanner scanIn = new Scanner(System.in)) {
				String inputFileType = scanIn.nextLine();
				switch (inputFileType) {
				case "1":
					queProducer.sendBytesMessages(Constants.TEST_QUEUE);
					break;
				case "2":
					queProducer.sendBlobMessages(Constants.TEST_QUEUE);
					break;
				default:
					System.out.println("Wrong input");
				}
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

4.8. Message Producer

Create a class QueueMessageProducer.

QueueMessageProducer.java

package jcg.demo.activemq;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

import jcg.demo.Constants;
import jcg.demo.file.FileAsByteArrayManager;

/**
 * A message producer which sends the file message to ActiveMQ Broker
 * 
 * @author Mary.Zheng
 *
 */
public class QueueMessageProducer {

	private String activeMqBrokerUri;
	private String username;
	private String password;

	private ActiveMQSession session;
	private MessageProducer msgProducer;
	private ConnectionFactory connFactory;
	private Connection connection;

	private FileAsByteArrayManager fileManager = new FileAsByteArrayManager();

	public QueueMessageProducer(String activeMqBrokerUri, String username, String password) {
		super();
		this.activeMqBrokerUri = activeMqBrokerUri;
		this.username = username;
		this.password = password;
	}

	private void setup() throws JMSException {
		connFactory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
		connection = connFactory.createConnection();
		connection.start();
		session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	}

	private void close() {
		try {
			if (msgProducer != null) {
				msgProducer.close();
			}
			if (session != null) {
				session.close();
			}
			if (connection != null) {
				connection.close();
			}
		} catch (Throwable ignore) {
		}
	}

	public void sendBytesMessages(String queueName) throws JMSException, IOException {

		setup();

		msgProducer = session.createProducer(session.createQueue(queueName));

		File[] files = new File(Constants.FILE_INPUT_DIRECTORY).listFiles();
		for (File file : files) {
			if (file.isFile()) {
				sendFileAsBytesMessage(file);
			}
		}

		close();
	}

	public void sendBlobMessages(String queueName) throws JMSException {

		this.activeMqBrokerUri = activeMqBrokerUri + Constants.BLOB_FILESERVER;
		setup();

		msgProducer = session.createProducer(session.createQueue(queueName));

		File[] files = new File(Constants.FILE_INPUT_DIRECTORY).listFiles();
		for (File file : files) {
			if (file.isFile()) {
				sendFileAsBlobMessage(file);
			}
		}

		close();
	}

	private void sendFileAsBlobMessage(File file) throws JMSException {
		Instant start = Instant.now();
		BlobMessage blobMessage = session.createBlobMessage(file);
		blobMessage.setStringProperty(Constants.FILE_NAME, file.getName());
		msgProducer.send(blobMessage);
		Instant end = Instant.now();
		System.out.println("sendFileAsBlobMessage for [" + file.getName() + "], took " + Duration.between(start, end));
	}

	private void sendFileAsBytesMessage(File file) throws JMSException, IOException {
		Instant start = Instant.now();
		BytesMessage bytesMessage = session.createBytesMessage();
		bytesMessage.setStringProperty(Constants.FILE_NAME, file.getName());
		bytesMessage.writeBytes(fileManager.readfileAsBytes(file));
		msgProducer.send(bytesMessage);
		Instant end = Instant.now();
		System.out.println("sendFileAsBytesMessage for [" + file.getName() + "], took " + Duration.between(start, end));
	}
}
  • Line 89: AMQ BlobMessage requires a file server

5. Demo Time

5.1. Start ConsumeFileApp

Start the ConsumeFileApp.

ConsumeFileApp Output

QueueMessageConsumer Waiting for messages at queue='test.queue' broker='tcp://localhost:61716'

5.2. Start SendFileApp

Run SendFileApp with BytesMessage

Send BytesMessage Output

Enter message type for transferring file:
	1 - File as BytesMessage 
	2 - File as BlobMessage
1
sendFileAsBytesMessage for [10-18 Year Swim Lessons- SUMMER.docx], took PT0.02S
sendFileAsBytesMessage for [2017_18 _Schedule_chess.pdf], took PT0.009S
sendFileAsBytesMessage for [activemq-monitor-demo.zip], took PT0.008S
sendFileAsBytesMessage for [activeMQ.vsd], took PT0.01S
sendFileAsBytesMessage for [JVM_memory.PNG], took PT0.008S
sendFileAsBytesMessage for [site_cag.txt], took PT0.006S
sendFileAsBytesMessage for [test.xlsx], took PT0.009S
sendFileAsBytesMessage for [test2.ppt], took PT0.008S
sendFileAsBytesMessage for [Tulips.jpg], took PT0.018S

The ConsumeFileApp output:

ConsumeFileApp Output

QueueMessageConsumer Waiting for messages at queue='test.queue' broker='tcp://localhost:61716'
Received ActiveMQBytesMessage message
Consumed message with filename [10-18 Year Swim Lessons- SUMMER.docx], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [2017_18 _Schedule_chess.pdf], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [activemq-monitor-demo.zip], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [activeMQ.vsd], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [JVM_memory.PNG], took PT0.002S
Received ActiveMQBytesMessage message
Consumed message with filename [site_cag.txt], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [test.xlsx], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [test2.ppt], took PT0.001S
Received ActiveMQBytesMessage message
Consumed message with filename [Tulips.jpg], took PT0.004S

Run SendFileApp for BlobMessage

Enter message type for transferring file:
	1 - File as BytesMessage 
	2 - File as BlobMessage
2
sendFileAsBlobMessage for [10-18 Year Swim Lessons- SUMMER.docx], took PT0.048S
sendFileAsBlobMessage for [2017_18 _Schedule_chess.pdf], took PT0.021S
sendFileAsBlobMessage for [activemq-monitor-demo.zip], took PT0.01S
sendFileAsBlobMessage for [activeMQ.vsd], took PT0.02S
sendFileAsBlobMessage for [JVM_memory.PNG], took PT0.012S
sendFileAsBlobMessage for [site_cag.txt], took PT0.011S
sendFileAsBlobMessage for [test.xlsx], took PT0.015S
sendFileAsBlobMessage for [test2.ppt], took PT0.012S
sendFileAsBlobMessage for [Tulips.jpg], took PT0.029S
  • Line 2: BlobMessage took longer (28 ms) than BytesMessage to send 10-18 Year Swim Lessons- SUMMER.docx

The ConsumeFileApp output:

ConsumeFileApp Output

Received ActiveMQBlobMessage message
Consumed message with filename [10-18 Year Swim Lessons- SUMMER.docx], took PT0.044S
Received ActiveMQBlobMessage message
Consumed message with filename [2017_18 _Schedule_chess.pdf], took PT0.011S
Received ActiveMQBlobMessage message
Consumed message with filename [activemq-monitor-demo.zip], took PT0.007S
Received ActiveMQBlobMessage message
Consumed message with filename [activeMQ.vsd], took PT0.01S
Received ActiveMQBlobMessage message
Consumed message with filename [JVM_memory.PNG], took PT0.006S
Received ActiveMQBlobMessage message
Consumed message with filename [site_cag.txt], took PT0.005S
Received ActiveMQBlobMessage message
Consumed message with filename [test.xlsx], took PT0.006S
Received ActiveMQBlobMessage message
Consumed message with filename [test2.ppt], took PT0.005S
Received ActiveMQBlobMessage message
Consumed message with filename [Tulips.jpg], took PT0.021S
  • Line 2: BlobMessage took longer (42 ms) than BytesMessage to save the 10-18 Year Swim Lessons- SUMMER.docx

5.3. Verify the Transferred Files

Check the files at the output directory. The files at C:\temp\blob\ and C:\temp\bytes\ are the same as the ones from the C:\temp\input directory.

The image below shows the saved files at blob directory. Click to open and compare to the input files.

Figure 2 Files transferred via BlobMessage

6. Summary

In this example, I built two Java AMQ client applications:

  • SendFileApp sends the file to the AMQ via ByteMessage and BlobMessage
  • ConsumeFileApp receives the message from the AMQ and saves it to a different location

I compared the total time to send and receive two different message types and found out that BytesMessage is faster than BlobMessage.

7. Download the Source Code

This example consists of two applications to send and receive file data based on the BytesMessage and BlobMessage.

Download
You can download the full source code of this example here: Apache ActiveMQ File Transfer Example

Mary Zheng

Mary has graduated from Mechanical Engineering department at ShangHai JiaoTong University. She also holds a Master degree in Computer Science from Webster University. During her studies she has been involved with a large number of projects ranging from programming and software engineering. She works as a senior Software Engineer in the telecommunications sector where she acts as a leader and works with others to design, implement, and monitor the software solution.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

3 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Mitch
Mitch
6 years ago

Hi I tried to run your demo and the blob send works fine except for the amount of heap memory it requires, I’m having trouble with the byte send it has given me a bunch of errors, would you mind helping out?

sridhar
sridhar
5 years ago

This is really very helpful.
Could you please post an article with configuring external file server (HTTP or FTP) to send Blob message.

Add
Add
9 months ago

Hi, is it possible to achieve this as text. What I can upload files but i am unable to get the file names of the text I read from the files. How can I achieve this . Can you help?

Back to top button