Fanout Example with AWS SNS and AWS SQS in Java

In this example, we are going to show you how to implement fanout by using the combination of AWS SNS and AWS SQS programmatically in Java.

1. Introduction

Fanout is a messaging pattern that delivers one message to multiple consumers. Some modern message brokers, such as RabbitMQ, have built-in support for the pattern through a fanout exchange.

Amazon Simple Queue Service (SQS) is a fully managed message queuing service, and Amazon Simple Notification Service (SNS) is a fully managed messaging service for both system-to-system and app-to-person (A2P) communication in the AWS cloud. Subscribing several SQS queues to one SNS topic gives us fanout.

 

2. Technologies Used

Before we start, please follow the AWS SDK for Java 2.0 Developer Guide to set up AWS credentials and a region for development.

3. Fanout Example

3.1 The Components

As fanout is a way to broadcast one message to multiple consumers, there are three kinds of components in our example:

  • Publishers: a publisher is a user application that sends messages.
  • Fanout Core: a component that receives messages from publishers and then broadcasts them to consumers.
  • Consumers: a consumer is a user application that receives messages.
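
Before bringing in AWS, the relationship between the three components can be sketched in plain Java. This is only an in-memory illustration: InMemoryTopic and FanoutSketch are hypothetical names that are not part of the example's source code, and the topic here simply copies each published message into every subscribed queue.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for the fanout core; not the AWS implementation.
class FanoutSketch {

    // Plays the role of the topic: it knows every subscribed queue.
    static class InMemoryTopic {
        private final List<Queue<String>> subscribers = new CopyOnWriteArrayList<>();

        // Registers a queue; it receives every message published afterwards.
        void subscribe(Queue<String> queue) {
            subscribers.add(queue);
        }

        // Delivers one copy of the message to each subscribed queue.
        void publish(String message) {
            for (Queue<String> queue : subscribers) {
                queue.add(message);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        Queue<String> queue1 = new ConcurrentLinkedQueue<>();
        Queue<String> queue2 = new ConcurrentLinkedQueue<>();
        topic.subscribe(queue1);
        topic.subscribe(queue2);

        // The publisher only talks to the topic.
        topic.publish("hello");

        // Each consumer polls its own queue and finds its own copy.
        System.out.println(queue1.poll());
        System.out.println(queue2.poll());
    }
}
```

The publisher never needs to know how many consumers exist, which is the property the SNS topic and SQS queues provide in the rest of this example.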

3.2 Fanout Core

As the most important component in our fanout implementation, the fanout core consists of three parts:

  • An AWS SNS topic
  • Multiple AWS SQS standard queues
  • The subscriptions between SQS queues and the SNS topic

3.2.1 Creating an SNS Topic

An Amazon SNS topic is a logical access point that acts as a communication channel. It can have multiple endpoints such as AWS SQS, AWS Lambda, or an email address. When a message is sent to the topic, it is broadcast to all subscribed endpoints.

To create an SNS topic, we need a service client for accessing Amazon SNS: the SnsClient. The following code snippet creates a new instance of SnsClient with its region set to ap-southeast-2. Note that the AWS region needs to be taken into account when designing a production messaging system, because latency between regions can affect the design, for example when the message publisher and the SNS topic are in different regions.

SnsClient snsClient = SnsClient.builder().region(Region.of("ap-southeast-2")).build();

Then we can simply invoke the createTopic API provided by SnsClient with an instance of CreateTopicRequest to create a new topic as below:

CreateTopicRequest request = CreateTopicRequest.builder().name("MyTopic").build();
CreateTopicResponse response = snsClient.createTopic(request);

3.2.2 Creating an SQS Standard Queue

SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order in which they are sent. When this example was written, AWS SNS could deliver notifications only to SQS standard queues, so that is the queue type we use here.

As you might have guessed, there is a service client for accessing Amazon SQS similar to the SnsClient we used above: the SqsClient. We can create a new instance of SqsClient as below:

SqsClient sqsClient = SqsClient.builder().region(Region.of("ap-southeast-2")).build();

Then invoke the createQueue API provided by SqsClient with an instance of CreateQueueRequest to create a new standard queue as below:

CreateQueueRequest request = CreateQueueRequest.builder().queueName("MyQueue1").build();
CreateQueueResponse response = sqsClient.createQueue(request);

Note that the queue name we specified here is “MyQueue1”, without a suffix: Amazon SQS requires a FIFO queue name to end with the .fifo suffix, so we avoid that suffix for a standard queue. Also, for the fanout example, we are going to create two SQS standard queues to demonstrate message broadcasting.
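
The naming rule can be checked locally before calling createQueue. The following is a minimal sketch, assuming the rules described in the SQS documentation: a name of up to 80 characters made of letters, digits, hyphens, and underscores, with the .fifo suffix reserved for FIFO queues and counted towards the limit. QueueNames and its methods are hypothetical helpers, not part of the AWS SDK.

```java
import java.util.regex.Pattern;

// Hypothetical helper for checking queue names on the client side.
class QueueNames {
    // Letters, digits, hyphens and underscores, optionally followed by ".fifo".
    private static final Pattern VALID = Pattern.compile("[A-Za-z0-9_-]{1,80}(\\.fifo)?");

    // True if the name matches the assumed rules and fits within 80 characters in total.
    static boolean isValid(String name) {
        return name.length() <= 80 && VALID.matcher(name).matches();
    }

    // True if the name carries the suffix that marks a FIFO queue.
    static boolean isFifo(String name) {
        return name.endsWith(".fifo");
    }

    public static void main(String[] args) {
        System.out.println(isValid("MyQueue1") + " " + isFifo("MyQueue1"));         // true false
        System.out.println(isValid("MyQueue.fifo") + " " + isFifo("MyQueue.fifo")); // true true
        System.out.println(isValid("My Queue"));                                    // false
    }
}
```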

3.2.3 Creating a Subscription

To receive messages published to a topic, we need to subscribe an endpoint (such as AWS SQS, AWS Lambda, or an email address) to the topic. Once we subscribe an endpoint to a topic and the subscription is confirmed, the endpoint begins to receive messages published to that topic.

Before we subscribe the SQS queue to the SNS topic, we need to grant the topic the sqs:SendMessage permission on the queue. This is essential: without it, the SNS topic won’t be able to deliver notifications to the SQS queue. The following code snippet grants the permission:

String policyDocument =
    "{" +
    "  \"Version\": \"2012-10-17\"," +
    "  \"Statement\": [{" +
    "    \"Effect\":\"Allow\"," +
    "    \"Principal\": {" +
    "      \"Service\": \"sns.amazonaws.com\"" +
    "    }," +
    "    \"Action\":\"sqs:SendMessage\"," +
    "    \"Resource\":\"arn:aws:sqs:ap-southeast-2:000000000000:MyQueue1\"," +
    "    \"Condition\":{" +
    "      \"ArnEquals\":{" +
    "        \"aws:SourceArn\":\"arn:aws:sns:ap-southeast-2:000000000000:MyTopic\"" +
    "      }" +
    "    }" +
    "  }]" +
    "}";
Map<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.POLICY, policyDocument);
SetQueueAttributesRequest setQueueAttributesRequest = SetQueueAttributesRequest.builder().queueUrl(queueUrl).attributes(attributes).build();
SetQueueAttributesResponse response = sqsClient.setQueueAttributes(setQueueAttributesRequest);

Note that the policy document is an IAM policy written in JSON format. For more details, please refer to the IAM JSON Policy Reference.
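
Because the example creates two queues, the policy has to be built once per queue rather than hard-coded. One way to do that is sketched below. QueuePolicies and allowSnsToSend are hypothetical names, and the sketch assumes the ARNs contain no characters that need JSON escaping.

```java
// Hypothetical helper that fills the queue and topic ARNs into the policy shown above.
class QueuePolicies {

    // Builds a policy that lets the given SNS topic send messages to the given SQS queue.
    static String allowSnsToSend(String queueArn, String topicArn) {
        return String.format(
                "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"sqs:SendMessage\","
                + "\"Resource\":\"%s\","
                + "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"%s\"}}"
                + "}]"
                + "}",
                queueArn, topicArn);
    }

    public static void main(String[] args) {
        // Placeholder ARNs in the same style as the rest of this example.
        System.out.println(allowSnsToSend(
                "arn:aws:sqs:ap-southeast-2:000000000000:MyQueue2",
                "arn:aws:sns:ap-southeast-2:000000000000:MyTopic"));
    }
}
```

The returned string can then be passed as the QueueAttributeName.POLICY attribute in the same way as policyDocument above.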

Then we can create the subscription as below:

SubscribeRequest request = SubscribeRequest.builder().protocol("sqs").endpoint(queueArn).returnSubscriptionArn(true).topicArn(topicArn).build();
SubscribeResponse response = snsClient.subscribe(request);

3.3 Consumers

After setting up the fanout core, we need a consumer to consume each message once it is available in the queue. The SqsClient provides a straightforward API, receiveMessage, for receiving messages from a queue with short polling or long polling. Long polling results in higher performance at a reduced cost in the majority of use cases, so it is recommended unless the application expects an immediate response from a receiveMessage call. To simulate a user application consuming messages from the queue, we use a thread that runs a Runnable task to long poll for messages. More details can be found in the attached source code.

The following code snippet shows an example of using long polling to receive messages from a queue. If a message becomes available within 20 seconds, the call returns with the message. Otherwise, the call waits until the 20 seconds have elapsed and then returns an empty list.

// long polling: wait up to 20 seconds for a message before returning
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(queueUrl).waitTimeSeconds(20).build();
List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
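
The "return early or time out with an empty list" behaviour of long polling can be illustrated without AWS by using a BlockingQueue from the JDK. This is only an analogy, with a hypothetical LongPollSketch class and a shortened wait time; it does not call SQS.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical local analogy of long polling; not an SQS client.
class LongPollSketch {

    // Returns as soon as a message is available, or an empty list once the wait time elapses.
    static List<String> receive(BlockingQueue<String> queue, long waitMillis) {
        try {
            String message = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                return List.of(); // timed out with nothing to deliver
            }
            return List.of(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return List.of();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("event-1");
        System.out.println(receive(queue, 200)); // message is already there, returns at once
        System.out.println(receive(queue, 200)); // queue is empty, returns [] after the wait
    }
}
```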

3.4 Publishers

The third component in the example is the publisher. A publisher sends a message to the SNS topic we created earlier. The SnsClient provides the publish API for publishing a message to an SNS topic. We create a sample message, myEvent, serialize it to a JSON string, and then call the publish API to publish it.

// sample message
MyEvent myEvent = new MyEvent();
myEvent.setId(UUID.randomUUID().toString());
myEvent.setSource(Thread.currentThread().getName());
myEvent.setPayload("AWS SNS + SQS Fanout is fun!");

// serialise to JSON string
String message = null;
try {
    message = objectMapper.writeValueAsString(myEvent);
} catch (JsonProcessingException e) {
    logger.error("Failed to serialise MyEvent to JSON", e);
}

if (message != null) {
    PublishRequest publishRequest = PublishRequest.builder().subject("Fanout Example").message(message).topicArn(topicArn).build();
    PublishResponse publishResponse = this.snsClient.publish(publishRequest);
    if (publishResponse.sdkHttpResponse().isSuccessful()) {
        logger.info("Message {} published", publishResponse.messageId());
    } else {
        logger.error("Failed to publish message {} to {}: {} - {}", publishResponse.messageId(), topicArn,
                publishResponse.sdkHttpResponse().statusCode(), publishResponse.sdkHttpResponse().statusText());
    }
}
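
The MyEvent class itself is not shown above. A minimal sketch of what it might look like follows, reconstructed from the setters used in the snippet and the fields that appear in the log output. The timeStamp default and the exact toString format are assumptions; the class in the downloadable source may differ.

```java
import java.time.Instant;

// Sketch of the event payload; field names follow the log output of the example.
class MyEvent {
    private String id;
    private Instant timeStamp = Instant.now(); // assumption: stamped when the event is created
    private String source;
    private String payload;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Instant getTimeStamp() { return timeStamp; }
    public void setTimeStamp(Instant timeStamp) { this.timeStamp = timeStamp; }

    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }

    @Override
    public String toString() {
        return "MyEvent[id='" + id + "', timeStamp=" + timeStamp
                + ", source='" + source + "', payload='" + payload + "']";
    }

    public static void main(String[] args) {
        MyEvent event = new MyEvent();
        event.setId("example-id");
        event.setSource(Thread.currentThread().getName());
        event.setPayload("AWS SNS + SQS Fanout is fun!");
        System.out.println(event);
    }
}
```

If Jackson is used for serialisation, as the objectMapper call suggests, an Instant field typically needs the Java time module registered on the mapper.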

3.5 Running the Example

For ease of dependency management and building, we use Gradle 6.0 as the build tool. To run the example, run the fanoutExample Gradle task from the command line as below:

gradlew fanoutExample

The output would be:

[23:19:46,782] [INFO] aws.demo.FanoutExample:203 [main] - Create topic MyTopic in region ap-southeast-2
[23:19:47,523] [INFO] aws.demo.FanoutExample:219 [main] - Create queue MyQueue1 in region ap-southeast-2
[23:19:47,791] [INFO] aws.demo.FanoutExample:223 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue1
[23:19:48,014] [INFO] aws.demo.FanoutExample:255 [main] - Grant 'sqs:SendMessage' permission on MyQueue1 to arn:aws:sns:ap-southeast-2:000000000000:MyTopic successfully
[23:19:48,257] [INFO] aws.demo.FanoutExample:273 [main] - Subscribed arn:aws:sqs:ap-southeast-2:000000000000:MyQueue1 to arn:aws:sns:ap-southeast-2:000000000000:MyTopic. Subscription ARN: arn:aws:sns:ap-southeast-2:000000000000:MyTopic:0f6f6f4e-30d5-4f46-853a-6f7ad73923b8
[23:19:48,260] [INFO] aws.demo.FanoutExample:219 [main] - Create queue MyQueue2 in region ap-southeast-2
[23:19:48,261] [INFO] aws.demo.FanoutExample$MyConsumer:370 [pool-4-thread-1] - Receiving messages from MyQueue1...
[23:19:48,396] [INFO] aws.demo.FanoutExample:223 [main] - Queue URL: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue2
[23:19:48,551] [INFO] aws.demo.FanoutExample:255 [main] - Grant 'sqs:SendMessage' permission on MyQueue2 to arn:aws:sns:ap-southeast-2:000000000000:MyTopic successfully
[23:19:48,701] [INFO] aws.demo.FanoutExample:273 [main] - Subscribed arn:aws:sqs:ap-southeast-2:000000000000:MyQueue2 to arn:aws:sns:ap-southeast-2:000000000000:MyTopic. Subscription ARN: arn:aws:sns:ap-southeast-2:000000000000:MyTopic:4b258157-b5db-4825-bbfb-a6b991108e91
[23:19:48,703] [INFO] aws.demo.FanoutExample$MyConsumer:370 [pool-4-thread-2] - Receiving messages from MyQueue2...
[23:19:48,711] [INFO] aws.demo.FanoutExample$MyPublisher:466 [pool-5-thread-1] - Publishing message to MyTopic
[23:19:48,712] [INFO] aws.demo.FanoutExample$MyPublisher:496 [pool-5-thread-1] - Get ARN for topic MyTopic
[23:19:48,910] [INFO] aws.demo.FanoutExample$MyPublisher:487 [pool-5-thread-1] - Message 174845de-749b-51a1-9ac8-126af2b1810a published
[23:19:49,068] [INFO] aws.demo.FanoutExample$MyConsumer:378 [pool-4-thread-1] - 1 messages received.
[23:19:49,068] [INFO] aws.demo.FanoutExample$MyConsumer:393 [pool-4-thread-1] - Processing message fe963fbd-3f65-4bc8-8e40-6669a19a5fae
[23:19:49,075] [INFO] aws.demo.FanoutExample$MyConsumer:378 [pool-4-thread-2] - 1 messages received.
[23:19:49,076] [INFO] aws.demo.FanoutExample$MyConsumer:393 [pool-4-thread-2] - Processing message 0c7e71e1-79c6-483d-94be-98408b21e0c1
[23:19:49,155] [INFO] aws.demo.FanoutExample$MyConsumer:408 [pool-4-thread-2] - MyEvent[id='519725c8-6148-458e-9031-c1007e9d4c9c', timeStamp=2020-08-17T13:19:48.750724400Z, source='pool-5-thread-1', payload='AWS SNS + SQS Fanout is fun!']
[23:19:49,155] [INFO] aws.demo.FanoutExample$MyConsumer:408 [pool-4-thread-1] - MyEvent[id='519725c8-6148-458e-9031-c1007e9d4c9c', timeStamp=2020-08-17T13:19:48.750724400Z, source='pool-5-thread-1', payload='AWS SNS + SQS Fanout is fun!']
[23:19:49,186] [INFO] aws.demo.FanoutExample$MyConsumer:425 [pool-4-thread-2] - Deleting message 0c7e71e1-79c6-483d-94be-98408b21e0c1 from queue: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue2
[23:19:49,193] [INFO] aws.demo.FanoutExample$MyConsumer:425 [pool-4-thread-1] - Deleting message fe963fbd-3f65-4bc8-8e40-6669a19a5fae from queue: https://sqs.ap-southeast-2.amazonaws.com/000000000000/MyQueue1
[23:19:49,401] [INFO] aws.demo.FanoutExample:290 [main] - Subscription arn:aws:sns:ap-southeast-2:000000000000:MyTopic:0f6f6f4e-30d5-4f46-853a-6f7ad73923b8 deleted
[23:19:49,531] [INFO] aws.demo.FanoutExample:290 [main] - Subscription arn:aws:sns:ap-southeast-2:000000000000:MyTopic:4b258157-b5db-4825-bbfb-a6b991108e91 deleted
[23:19:49,532] [INFO] aws.demo.FanoutExample:299 [main] - Delete queue MyQueue1
[23:19:49,695] [INFO] aws.demo.FanoutExample:304 [main] - Queue MyQueue1 deleted.
[23:19:49,695] [INFO] aws.demo.FanoutExample:299 [main] - Delete queue MyQueue2
[23:19:49,817] [INFO] aws.demo.FanoutExample:304 [main] - Queue MyQueue2 deleted.
[23:19:49,930] [INFO] aws.demo.FanoutExample:319 [main] - Topic arn:aws:sns:ap-southeast-2:000000000000:MyTopic deleted

We can see from the output above that both consumers (pool-4-thread-1 and pool-4-thread-2), each polling a queue subscribed to MyTopic, received a copy of MyEvent with the identical id 519725c8-6148-458e-9031-c1007e9d4c9c after the publisher published a message to MyTopic. The message has been fanned out successfully.

4. Download the Example Source Code

Download
You can download the full source code of this example here: Fanout Example with AWS SNS and AWS SQS in Java

Kevin Yang

A software design and development professional with seventeen years’ experience in the IT industry, especially with Java EE and .NET, I have worked for software companies, scientific research institutes and websites.