AWS SQS FIFO Queue Example in Java
In this article, we wil show a AWS SQS FIFO Queue Example in Java.
1. Introduction
Amazon Simple Queue Service (SQS) is a fully managed message queuing service in the AWS cloud. Using SQS, We can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent. In this example, we are going to show you how to create an SQS FIFO queue and how to use features provided by FIFO queues such as message deduplication and message groups in Java.
2. Technologies Used
- AWS SQS: distributed message queue
- AWS SDK for Java 2.0: Java APIs for AWS services
- Log4j2: logging APIs
- Gradle 6: build tool
- OpenJDK 13: an open-source implementation of the Java Platform, Standard Edition
Before we start, please follow AWS SDK for Java 2.0 Developer Guide to set up AWS credentials and region for development.
3. FIFO Queue Example
3.1 The Basics
SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. In addition to the features supported by standard queues, FIFO queues support ordering and exactly-once processing. They also provide additional features that help prevent unintentional duplicates from being sent by message producers or from being received by message consumers. Additionally, message groups allow multiple separate ordered message streams within the same queue.
3.2 Creating a FIFO Queue in SQS
To create a FIFO queue in SQS, a service client named SqsClient
is used. We can create a new instance of SqsClient
as below:
SqsClient sqsClient = SqsClient.builder().region(Region.of("ap-southeast-2")).build();
Then invoke the createQueue
API provided by SqsClient
with an instance of CreateQueueRequest
to create a new standard queue as below:
Map<QueueAttributeName, String> attributes = new HashMap<>(); attributes.put(QueueAttributeName.FIFO_QUEUE, Boolean.TRUE.toString()); CreateQueueRequest request = CreateQueueRequest.builder() .queueName("MyQueue.fifo") .attributes(attributes) // Set FIFO_QUEUE attribute to true .build(); this.sqsClient.createQueue(request);
Note that the queue name of the FIFO queue we specified here is “MyQueue.fifo” because there is a contract defined by SqsClient
that a FIFO queue name must end with the .fifo
suffix. Also, we need to set QueueAttributeName.FIFO_QUEUE
attribute to true
in the attributes of the CreateQueueRequest
.
3.3 Sending Groups of Messages With Deduplication Enabled
We use two producers to simulate two user applications that send two groups of messages to the SQS FIFO queue. The SqsClient
provides sendMessage
API for us to send a message to the SQS FIFO queue. Each producer sends a group of messages with a message group Id. Messages that belong to the same message group are processed in a FIFO manner. Each message is sent with a unique deduplication Id within the same message group. If a message with a particular deduplication Id is sent successfully, any messages sent with the same deduplication Id are accepted successfully but aren’t delivered during the 5-minute deduplication interval.
For demonstration purpose, we create a sample message myEvent
with a client side sequence number and serialise it to JSON string:
MyEvent myEvent = new MyEvent(); myEvent.setId(UUID.randomUUID().toString()); myEvent.setSource(Thread.currentThread().getName()); myEvent.setClientSideSequenceNumber(1); myEvent.setPayload("AWS SQS FIFO queue message group example."); String message = null; try { message = objectMapper.writeValueAsString(myEvent); } catch (JsonProcessingException e) { logger.error(e); }
Then we generate a custom deduplication Id by using the message group Id and the message’s sequence number:
// custom dedup id: groupId + ":" + seqNum String deduplicationId = "example-group-1" + ":" + 1;
Finally, we build an instance of SendMessageRequest
with the message attributes and call sendMessage
API to send the message:
SendMessageRequest sendMessageRequest = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(message) .messageDeduplicationId("example-group-1:1") // deduplication Id .messageGroupId("example-group-1") // message group Id .build(); // send the message logger.info("Sending message #{} of {} to queue {}", i, this.numberOfEvents, this.queueName); this.sqsClient.sendMessage(sendMessageRequest);
Note that to verify the message deduplication feature, we deliberately send a duplicate of the last message in the message group as below:
// deliberately send a duplicate of the last message if (i == this.numberOfEvents) { logger.info("Sending a duplicate message #{} to queue {}", i, this.queueName); this.sqsClient.sendMessage(sendMessageRequest); }
3.4 Consuming Groups of Messages
We use two consumers to simulate two user applications that consume messages from the queue. The SqsClient
provides straight forward API receiveMessage
for us to receive messages from a queue with short polling or long polling. Please refer to AWS SQS Polling Example in Java for more details of receiving messages by polling.
The following code snippet shows an example of using long-polling to receive messages from a queue. If there is a message available in 20 seconds, the call will return with the message. Otherwise, the call will keep waiting until 20 seconds timed out and then returns an empty list.
// long polling and wait for waitTimeSeconds before timed out ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .waitTimeSeconds(LONG_POLLING_SECONDS) // force long polling .attributeNamesWithStrings("All") // return all attributes .build(); List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
Note that we set attributeNamesWithStrings
to All
when building the ReceiveMessageRequest
because we would like to get system attributes such as MessageGroupId, MessageDeduplicationId, SequenceNumber and sentTimestamp for demonstration purposes when processing messages later on. For more details about message attributes, please refer to AWS SQS Message Attributes Example. Also, note that receiveMessage
might return messages with multiple MessageGroupId
values. For each MessageGroupId
, the messages are sorted by time sent. When messages that belong to a particular message group ID are received by one consumer, the whole message group is invisible to other consumers which means no other consumer can process messages with the same message group ID.
Then we can process the message and log the details to see if the messages within a group are handled in order:
/** * Process message. * * @param message the message */ private void processMessage(Message message) { logger.info("Processing message {}", message.messageId()); // extract system attributes: groupId, deduplicationId, sequenceNumber, sentTimestamp String groupId = extractAttribute(message, MessageSystemAttributeName.MESSAGE_GROUP_ID.toString()); String deduplicationId = extractAttribute(message, MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID.toString()); String sequenceNumber = extractAttribute(message, MessageSystemAttributeName.SEQUENCE_NUMBER.toString()); String sentTimestamp = extractAttribute(message, MessageSystemAttributeName.SENT_TIMESTAMP.toString()); // deserialise message body MyEvent myEvent = null; try { myEvent = objectMapper.readValue(message.body(), MyEvent.class); } catch (JsonProcessingException e) { logger.error(e); } if (myEvent != null) { logger.info("Message processed: groupId={}, dedupId={}, seqNum={}, ts={}, cSeqNum={}, MyEvent={}", groupId, deduplicationId, sequenceNumber, sentTimestamp, myEvent.getClientSideSequenceNumber(), myEvent.toString()); } }
3.5 Running the Example
For ease of dependencies management and build, we use Gradle 6.0 as the build tool. To run the example, you can run the fifoQueueExample
gradle task from the command line as below:
gradlew fifoQueueExample
The output would be:
[00:03:15,124] [INFO] aws.demo.FifoQueueExample:146 [main] - Create FIFO queue MyQueue.fifo in region ap-southeast-2 [00:03:15,912] [INFO] aws.demo.FifoQueueExample:155 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue.fifo [00:03:16,049] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #1 of 3 to queue MyQueue.fifo [00:03:16,144] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #2 of 3 to queue MyQueue.fifo [00:03:16,146] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-2] - Sending message #1 of 2 to queue MyQueue.fifo [00:03:16,213] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-1] - Sending message #3 of 3 to queue MyQueue.fifo [00:03:16,225] [INFO] aws.demo.FifoQueueExample$MyProducer:307 [pool-3-thread-2] - Sending message #2 of 2 to queue MyQueue.fifo [00:03:16,253] [INFO] aws.demo.FifoQueueExample$MyProducer:312 [pool-3-thread-1] - Sending a duplicate message #3 in example-group-1 to queue MyQueue.fifo [00:03:16,266] [INFO] aws.demo.FifoQueueExample$MyProducer:312 [pool-3-thread-2] - Sending a duplicate message #2 in example-group-2 to queue MyQueue.fifo [00:03:16,304] [INFO] aws.demo.FifoQueueExample$MyConsumer:376 [pool-4-thread-2] - Receiving messages from MyQueue.fifo... [00:03:16,304] [INFO] aws.demo.FifoQueueExample$MyConsumer:376 [pool-4-thread-1] - Receiving messages from MyQueue.fifo... [00:03:16,400] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received. [00:03:16,402] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 1 messages received. [00:03:16,402] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 96e12eed-a9bc-473d-86f8-f77a4e341e9b [00:03:16,403] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-2] - Processing message 6eb500d6-c70e-4219-9202-8ce51b71b4dc [00:03:16,438] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:1, seqNum=18856345555320815616, ts=1600005787544, cSeqNum=1, MyEvent=MyEvent [id=1ba1f5df-222a-422e-bec4-c4f95951b060, timeStamp=2020-09-13T14:03:15.917794300Z, clientSideSequenceNumber=1, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.] [00:03:16,438] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-2] - Message processed: groupId=example-group-2, dedupId=example-group-2:1, seqNum=18856345555344112384, ts=1600005787635, cSeqNum=1, MyEvent=MyEvent [id=64c1fe28-768e-4ef2-b3c7-807af34e4499, timeStamp=2020-09-13T14:03:15.917794300Z, clientSideSequenceNumber=1, source=pool-3-thread-2, payload=AWS SQS FIFO queue message group example.] [00:03:16,468] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-2] - Deleting message 6eb500d6-c70e-4219-9202-8ce51b71b4dc from queue: MyQueue.fifo [00:03:16,476] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 96e12eed-a9bc-473d-86f8-f77a4e341e9b from queue: MyQueue.fifo [00:03:16,596] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received. [00:03:16,597] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 9db843b2-eb14-44dc-bf99-9fab6012e49b [00:03:16,598] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:2, seqNum=18856345555341551872, ts=1600005787625, cSeqNum=2, MyEvent=MyEvent [id=91feeb7f-d1da-43f3-ab87-70776cf92ea4, timeStamp=2020-09-13T14:03:16.113905500Z, clientSideSequenceNumber=2, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.] [00:03:16,626] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 9db843b2-eb14-44dc-bf99-9fab6012e49b from queue: MyQueue.fifo [00:03:16,646] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 1 messages received. [00:03:16,647] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-2] - Processing message a7c5a783-284b-4830-8760-34c485ceb5bf [00:03:16,648] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-2] - Message processed: groupId=example-group-2, dedupId=example-group-2:2, seqNum=18856345555361776128, ts=1600005787704, cSeqNum=2, MyEvent=MyEvent [id=f40e85ec-43cb-45bd-aa2a-f862585ae2ec, timeStamp=2020-09-13T14:03:16.198012300Z, clientSideSequenceNumber=2, source=pool-3-thread-2, payload=AWS SQS FIFO queue message group example.] [00:03:16,674] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-2] - Deleting message a7c5a783-284b-4830-8760-34c485ceb5bf from queue: MyQueue.fifo [00:03:16,738] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 1 messages received. [00:03:16,738] [INFO] aws.demo.FifoQueueExample$MyConsumer:410 [pool-4-thread-1] - Processing message 254870d8-6426-44fd-855e-72108223b4c3 [00:03:16,739] [INFO] aws.demo.FifoQueueExample$MyConsumer:427 [pool-4-thread-1] - Message processed: groupId=example-group-1, dedupId=example-group-1:3, seqNum=18856345555358192384, ts=1600005787690, cSeqNum=3, MyEvent=MyEvent [id=ad384912-0b99-462c-981e-9beca19b8a95, timeStamp=2020-09-13T14:03:16.185902200Z, clientSideSequenceNumber=3, source=pool-3-thread-1, payload=AWS SQS FIFO queue message group example.] [00:03:16,766] [INFO] aws.demo.FifoQueueExample$MyConsumer:440 [pool-4-thread-1] - Deleting message 254870d8-6426-44fd-855e-72108223b4c3 from queue: MyQueue.fifo [00:03:36,804] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-2] - 0 messages received. [00:03:36,804] [INFO] aws.demo.FifoQueueExample$MyConsumer:392 [pool-4-thread-2] - Consumer aborted after waiting for 20s with no message received. [00:03:36,914] [INFO] aws.demo.FifoQueueExample$MyConsumer:389 [pool-4-thread-1] - 0 messages received. [00:03:36,915] [INFO] aws.demo.FifoQueueExample$MyConsumer:392 [pool-4-thread-1] - Consumer aborted after waiting for 20s with no message received. [00:03:36,916] [INFO] aws.demo.FifoQueueExample:165 [main] - Delete queue MyQueue.fifo [00:03:37,036] [INFO] aws.demo.FifoQueueExample:170 [main] - Queue MyQueue.fifo deleted.
We can see from the output above that the messages sent in two groups have been received and processed in the order. In addition, the duplicated messages we sent on purpose have been deduplicated by the SQS FIFO queue successfully.
4. Download the Example Source Code
That was a AWS SQS FIFO Queue Example in Java.
You can download the full source code of this example here: AWS SQS FIFO Queue Example in Java