AWS SQS Message Attributes Example

In this article, we will show an example of using AWS SQS message attributes.

1. Introduction

Amazon Simple Queue Service (SQS) is a fully managed message queuing service in the AWS cloud. Using SQS, we can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. In addition to the message itself, we can attach structured metadata (such as timestamps, geospatial data, signatures, and identifiers) to messages using message attributes. In this example, we show how to add message attributes when sending a message and how to extract them when consuming a message in Java.

2. Technologies Used

This example uses Java and the AWS SDK for Java 2.0. Before we start, please follow the AWS SDK for Java 2.0 Developer Guide to set up AWS credentials and a region for development.

3. Message Attributes Example

3.1 The Basics

In SQS, the body of a message can contain only XML, JSON, or unformatted text. Message attributes are optional structured metadata that can be attached to a message and sent together with it. There are two sets of attributes: Message Attributes and Message System Attributes. Message Attributes are intended for general-purpose use and are normally added and extracted by our own applications. Each message can have up to 10 attributes. Message System Attributes are designed to store metadata for other AWS services, such as AWS X-Ray. In the following example, we focus on Message Attributes.
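To make the limits concrete, here is a minimal JDK-only sketch that checks a set of attribute names before sending. The limit of 10 attributes comes from the text above; the naming rules (allowed characters, a 256-character maximum, no leading or trailing period, no consecutive periods, no AWS. or Amazon. prefix) reflect our reading of the SQS Developer Guide and should be verified against the current documentation. The AttributeRules class and its methods are hypothetical names, not part of the AWS SDK.

```java
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: pre-flight checks for SQS message attribute names.
final class AttributeRules {
    // Limit stated in the article: up to 10 attributes per message.
    static final int MAX_ATTRIBUTES = 10;
    // Assumed rule: letters, digits, underscore, hyphen, period; 1 to 256 characters.
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9_.-]{1,256}");

    private AttributeRules() {}

    static boolean isValidName(String name) {
        if (name == null || !ALLOWED.matcher(name).matches()) {
            return false;
        }
        // Assumed rule: no leading/trailing period and no consecutive periods.
        if (name.startsWith(".") || name.endsWith(".") || name.contains("..")) {
            return false;
        }
        // Assumed rule: the AWS. and Amazon. prefixes are reserved in any casing.
        String lower = name.toLowerCase(Locale.ROOT);
        return !lower.startsWith("aws.") && !lower.startsWith("amazon.");
    }

    static boolean isValidSet(Map<String, ?> attributes) {
        return attributes.size() <= MAX_ATTRIBUTES
                && attributes.keySet().stream().allMatch(AttributeRules::isValidName);
    }
}
```

A producer could call AttributeRules.isValidSet(messageAttributes) before building the request, so that an invalid attribute is caught locally rather than rejected by the service.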

One common use case for Message Attributes is distributed tracing. When distributed applications communicate through a messaging infrastructure such as SQS, tracing a message from producer to consumer becomes tricky, yet it is an essential feature to have. To demonstrate how to add and extract custom metadata with message attributes, we use a trace ID attribute (named trace-id in the code). There are three components in the example:

  • A standard queue in SQS: a queue receives messages from producers and distributes messages to consumers.
  • A producer: a user application that sends messages.
  • A consumer: a user application that receives messages.

3.2 Creating a Standard Queue in SQS

SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A service client named SqsClient is used to access Amazon SQS. We can create a new instance of SqsClient as below:

SqsClient sqsClient = SqsClient.builder().region(Region.of("ap-southeast-2")).build();

Then invoke the createQueue API provided by SqsClient with an instance of CreateQueueRequest to create a new standard queue as below:

CreateQueueRequest request = CreateQueueRequest.builder().queueName("MyQueue").build();
CreateQueueResponse response = sqsClient.createQueue(request);

Note that the name of the standard queue we specified here is “MyQueue”, without any suffix. AWS SQS reserves the .fifo suffix for FIFO queues: a FIFO queue name must end with .fifo, and a standard queue name must not.
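The naming contract can be checked locally before calling createQueue. The sketch below is a JDK-only illustration and assumes the documented SQS limits as we understand them: names of up to 80 characters made of letters, digits, hyphens, and underscores, with the .fifo suffix counting toward the 80 characters. QueueNames is a hypothetical helper, not an SDK class.

```java
import java.util.regex.Pattern;

// Hypothetical helper: classifies a queue name by the SQS naming contract.
final class QueueNames {
    // Assumed rule: standard queue names are 1 to 80 letters, digits, hyphens, or underscores.
    private static final Pattern STANDARD = Pattern.compile("[A-Za-z0-9_-]{1,80}");
    // FIFO queue names must end with ".fifo"; the 5-character suffix counts toward the 80.
    private static final Pattern FIFO = Pattern.compile("[A-Za-z0-9_-]{1,75}\\.fifo");

    private QueueNames() {}

    static boolean isStandard(String name) {
        return name != null && STANDARD.matcher(name).matches();
    }

    static boolean isFifo(String name) {
        return name != null && FIFO.matcher(name).matches();
    }
}
```

With this helper, QueueNames.isStandard("MyQueue") is true, while "MyQueue.fifo" is accepted only by QueueNames.isFifo.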

3.3 Adding a Message Attribute When Sending a Message

We use a producer to simulate a user application that sends messages to the SQS queue. The SqsClient provides the sendMessage API for sending a message to an SQS queue. First, we create a sample message myEvent and serialize it to a JSON string:

// sample message
MyEvent myEvent = new MyEvent();
myEvent.setId(UUID.randomUUID().toString());
myEvent.setSource(Thread.currentThread().getName());
myEvent.setPayload("AWS SQS message attributes example.");

String message = null;
try {
    message = objectMapper.writeValueAsString(myEvent);
} catch (JsonProcessingException e) {
    logger.error(e);
}

Then we generate a random UUID as the trace Id and add it as one of the message attributes attached to the message:

// generates a UUID as the traceId
String traceId = UUID.randomUUID().toString();
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
// add traceId as a message attribute
messageAttributes.put(TRACE_ID_NAME, MessageAttributeValue.builder().dataType("String").stringValue(traceId).build());

Finally, we build an instance of SendMessageRequest with the message attributes and call the sendMessage API to send the message:

SendMessageRequest.Builder builder = SendMessageRequest.builder()
        .queueUrl(queueUrl)
        .messageBody(message)
        .messageAttributes(messageAttributes);
// send the message
logger.info("Sending message to queue {} with {}={}", this.queueName, TRACE_ID_NAME, traceId);
this.sqsClient.sendMessage(builder.build());

3.4 Extracting a Message Attribute When Receiving a Message

We use a consumer to simulate a user application that consumes messages from the queue. The SqsClient provides a straightforward receiveMessage API for receiving messages from a queue with either short polling or long polling. In the majority of use cases, long polling gives higher performance at a reduced cost because it cuts down the number of empty responses, so it is recommended unless the application expects an immediate response from a receiveMessage call. In the example, a Runnable task is submitted to a single-thread pool to long-poll for messages. More details can be found in the attached source code.

The following code snippet shows an example of using long polling to receive messages with message attributes from a queue. If a message becomes available within 20 seconds, the call returns with the message. Otherwise, the call waits until the 20 seconds have elapsed and then returns an empty list. Note that the key here is to call the messageAttributeNames method with the names of the attributes we would like to receive together with the message when building the ReceiveMessageRequest; attributes that are not requested are not returned. Please check the Javadoc of the messageAttributeNames method for more details.

// long polling: wait up to waitTimeSeconds for a message before timing out
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .waitTimeSeconds(20)
        .messageAttributeNames("trace-id") // returns the trace Id
        .build();
List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
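The long-polling behaviour described above (return as soon as a message arrives, otherwise return an empty list once the wait time has elapsed) can be illustrated without AWS at all. The following is only an in-memory analogy built on java.util.concurrent.BlockingQueue; it does not call SQS, and LongPollSketch and its methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory analogy of long polling; this is not an SQS client.
final class LongPollSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String body) {
        queue.add(body);
    }

    // Waits up to waitMillis for the first message, then takes up to
    // maxMessages - 1 more that are already available, without further waiting.
    List<String> receive(long waitMillis, int maxMessages) {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, maxMessages - 1);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what we have (an empty batch).
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

Calling receive on an empty LongPollSketch blocks for the full wait time and returns an empty list, whereas a call made while messages are queued returns immediately.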

Then we can extract the message attribute trace-id by using the static method below:

/**
 * Extract message attribute.
 *
 * @param message       The message
 * @param attributeName The attribute name
 * @return The attribute value
 */
private static String extractAttribute(Message message, String attributeName) {
    if (message.hasMessageAttributes()) {
        Map<String, MessageAttributeValue> messageAttributes = message.messageAttributes();
        MessageAttributeValue attributeValue = messageAttributes.get(attributeName);
        if (attributeValue != null) {
            return attributeValue.stringValue();
        }
    }
    return null;
}

You may ask why we don’t simply put the metadata in the message body. That is a valid approach when the message broker you are using doesn’t support message metadata. But one obvious benefit of using message attributes instead of squeezing metadata into the message body is that consumers can use the attributes to decide how to handle a message without having to process the message body first. In the code snippet below, we extract the message attribute we care about and handle it with our own logic before parsing the message body.

/**
 * Process message.
 * 
 * @param message the message
 */
private void processMessage(Message message) {
    logger.info("Processing message {}", message.messageId());

    // extract traceId
    String traceId = extractAttribute(message, TRACE_ID_NAME);

    // special handling before parsing the message body
    if (traceId == null || traceId.length() != 36) {
        logger.error("{} is compromised, message {} abandoned", TRACE_ID_NAME, message.messageId());
        return;
    }

    // deserialise message body
    MyEvent myEvent = null;
    try {
        myEvent = objectMapper.readValue(message.body(), MyEvent.class);
    } catch (JsonProcessingException e) {
        logger.error(e);
    }

    logger.info("Message processed: {}={}, MyEvent={}", TRACE_ID_NAME, traceId, myEvent == null ? null : myEvent.toString());
}
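The length check in processMessage only confirms that the trace ID has 36 characters. As a stricter alternative, the sketch below round-trips the value through java.util.UUID. It assumes, as in this example, that trace IDs are always canonical UUID strings; the TraceIds class and its isValid method are hypothetical names and are not part of the attached source code.

```java
import java.util.Locale;
import java.util.UUID;

// Hypothetical helper: stricter validation of a UUID-based trace ID.
final class TraceIds {
    private TraceIds() {}

    static boolean isValid(String traceId) {
        // Cheap pre-check, same as the length test in processMessage.
        if (traceId == null || traceId.length() != 36) {
            return false;
        }
        try {
            // Parse and print the value again; only a canonical UUID string
            // survives the round trip unchanged (ignoring letter case).
            String canonical = UUID.fromString(traceId).toString();
            return canonical.equals(traceId.toLowerCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Covers malformed input, including NumberFormatException.
            return false;
        }
    }
}
```

In processMessage, the condition traceId == null || traceId.length() != 36 could then be replaced with !TraceIds.isValid(traceId).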

3.5 Running the Example

To simplify dependency management and builds, we use Gradle 6.0 as the build tool. To run the example, run the messageAttributesExample Gradle task from the command line as below:

gradlew messageAttributesExample

The output looks like this:

[23:11:23,233] [INFO] aws.demo.MessageAttributesExample:135 [main] - Create queue MyQueue in region ap-southeast-2
[23:11:24,098] [INFO] aws.demo.MessageAttributesExample:139 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue
[23:11:24,227] [INFO] aws.demo.MessageAttributesExample$MyProducer:276 [pool-3-thread-1] - Sending message to queue MyQueue with trace-id=a447a294-1473-4725-83b8-64e1cd9db308
[23:11:24,366] [INFO] aws.demo.MessageAttributesExample$MyConsumer:334 [pool-4-thread-1] - Receiving messages from MyQueue...
[23:11:24,482] [INFO] aws.demo.MessageAttributesExample$MyConsumer:345 [pool-4-thread-1] - 1 messages received.
[23:11:24,482] [INFO] aws.demo.MessageAttributesExample$MyConsumer:360 [pool-4-thread-1] - Processing message 9cfcde98-841d-415e-b795-7c9396c7a8ea
[23:11:24,520] [INFO] aws.demo.MessageAttributesExample$MyConsumer:379 [pool-4-thread-1] - Message processed: trace-id=a447a294-1473-4725-83b8-64e1cd9db308, MyEvent=MyEvent[id='434d8016-b06c-4424-8ab1-1140f6fc87b3', timeStamp=2020-08-25T13:11:24.101739200Z, source='pool-3-thread-1', payload='AWS SQS message attributes example.']
[23:11:24,549] [INFO] aws.demo.MessageAttributesExample$MyConsumer:390 [pool-4-thread-1] - Deleting message 9cfcde98-841d-415e-b795-7c9396c7a8ea from queue: MyQueue
[23:11:24,595] [INFO] aws.demo.MessageAttributesExample:149 [main] - Delete queue MyQueue
[23:11:24,703] [INFO] aws.demo.MessageAttributesExample:154 [main] - Queue MyQueue deleted.

We can see from the output above that the producer sent the message with the trace-id a447a294-1473-4725-83b8-64e1cd9db308, and the consumer successfully extracted the identical trace-id from the message attributes of the message it received.

4. Download the Example Source Code

You can download the full source code of this example here: AWS SQS Message Attributes Example

Kevin Yang

A software design and development professional with seventeen years’ experience in the IT industry, especially with Java EE and .NET, I have worked for software companies, scientific research institutes and websites.