AWS SQS Message Attributes Example
In this article, we show an example of using AWS SQS message attributes.
1. Introduction
Amazon Simple Queue Service (SQS) is a fully managed message queuing service in the AWS cloud. Using SQS, we can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. In addition to the message itself, we can include structured metadata (such as timestamps, geospatial data, signatures, and identifiers) with messages using message attributes. In this example, we show how to add message attributes when sending a message and how to extract them when consuming a message in Java.
2. Technologies Used
- AWS SQS: distributed message queue
- AWS SDK for Java 2.0: Java APIs for AWS services
- Log4j2: logging APIs
- Gradle 6: build tool
- OpenJDK 13: an open-source implementation of the Java Platform, Standard Edition
Before we start, please follow the AWS SDK for Java 2.0 Developer Guide to set up AWS credentials and a region for development.
3. Message Attributes Example
3.1 The Basics
In SQS, the body of a message can include only XML, JSON, and unformatted text. Message attributes are optional structured metadata that can be attached to a message and sent together with it. There are two sets of message attributes: Message Attributes and Message System Attributes. Message Attributes serve general-purpose use cases and are normally added and extracted by our applications. Each message can have up to 10 attributes. Message System Attributes are designed to store metadata for other AWS services, such as AWS X-Ray. In the following example, we will focus on the usage of Message Attributes.
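As a small illustration of the 10-attribute limit mentioned above, the sketch below checks an attribute map before a message is sent. AttributeLimitCheck and its methods are hypothetical helpers, not part of the AWS SDK, and the value type of the map is left generic so that the block compiles without any SDK dependency.

```java
import java.util.Map;

/** Hypothetical pre-send check; this class is an illustration, not an AWS SDK type. */
final class AttributeLimitCheck {

    /** Maximum number of message attributes per message, as stated above. */
    static final int MAX_ATTRIBUTES = 10;

    private AttributeLimitCheck() {
    }

    /** Returns true when the map holds no more attributes than a single message may carry. */
    static boolean isWithinLimit(Map<String, ?> attributes) {
        return attributes.size() <= MAX_ATTRIBUTES;
    }

    /** Fails fast on the client side instead of waiting for the service to reject the request. */
    static void requireWithinLimit(Map<String, ?> attributes) {
        if (!isWithinLimit(attributes)) {
            throw new IllegalArgumentException(
                    "Too many message attributes: " + attributes.size() + " > " + MAX_ATTRIBUTES);
        }
    }
}
```

In the producer, such a check could run on the messageAttributes map right before the send request is built.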
One of the common use cases of Message Attributes is distributed tracing. When distributed applications communicate through a messaging infrastructure such as SQS, tracing a message as it is produced and consumed across applications becomes tricky, yet it is an essential feature to have. To demonstrate how to add and extract custom metadata with message attributes, we will use an attribute that carries a trace ID (named trace-id in the code). There are three components in the example:
- A standard queue in SQS: a queue that receives messages from producers and distributes them to consumers.
- A producer: a user application that sends messages.
- A consumer: a user application that receives messages.
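The tracing idea behind these components can be sketched without the SDK: an application that forwards a message keeps the trace ID it received, and only the first producer in the chain generates a new one. In the sketch below, TraceIdPropagation and withTraceId are hypothetical names, and a plain Map<String, String> stands in for the SDK's attribute map; this is an assumption made to keep the block self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper showing how a trace ID can travel with a message. */
final class TraceIdPropagation {

    /** Attribute name used in this article's example. */
    static final String TRACE_ID_NAME = "trace-id";

    private TraceIdPropagation() {
    }

    /**
     * Returns a copy of the incoming attributes that is guaranteed to carry a trace ID.
     * An existing trace ID is kept as-is; a random UUID is generated only when none is present.
     */
    static Map<String, String> withTraceId(Map<String, String> incoming) {
        Map<String, String> outgoing = new HashMap<>(incoming);
        outgoing.computeIfAbsent(TRACE_ID_NAME, name -> UUID.randomUUID().toString());
        return outgoing;
    }

    public static void main(String[] args) {
        // First hop: no trace ID yet, so one is generated.
        Map<String, String> firstHop = withTraceId(new HashMap<>());
        // Second hop: the same trace ID is carried forward.
        Map<String, String> secondHop = withTraceId(firstHop);
        System.out.println(firstHop.get(TRACE_ID_NAME).equals(secondHop.get(TRACE_ID_NAME)));
    }
}
```

Copying the map rather than mutating it keeps the received attributes untouched, which makes the helper safe to call from both producers and forwarding consumers.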
3.2 Creating a Standard Queue in SQS
SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A service client named SqsClient is used to access Amazon SQS. We can create a new instance of SqsClient as below:
SqsClient sqsClient = SqsClient.builder().region(Region.of("ap-southeast-2")).build();
Then invoke the createQueue API provided by SqsClient with an instance of CreateQueueRequest to create a new standard queue as below:
CreateQueueRequest request = CreateQueueRequest.builder().queueName("MyQueue").build();
CreateQueueResponse response = sqsClient.createQueue(request);
Note that the name we specified for the standard queue is “MyQueue”, without any suffix. AWS SQS requires that a FIFO queue name end with the .fifo suffix, so a name without that suffix denotes a standard queue.
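The naming contract can be expressed as a small check. The sketch below is an illustration only: QueueNames is a hypothetical helper, and the character and length rules in the patterns (letters, digits, hyphens, and underscores, at most 80 characters including the .fifo suffix) are assumptions based on the SQS naming rules as commonly documented, so please verify them against the current SQS documentation.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that classifies queue names; not part of the AWS SDK. */
final class QueueNames {

    // Assumed rule: 1 to 80 characters from letters, digits, hyphens, and underscores.
    private static final Pattern STANDARD = Pattern.compile("[A-Za-z0-9_-]{1,80}");

    // Assumed rule: same characters, with the ".fifo" suffix counted toward the 80-character limit.
    private static final Pattern FIFO = Pattern.compile("[A-Za-z0-9_-]{1,75}\\.fifo");

    private QueueNames() {
    }

    /** A FIFO queue name must end with the .fifo suffix. */
    static boolean isFifoName(String name) {
        return name != null && FIFO.matcher(name).matches();
    }

    /** A standard queue name carries no suffix. */
    static boolean isStandardName(String name) {
        return name != null && STANDARD.matcher(name).matches();
    }
}
```

With these rules, "MyQueue" is accepted as a standard queue name, while "MyQueue.fifo" is accepted only as a FIFO queue name.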
3.3 Adding a Message Attribute When Sending a Message
We use a producer to simulate a user application that sends messages to the SQS queue. The SqsClient provides the sendMessage API for sending a message to the SQS queue. First, we create a sample message myEvent and serialize it to a JSON string:
// sample message
MyEvent myEvent = new MyEvent();
myEvent.setId(UUID.randomUUID().toString());
myEvent.setSource(Thread.currentThread().getName());
myEvent.setPayload("AWS SQS message attributes example.");

String message = null;
try {
    message = objectMapper.writeValueAsString(myEvent);
} catch (JsonProcessingException e) {
    logger.error(e);
}
Then we generate a random UUID as the trace ID and add it to the message attributes attached to the message:
// generates a UUID as the traceId
String traceId = UUID.randomUUID().toString();
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();

// add traceId as a message attribute
messageAttributes.put(TRACE_ID_NAME, MessageAttributeValue.builder().dataType("String").stringValue(traceId).build());
Finally, we build an instance of SendMessageRequest with the message attributes and call the sendMessage API to send the message:
SendMessageRequest.Builder builder = SendMessageRequest.builder()
        .queueUrl(queueUrl)
        .messageBody(message)
        .messageAttributes(messageAttributes);

// send the message
logger.info("Sending message to queue {} with {}={}", this.queueName, TRACE_ID_NAME, traceId);
this.sqsClient.sendMessage(builder.build());
3.4 Extracting a Message Attribute When Receiving a Message
We use a consumer to simulate a user application that consumes messages from the queue. The SqsClient provides a straightforward API, receiveMessage, for receiving messages from a queue with short polling or long polling. In the majority of use cases, long polling results in higher performance at a reduced cost, so it is recommended unless the application expects an immediate response from a receiveMessage call. In the example, a Runnable task is submitted to a single-thread pool to long-poll for messages. More details can be found in the source code attached.
The following code snippet shows an example of using long polling to receive messages with message attributes from a queue. If a message becomes available within 20 seconds, the call returns with the message. Otherwise, the call keeps waiting until the 20-second wait time elapses and then returns an empty list. Note that the key here is to call the messageAttributeNames method with the name of each attribute we would like to receive together with the message when building the ReceiveMessageRequest. Please check out the Javadoc of the messageAttributeNames method for more details.
// long polling and wait for waitTimeSeconds before timed out
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .waitTimeSeconds(20)
        .messageAttributeNames("trace-id") // returns the trace Id
        .build();
List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
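The long-polling behaviour described above (return as soon as a message is available, otherwise return an empty list once the wait time elapses) can be tried locally with a plain in-memory queue. The sketch below is an analogy only and does not call SQS: LongPollAnalogy and receive are hypothetical names, and a java.util.concurrent.BlockingQueue stands in for the remote queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Local analogy of long polling; it uses an in-memory queue, not SQS. */
final class LongPollAnalogy {

    private LongPollAnalogy() {
    }

    /**
     * Waits up to waitMillis for the first message, then also takes any further
     * messages that are already available, up to max in total. Returns an empty
     * list if nothing arrives before the wait time elapses.
     */
    static List<String> receive(BlockingQueue<String> queue, long waitMillis, int max) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, max - 1);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return whatever was collected so far.
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        // A message is already waiting, so this call returns without blocking.
        System.out.println(receive(queue, 200, 10));
        // The queue is now empty, so this call blocks for the wait time and returns an empty list.
        System.out.println(receive(queue, 200, 10));
    }
}
```

The wait time plays the role of waitTimeSeconds in the ReceiveMessageRequest above: a longer wait means fewer empty responses at the cost of a longer blocking call.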
We can then extract the message attribute trace-id from each received message by using the static method below:
/**
 * Extract message attribute.
 *
 * @param message The message
 * @param attributeName The attribute name
 * @return The attribute value
 */
private static String extractAttribute(Message message, String attributeName) {
    if (message.hasMessageAttributes()) {
        Map<String, MessageAttributeValue> messageAttributes = message.messageAttributes();
        MessageAttributeValue attributeValue = messageAttributes.get(attributeName);
        if (attributeValue != null) {
            return attributeValue.stringValue();
        }
    }
    return null;
}
You may ask why we do not add the metadata to the message body directly. That is a valid approach when the message broker you are using does not support message metadata. But one obvious benefit of using message attributes instead of squeezing metadata into the message body is that consumers can use them to handle a message in a particular way without having to process the message body first. In the code snippet below, we extract the message attribute we care about and handle it with our own logic before parsing the message body.
/**
 * Process message.
 *
 * @param message the message
 */
private void processMessage(Message message) {
    logger.info("Processing message {}", message.messageId());

    // extract traceId
    String traceId = extractAttribute(message, TRACE_ID_NAME);

    // special handling before parsing the message body
    if (traceId == null || traceId.length() != 36) {
        logger.error("{} is compromised, message {} abandoned", TRACE_ID_NAME, message.messageId());
        return;
    }

    // deserialise message body
    MyEvent myEvent = null;
    try {
        myEvent = objectMapper.readValue(message.body(), MyEvent.class);
    } catch (JsonProcessingException e) {
        logger.error(e);
    }

    logger.info("Message processed: {}={}, MyEvent={}", TRACE_ID_NAME, traceId, myEvent == null ? null : myEvent.toString());
}
```
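The length check in the snippet above accepts any 36-character string. If a stricter check is wanted, one option is to also parse the value as a UUID. The sketch below is a possible variant, not the validation used in the attached source code; TraceIdValidator is a hypothetical name.

```java
import java.util.UUID;

/** Hypothetical stricter validation for a UUID-based trace ID. */
final class TraceIdValidator {

    /** Length of the canonical UUID text form, e.g. a447a294-1473-4725-83b8-64e1cd9db308. */
    private static final int UUID_TEXT_LENGTH = 36;

    private TraceIdValidator() {
    }

    /** Returns true only for a non-null, 36-character string that parses as a UUID. */
    static boolean isValid(String traceId) {
        if (traceId == null || traceId.length() != UUID_TEXT_LENGTH) {
            return false;
        }
        try {
            // The length check stays because UUID.fromString alone also accepts shortened forms.
            UUID.fromString(traceId);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

A consumer could call TraceIdValidator.isValid(traceId) in place of the null and length test before deciding whether to parse the message body.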
3.5 Running the Example
For ease of dependency management and build, we use Gradle 6.0 as the build tool. To run the example, run the messageAttributesExample Gradle task from the command line as below:
gradlew messageAttributesExample
The output would be:
[23:11:23,233] [INFO] aws.demo.MessageAttributesExample:135 [main] - Create queue MyQueue in region ap-southeast-2
[23:11:24,098] [INFO] aws.demo.MessageAttributesExample:139 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue
[23:11:24,227] [INFO] aws.demo.MessageAttributesExample$MyProducer:276 [pool-3-thread-1] - Sending message to queue MyQueue with trace-id=a447a294-1473-4725-83b8-64e1cd9db308
[23:11:24,366] [INFO] aws.demo.MessageAttributesExample$MyConsumer:334 [pool-4-thread-1] - Receiving messages from MyQueue...
[23:11:24,482] [INFO] aws.demo.MessageAttributesExample$MyConsumer:345 [pool-4-thread-1] - 1 messages received.
[23:11:24,482] [INFO] aws.demo.MessageAttributesExample$MyConsumer:360 [pool-4-thread-1] - Processing message 9cfcde98-841d-415e-b795-7c9396c7a8ea
[23:11:24,520] [INFO] aws.demo.MessageAttributesExample$MyConsumer:379 [pool-4-thread-1] - Message processed: trace-id=a447a294-1473-4725-83b8-64e1cd9db308, MyEvent=MyEvent[id='434d8016-b06c-4424-8ab1-1140f6fc87b3', timeStamp=2020-08-25T13:11:24.101739200Z, source='pool-3-thread-1', payload='AWS SQS message attributes example.']
[23:11:24,549] [INFO] aws.demo.MessageAttributesExample$MyConsumer:390 [pool-4-thread-1] - Deleting message 9cfcde98-841d-415e-b795-7c9396c7a8ea from queue: MyQueue
[23:11:24,595] [INFO] aws.demo.MessageAttributesExample:149 [main] - Delete queue MyQueue
[23:11:24,703] [INFO] aws.demo.MessageAttributesExample:154 [main] - Queue MyQueue deleted.
We can see from the output above that the message sent by the producer with the trace-id a447a294-1473-4725-83b8-64e1cd9db308 was consumed by the consumer, which successfully extracted the identical trace-id a447a294-1473-4725-83b8-64e1cd9db308 from the message attributes.
4. Download the Example Source Code
You can download the full source code of this example here: AWS SQS Message Attributes Example