
AWS SQS FIFO Queue Example in Java

In this article, we will show an AWS SQS FIFO queue example in Java.

1. Introduction

Amazon Simple Queue Service (SQS) is a fully managed message queuing service in the AWS cloud. Using SQS, we can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order in which they are sent. In this example, we are going to show you how to create an SQS FIFO queue and how to use features provided by FIFO queues, such as message deduplication and message groups, in Java.

2. Technologies Used

Before we start, please follow the AWS SDK for Java 2.0 Developer Guide to set up AWS credentials and a region for development.

3. FIFO Queue Example

3.1 The Basics

SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. In addition to the features supported by standard queues, FIFO queues support ordering and exactly-once processing. They also provide additional features that help prevent unintentional duplicates from being sent by message producers or from being received by message consumers. Additionally, message groups allow multiple separate ordered message streams within the same queue.

3.2 Creating a FIFO Queue in SQS

To create a FIFO queue in SQS, a service client named SqsClient is used. We can create a new instance of SqsClient as below:

SqsClient sqsClient = SqsClient.builder().region(Region.of("ap-southeast-2")).build();

Then invoke the createQueue API provided by SqsClient with an instance of CreateQueueRequest to create a new FIFO queue as below:

Map<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.FIFO_QUEUE, Boolean.TRUE.toString());
CreateQueueRequest request = CreateQueueRequest.builder()
        .queueName("MyQueue.fifo")
        .attributes(attributes) // Set FIFO_QUEUE attribute to true
        .build();
this.sqsClient.createQueue(request);

Note that the queue name of the FIFO queue we specified here is “MyQueue.fifo” because SQS requires the name of a FIFO queue to end with the .fifo suffix. Also, we need to set the QueueAttributeName.FIFO_QUEUE attribute to true in the attributes of the CreateQueueRequest.
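The naming rule above can be checked on the client before calling createQueue. The following is a minimal sketch, not part of the AWS SDK: the class FifoQueueNames and its isValid method are hypothetical helpers, and the limits it encodes (at most 80 characters including the suffix; letters, digits, hyphens and underscores in the base name) reflect the documented SQS naming rules as we understand them.

```java
import java.util.regex.Pattern;

/** Client-side sanity check for FIFO queue names (hypothetical helper, not an SDK class). */
final class FifoQueueNames {
    private static final String SUFFIX = ".fifo";
    private static final int MAX_LENGTH = 80; // assumed limit, counted including the ".fifo" suffix
    // Base name (without the suffix): letters, digits, hyphens and underscores only.
    private static final Pattern BASE_NAME = Pattern.compile("[A-Za-z0-9_-]+");

    private FifoQueueNames() {
    }

    static boolean isValid(String queueName) {
        if (queueName == null || queueName.length() > MAX_LENGTH || !queueName.endsWith(SUFFIX)) {
            return false;
        }
        String base = queueName.substring(0, queueName.length() - SUFFIX.length());
        return BASE_NAME.matcher(base).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("MyQueue.fifo")); // prints "true"
        System.out.println(isValid("MyQueue"));      // prints "false": missing the .fifo suffix
    }
}
```

A check like this only fails fast with a clearer error; the service still performs its own validation when the queue is created.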

3.3 Sending Groups of Messages With Deduplication Enabled

We use two producers to simulate two user applications that send two groups of messages to the SQS FIFO queue. The SqsClient provides the sendMessage API for sending a message to the queue. Each producer sends a group of messages with its own message group Id. Messages that belong to the same message group are processed in FIFO order. Each message is sent with a deduplication Id that is unique within its message group. If a message with a particular deduplication Id is sent successfully, any messages sent with the same deduplication Id during the 5-minute deduplication interval are accepted successfully but aren’t delivered.
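To make the deduplication rule concrete, here is a small in-memory model of it. This is an illustration only and says nothing about how SQS is implemented: the class DedupWindowSketch and its offer method are hypothetical, time is passed in explicitly so the behaviour is easy to follow, and the sketch assumes the 5-minute interval is measured from the first accepted copy.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the 5-minute deduplication interval (illustration, not SQS internals). */
final class DedupWindowSketch {
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;

    // deduplication Id -> time in milliseconds at which the first copy was accepted
    private final Map<String, Long> firstSeen = new HashMap<>();

    /**
     * Returns true if the message would be delivered, or false if it would be
     * accepted but dropped as a duplicate inside the window.
     */
    boolean offer(String deduplicationId, long nowMillis) {
        Long seenAt = firstSeen.get(deduplicationId);
        if (seenAt != null && nowMillis - seenAt < WINDOW_MILLIS) {
            return false; // same Id seen less than 5 minutes ago
        }
        firstSeen.put(deduplicationId, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        DedupWindowSketch window = new DedupWindowSketch();
        System.out.println(window.offer("example-group-1:1", 0L));       // prints "true"
        System.out.println(window.offer("example-group-1:1", 1_000L));   // prints "false"
        System.out.println(window.offer("example-group-1:1", 301_000L)); // prints "true"
    }
}
```

Note that in both the model and the real service the sender does not get an error for the duplicate; it simply never reaches a consumer.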

For demonstration purposes, we create a sample message myEvent with a client-side sequence number and serialise it to a JSON string:

MyEvent myEvent = new MyEvent();
myEvent.setId(UUID.randomUUID().toString());
myEvent.setSource(Thread.currentThread().getName());
myEvent.setClientSideSequenceNumber(1);
myEvent.setPayload("AWS SQS FIFO queue message group example.");

String message = null;
try {
    message = objectMapper.writeValueAsString(myEvent);
} catch (JsonProcessingException e) {
    logger.error(e);
}

Then we generate a custom deduplication Id by using the message group Id and the message’s sequence number:

// custom dedup id: groupId + ":" + seqNum
String deduplicationId = "example-group-1" + ":" + 1;

Finally, we build an instance of SendMessageRequest with the message body, deduplication Id and message group Id, and call the sendMessage API to send the message:

SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
        .queueUrl(queueUrl)
        .messageBody(message)
        .messageDeduplicationId(deduplicationId) // deduplication Id generated above
        .messageGroupId("example-group-1") // message group Id
        .build();
// send the message
logger.info("Sending message #{} of {} to queue {}", i, this.numberOfEvents, this.queueName);
this.sqsClient.sendMessage(sendMessageRequest);

Note that to verify the message deduplication feature, we deliberately send a duplicate of the last message in the message group as below:

// deliberately send a duplicate of the last message
if (i == this.numberOfEvents) {
    logger.info("Sending a duplicate message #{} to queue {}", i, this.queueName);
    this.sqsClient.sendMessage(sendMessageRequest);
}
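The snippets above refer to a loop counter i and a numberOfEvents field without showing the loop itself. The sketch below shows one way the pieces could fit together; it is an assumption about the overall shape, not the article's actual producer class. The BiConsumer callback is a stand-in for the sqsClient.sendMessage call, and the message body is a placeholder for the JSON payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Sketch of a producer loop; the sender callback stands in for sqsClient.sendMessage. */
final class ProducerLoopSketch {
    /**
     * Sends numberOfEvents messages for one message group, then repeats the last one.
     * The sender receives (deduplicationId, body) for every send attempt.
     */
    static void produce(String groupId, int numberOfEvents, BiConsumer<String, String> sender) {
        for (int i = 1; i <= numberOfEvents; i++) {
            String deduplicationId = groupId + ":" + i; // groupId + ":" + seqNum, as above
            String body = "event #" + i;                // placeholder for the JSON payload
            sender.accept(deduplicationId, body);
            if (i == numberOfEvents) {
                // deliberate duplicate: identical deduplication Id and body
                sender.accept(deduplicationId, body);
            }
        }
    }

    public static void main(String[] args) {
        List<String> sentIds = new ArrayList<>();
        produce("example-group-1", 3, (id, body) -> sentIds.add(id));
        // prints "[example-group-1:1, example-group-1:2, example-group-1:3, example-group-1:3]"
        System.out.println(sentIds);
    }
}
```

Four send attempts are made for three events, and the last two share a deduplication Id, so a FIFO queue would be expected to deliver only three messages for this group.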

3.4 Consuming Groups of Messages

We use two consumers to simulate two user applications that consume messages from the queue. The SqsClient provides a straightforward receiveMessage API for receiving messages from a queue with short polling or long polling. Please refer to AWS SQS Polling Example in Java for more details on receiving messages by polling.

The following code snippet shows an example of using long polling to receive messages from a queue. If a message becomes available within 20 seconds, the call returns with the message. Otherwise, the call waits until the 20-second timeout expires and then returns an empty list.

// long polling and wait for waitTimeSeconds before timed out
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl)
        .waitTimeSeconds(LONG_POLLING_SECONDS) // force long polling
        .attributeNamesWithStrings("All") // return all attributes
        .build();
List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
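A consumer typically wraps this call in a loop. The sketch below shows one possible stopping rule, matching the "Consumer aborted after waiting for 20s with no message received" lines in the output later on: keep receiving until a poll comes back empty. The Supplier is a hypothetical stand-in for the long-polling receiveMessage call, and messages are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Sketch of a consumer loop that stops after the first empty poll. */
final class PollingLoopSketch {
    /** Calls receive repeatedly and collects messages until an empty batch is returned. */
    static List<String> drain(Supplier<List<String>> receive) {
        List<String> processed = new ArrayList<>();
        while (true) {
            List<String> batch = receive.get(); // stands in for a long-polling receiveMessage call
            if (batch.isEmpty()) {
                break; // the wait time elapsed with nothing to receive
            }
            processed.addAll(batch);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Three simulated polls: one message, two messages, then a timeout.
        Iterator<List<String>> polls = Arrays.asList(
                Arrays.asList("m1"),
                Arrays.asList("m2", "m3"),
                Collections.<String>emptyList()).iterator();
        System.out.println(drain(polls::next)); // prints "[m1, m2, m3]"
    }
}
```

Stopping on the first empty poll suits a short demo; a long-running service would more likely keep polling.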

Note that we set attributeNamesWithStrings to All when building the ReceiveMessageRequest because we would like to get system attributes such as MessageGroupId, MessageDeduplicationId, SequenceNumber and SentTimestamp for demonstration purposes when processing messages later on. For more details about message attributes, please refer to AWS SQS Message Attributes Example. Also, note that receiveMessage might return messages with multiple MessageGroupId values. For each MessageGroupId, the messages are sorted by time sent. Once a consumer has received messages that belong to a particular message group Id, no further messages from that group are returned to any consumer until the received messages are deleted or become visible again. In other words, no other consumer can process messages with the same message group Id in the meantime.
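The message group behaviour described above can be modelled in a few lines. The following is a simplified in-memory sketch, not a description of SQS internals: MessageGroupSketch and its methods are hypothetical, it hands out one message at a time, and it ignores visibility timeouts, so a group is only released by an explicit delete.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Simplified model: each group is FIFO, and a group with an in-flight message is skipped. */
final class MessageGroupSketch {
    private final Map<String, Deque<String>> groups = new LinkedHashMap<>();
    private final Set<String> inFlightGroups = new HashSet<>();

    void send(String groupId, String body) {
        groups.computeIfAbsent(groupId, k -> new ArrayDeque<>()).addLast(body);
    }

    /** Returns {groupId, body} for the next available message, or null if none is available. */
    String[] receive() {
        for (Map.Entry<String, Deque<String>> entry : groups.entrySet()) {
            boolean locked = inFlightGroups.contains(entry.getKey());
            if (!locked && !entry.getValue().isEmpty()) {
                inFlightGroups.add(entry.getKey()); // hide the rest of the group
                return new String[] { entry.getKey(), entry.getValue().peekFirst() };
            }
        }
        return null;
    }

    /** Deletes the in-flight message of the group and makes the group available again. */
    void delete(String groupId) {
        if (inFlightGroups.remove(groupId)) {
            groups.get(groupId).pollFirst();
        }
    }

    public static void main(String[] args) {
        MessageGroupSketch queue = new MessageGroupSketch();
        queue.send("example-group-1", "1");
        queue.send("example-group-1", "2");
        queue.send("example-group-2", "1");
        System.out.println(String.join(":", queue.receive())); // prints "example-group-1:1"
        System.out.println(String.join(":", queue.receive())); // prints "example-group-2:1"
        System.out.println(queue.receive());                   // prints "null": both groups are in flight
        queue.delete("example-group-1");
        System.out.println(String.join(":", queue.receive())); // prints "example-group-1:2"
    }
}
```

This is why two consumers can work on different groups in parallel while the order inside each group is still preserved.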

Then we can process the message and log the details to see if the messages within a group are handled in order:

/**
 * Process message.
 * 
 * @param message the message
 */
private void processMessage(Message message) {
    logger.info("Processing message {}", message.messageId());

    // extract system attributes: groupId, deduplicationId, sequenceNumber, sentTimestamp
    String groupId = extractAttribute(message, MessageSystemAttributeName.MESSAGE_GROUP_ID.toString());
    String deduplicationId = extractAttribute(message, MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID.toString());
    String sequenceNumber = extractAttribute(message, MessageSystemAttributeName.SEQUENCE_NUMBER.toString());
    String sentTimestamp = extractAttribute(message, MessageSystemAttributeName.SENT_TIMESTAMP.toString());

    // deserialise message body
    MyEvent myEvent = null;
    try {
        myEvent = objectMapper.readValue(message.body(), MyEvent.class);
    } catch (JsonProcessingException e) {
        logger.error(e);
    }

    if (myEvent != null) {
        logger.info("Message processed: groupId={}, dedupId={}, seqNum={}, ts={}, cSeqNum={}, MyEvent={}", 
                groupId, deduplicationId, sequenceNumber, sentTimestamp, myEvent.getClientSideSequenceNumber(), myEvent.toString());
    }
}

3.5 Running the Example

For ease of dependency management and builds, we use Gradle 6.0 as the build tool. To run the example, you can run the fifoQueueExample Gradle task from the command line as below:

gradlew fifoQueueExample

The output would be:

[00:03:15,124] [INFO] aws.demo.FifoQueueExample:146 [main] - Create FIFO queue MyQueue.fifo in region ap-southeast-2
[00:03:15,912] [INFO] aws.demo.FifoQueueExample:155 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue.fifo
[00:03:16,049] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #1 of 3 to queue MyQueue.fifo
[00:03:16,144] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #2 of 3 to queue MyQueue.fifo
[00:03:16,146] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-2] - Sending message #1 of 2 to queue MyQueue.fifo
[00:03:16,213] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #3 of 3 to queue MyQueue.fifo
[00:03:16,225] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-2] - Sending message #2 of 2 to queue MyQueue.fifo
[00:03:16,253] [INFO] aws.demo.FifoQueueExample$MyProducer:312 [pool-3-thread-1] - Sending a duplicate message #3 in example-group-1 to queue MyQueue.fifo
[00:03:16,266] [INFO] aws.demo.FifoQueueExample$MyProducer:312 [pool-3-thread-2] - Sending a duplicate message #2 in example-group-2 to queue MyQueue.fifo
[00:03:16,304] [INFO] aws.demo.FifoQueueExample$MyConsumer:376 [pool-4-thread-2] - Receiving messages from MyQueue.fifo...
[00:03:16,304] [INFO] aws.demo.FifoQueueExample$MyConsumer:376 [pool-4-thread-1] - Receiving messages from MyQueue.fifo...
[00:03:16,400] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received.
[00:03:16,402] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 1 messages received.
[00:03:16,402] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 96e12eed-a9bc-473d-86f8-f77a4e341e9b
[00:03:16,403] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-2] - Processing message 6eb500d6-c70e-4219-9202-8ce51b71b4dc
[00:03:16,438] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:1, seqNum=18856345555320815616, ts=1600005787544, cSeqNum=1, MyEvent=MyEvent [id=1ba1f5df-222a-422e-bec4-c4f95951b060, timeStamp=2020-09-13T14:03:15.917794300Z, clientSideSequenceNumber=1, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.]
[00:03:16,438] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-2] - Message processed: groupId=example-group-2, dedupId=example-group-2:1, seqNum=18856345555344112384, ts=1600005787635, cSeqNum=1, MyEvent=MyEvent [id=64c1fe28-768e-4ef2-b3c7-807af34e4499, timeStamp=2020-09-13T14:03:15.917794300Z, clientSideSequenceNumber=1, source=pool-3-thread-2, payload=AWS SQS FIFO queue message group example.]
[00:03:16,468] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-2] - Deleting message 6eb500d6-c70e-4219-9202-8ce51b71b4dc from queue: MyQueue.fifo
[00:03:16,476] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 96e12eed-a9bc-473d-86f8-f77a4e341e9b from queue: MyQueue.fifo
[00:03:16,596] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received.
[00:03:16,597] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 9db843b2-eb14-44dc-bf99-9fab6012e49b
[00:03:16,598] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:2, seqNum=18856345555341551872, ts=1600005787625, cSeqNum=2, MyEvent=MyEvent [id=91feeb7f-d1da-43f3-ab87-70776cf92ea4, timeStamp=2020-09-13T14:03:16.113905500Z, clientSideSequenceNumber=2, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.]
[00:03:16,626] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 9db843b2-eb14-44dc-bf99-9fab6012e49b from queue: MyQueue.fifo
[00:03:16,646] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 1 messages received.
[00:03:16,647] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-2] - Processing message a7c5a783-284b-4830-8760-34c485ceb5bf
[00:03:16,648] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-2] - Message processed: groupId=example-group-2, dedupId=example-group-2:2, seqNum=18856345555361776128, ts=1600005787704, cSeqNum=2, MyEvent=MyEvent [id=f40e85ec-43cb-45bd-aa2a-f862585ae2ec, timeStamp=2020-09-13T14:03:16.198012300Z, clientSideSequenceNumber=2, source=pool-3-thread-2, payload=AWS SQS FIFO queue message group example.]
[00:03:16,674] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-2] - Deleting message a7c5a783-284b-4830-8760-34c485ceb5bf from queue: MyQueue.fifo
[00:03:16,738] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received.
[00:03:16,738] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 254870d8-6426-44fd-855e-72108223b4c3
[00:03:16,739] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:3, seqNum=18856345555358192384, ts=1600005787690, cSeqNum=3, MyEvent=MyEvent [id=ad384912-0b99-462c-981e-9beca19b8a95, timeStamp=2020-09-13T14:03:16.185902200Z, clientSideSequenceNumber=3, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.]
[00:03:16,766] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 254870d8-6426-44fd-855e-72108223b4c3 from queue: MyQueue.fifo
[00:03:36,804] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 0 messages received.
[00:03:36,804] [INFO] aws.demo.FifoQueueExample$MyConsumer:392 [pool-4-thread-2] - Consumer aborted after waiting for 20s with no message received.
[00:03:36,914] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 0 messages received.
[00:03:36,915] [INFO] aws.demo.FifoQueueExample$MyConsumer:392 [pool-4-thread-1] - Consumer aborted after waiting for 20s with no message received.
[00:03:36,916] [INFO] aws.demo.FifoQueueExample:165 [main] - Delete queue MyQueue.fifo
[00:03:37,036] [INFO] aws.demo.FifoQueueExample:170 [main] - Queue MyQueue.fifo deleted.

We can see from the output above that the messages sent in the two groups have been received and processed in order within each group. In addition, the duplicate messages we sent on purpose have been successfully deduplicated by the SQS FIFO queue: three messages were delivered for example-group-1 and two for example-group-2.
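The ordering can also be checked programmatically from the seqNum values in the log. One detail worth noting is that these values are too large for a Java long (for example, 18856345555320815616 exceeds Long.MAX_VALUE), so the sketch below compares them as BigInteger. The class SequenceOrderCheck is a hypothetical helper, and the sample values are copied from the output above.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

/** Checks that SequenceNumber values of one message group are strictly increasing. */
final class SequenceOrderCheck {
    static boolean isStrictlyIncreasing(List<String> sequenceNumbers) {
        BigInteger previous = null;
        for (String text : sequenceNumbers) {
            BigInteger current = new BigInteger(text); // too large for Long.parseLong
            if (previous != null && current.compareTo(previous) <= 0) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    public static void main(String[] args) {
        // seqNum values of example-group-1, in the order they were processed
        List<String> group1 = Arrays.asList(
                "18856345555320815616",
                "18856345555341551872",
                "18856345555358192384");
        // seqNum values of example-group-2, in the order they were processed
        List<String> group2 = Arrays.asList(
                "18856345555344112384",
                "18856345555361776128");
        System.out.println(isStrictlyIncreasing(group1)); // prints "true"
        System.out.println(isStrictlyIncreasing(group2)); // prints "true"
    }
}
```

The comparison is done per group because messages from different groups are interleaved in the log and their relative order is not significant.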

4. Download the Example Source Code

That was an AWS SQS FIFO Queue Example in Java.

Download
You can download the full source code of this example here: AWS SQS FIFO Queue Example in Java

Kevin Yang

A software design and development professional with seventeen years’ experience in the IT industry, especially with Java EE and .NET, I have worked for software companies, scientific research institutes and websites.