Tag Archives: message topics

Application integration patterns for microservices: Fan-out strategies

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/application-integration-patterns-for-microservices-fan-out-strategies/

This post is courtesy of Dirk Fröhner

The first blog in this series introduced asynchronous messaging for building loosely coupled systems that can scale, operate, and evolve individually. It considered messaging as a communications model for microservices architectures. This post covers concrete architectural considerations, focusing on the messaging architecture.

Wild Rydes

Wild Rydes is a fictional technology start-up. You may have heard of it – it disrupts individual transportation by replacing traditional taxis with unicorns. We use the Wild Rydes storyline in several hands-on AWS workshops. It illustrates concepts such as serverless development, event-driven design, API management, and messaging in microservices.

This blog post explores the decision-making process in building the Wild Rydes workshop, with a goal of helping you apply these concepts to your applications.

In the workshop, a customer requests a ‘unicorn’ ride using the Wild Rydes customer application. Registered unicorn drivers can use the application to manage their rides. Unicorn drivers submit a ride completion message after they have successfully delivered a customer to their destination.

Wild Rydes app image

Submit a ride completion

API exposed by the unicorn management service

At Wild Rydes, end-user clients are implemented as mobile applications and communicate via REST APIs (also known as hypermedia APIs) with the backend services.

For this use case, the application interacts with the API exposed by the unicorn management service. It uses the submit-ride-completion resource that it discovered from the API’s home document to send the relevant details of a ride to the backend. In response, the backend persists these details, creates a new completed-ride resource. This returns the respective status code, the location, and a representation of the new resource to the client. The API details are shown below.

Request from client to submit the details of a completed ride:

POST /<submit-ride-completion-resource-path> HTTP/1.1
Content-Type: application/json;charset=UTF-8
...

{
    "from": "...",
    "to": "...",
    "duration": "...",
    "distance": "...",
    "customer": "...",
    "fare": "..."
}

Response from the unicorn management service:

HTTP/1.1 201 Created
Date: Sat, 31 Aug 2019 12:00:00 GMT
Location: <url-of-newly-created-completed-ride-resource>
Content-Location: <url-of-newly-created-completed-ride-resource>
Content-Type: application/json;charset=UTF-8
...

{
    "links": {
        "self": {
            "href": "https://..."
        }
    },
    <completed-ride-resource-representation-properties>
}

Schematic architecture for the use case

The schematic architecture for the use case is shown in diagram 1 below:

Diagram 1: Mutliple microservices need information about ride completion

Diagram 1: Multiple microservices need information about ride completion

There are other microservices in Wild Rydes that are also interested in a new completed ride. The examples from the diagram are:

  • Customer notification service: customers should receive a notification in the app about their latest completed ride.
  • Customer accounting service: After all, Wild Rydes is a business, so this service is responsible for collecting the fare from the customer.
  • Customer loyalty service: Everybody wants to collect miles and would like to receive benefits for being a loyal customer.
  • Data lake ingestion service: Wild Rydes is a data-driven company and they want to ingest all data generated from any process into their data lake for arbitrary analytics.
  • Extraordinary rides service: This special service is interested in rides with fares or distances above certain thresholds for preparing insights for business managers.

Based on this scenario, let’s review the integration options.

Integration options

Integration via database

The unicorn management service stores the details of a completed ride in a database. It could share the database with the other services directly, but that creates tight coupling. Sharing the database also restricts your flexibility to scale and evolve your services.

Integration via REST APIs

What about using REST APIs for the integration? The HTTP-based implementation of the REST architectural style uses the distributed architecture concepts of the web. However, what does this mean for the implementation?

Diagram 2: Using REST APIs to communicate to microservices

Diagram 2: Using REST APIs to communicate to microservices

As shown in diagram 2 above:

  • Effectively, all interested services on the right-hand side would have to expose an API resource. These would be called by the unicorn management service for each newly completed ride.
  • To enable elasticity behind a single resource URL, you may need a load balancer in front of each interested service.
  • The unicorn management service would have to know about all these interested services and their respective APIs. Hopefully, each service uses a streamlined API resource.
  • Lastly, the unicorn management service must store, retry, and track all request attempts in case an interested service is not available. This ensures durability so we don’t lose any of these notifications.

One approach is to manage a recipient list in the unicorn management service. This adds additional complexity to the unicorn management service and coupling on both sides. Although there are self-registration and discovery approaches, managing a recipient list is not the core use case of the unicorn management service.

Diagram 3: Using a separate service to manage the fan-out to other services

Diagram 3: Using a separate service to manage the fan-out to other services

A better approach would be to externalize the recipient list into a separate Request Distribution Service, as diagram 3 shows. This decouples both sides, but binds each side to the new service. Still, the unicorn management service is still responsible for the delivery of the ride data to all the recipients. Again, this heavy lifting is not the main task of this service.

Diagram 4: Filtering information for extraordinary rides

Diagram 4: Filtering information for extraordinary rides

In diagram 4, the information filtering for the Extraordinary Rides Service is self-managed. This means that there is code on one side to either not send or to discard irrelevant ride data.

For this use case, integration via REST APIs potentially adds coupling to the services. And it adds heavy lifting to the services that is beyond their actual domain.

Integration via messaging

A third option could use messaging for the integration.

Publish-subscribe pattern

Both Amazon SNS and Amazon EventBridge can be used to implement the publish-subscribe pattern.  In this use case, we recommend Amazon SNS, which scales to support high throughput and fan-out applications. Amazon EventBridge includes direct integrations with software as a service (SaaS) applications and other AWS services. It’s ideal for publish-subscribe use cases involving these types of integrations.

Diagram 5: Using Amazon SNS to implement a publish-subscribe pattern

Diagram 5: Using Amazon SNS to implement a publish-subscribe pattern

Diagram 5 shows an SNS topic called Ride Completion Topic. The unicorn management service can now send the details about a completed ride into that topic. All interested services on the right-hand side can subscribe to this topic.

Using a message topic to publish the details of a completed ride frees us from managing the recipient list, as well as making ensuring reliable delivery of the messages. It also decouples both sides as much as possible. Services on the right-hand side can autonomously subscribe to the topic. The Unicorn Management Service does not know anything about the topic’s subscribers.

Message filter pattern

Looking at the Extraordinary Rides Service, the message filter functionality of Amazon SNS can autonomously and individually discard irrelevant messages. The Extraordinary Rides Service can specify the threshold values for the fare and distance.

Diagram 6: Filtering extraordinary rides using Amazon SNS

Diagram 6: Filtering extraordinary rides using Amazon SNS

Topic-queue-chaining pattern

Consider the publish-subscribe channel between the Unicorn Management Service, and the subscribing services on the right-hand side.

One of the consuming services may go offline for maintenance. Or the code that processes messages from the ride completion topic could run into an exception. These are two examples where a subscriber service could potentially miss topic messages.

A good pattern to apply here is topic-queue-chaining. That means that you add a queue, in our case an SQS queue, between the ride completion topic and each of the subscriber services. As messages are buffered persistently in an SQS queue, it prevents lost messages if a subscriber process run into problems for many hours or days.

Diagram 7: Chaining topics and queues to buffer messages persistently

Diagram 7: Chaining topics and queues to buffer messages persistently

Queues as buffering load balancers

An SQS queue in front of each subscriber service also acts as a buffering load balancer.

Since every message is delivered to one of potentially many consumer processes, you can scale out the subscriber services, and the message load is distributed over the available consumer processes.

As messages are buffered in the queue, they are preserved during a scaling event, such as when you must wait until an additional consumer process becomes operational.

Lastly, these queue characteristics help flatten peak loads for your consumer processes, buffering messages until consumers are available. This allows you to process messages at a pace decoupled from the message source.

Conclusion

The Wild Rydes example shows how messaging can provide decoupling and greater flexibility for your microservices landscape.

In contrast to REST APIs, a messaging system takes care of message delivery outside of your service code. Using a publish-subscribe channel provides simple fan-out capability. And message filters allow for selective message reception without the effort of implementing that logic into your code.

With topic-queue-chaining pattern, you can add queue characteristics to a fan-out scenario so that you can easily scale out on the consumer side, and flatten peak loads.

For a deeper dive into queues and topics and how to use them in your microservices architecture, please use the following resources:

  1. AWS whitepaper: Implementing Microservices on AWS
  2. AWS blog: Implementing enterprise integration patterns with AWS messaging services: point-to-point channels
  3. AWS blog: Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels
  4. AWS blog: Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox

Understanding asynchronous messaging for microservices

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/understanding-asynchronous-messaging-for-microservices/

This post is courtesy of Dirk Fröhner

One of the implications of applying the microservices architectural style is that much communication between components happens over the network. After all, your microservices landscape is a distributed system. To achieve the promises of microservices, such as being able to individually scale, operate, and evolve each service, this communication must happen in a loosely coupled and reliable manner.

A common way to loosely couple services is to expose an API following the REST architectural style. REST APIs are based on the architecture of the web and provide loose coupling between communicating parties. REST APIs offer a great way to decouple interfaces from concrete implementations, and to advise clients about what they can do next, by the use of links and link relations.

While REST APIs are common and useful in microservices design, REST APIs tend to be designed with synchronous communications, where a response is required. A request coming from an end-user client can trigger a complex communications path within your services landscape, which can effectively add coupling between the services at runtime. After all, this is why there are mitigation patterns like circuit-breaker in the first place. REST APIs can also add some heavy lifting to your infrastructure that we will discuss further below.

Asynchronous messaging

If loose-coupling is important, especially in a system that requires high resilience and has unpredictable scale, another option is asynchronous messaging.

Asynchronous messaging is a fundamental approach for integrating independent systems, or building up a set of loosely coupled systems that can operate, scale, and evolve independently and flexibly. As our colleague Tim Bray said, “If your application is cloud-native, or large-scale, or distributed, and doesn’t include a messaging component, that’s probably a bug.” In this blog post, we will outline some fundamental benefits of asynchronous messaging for the communications between microservices.

For a refresher on the fundamental messaging patterns and their implementations with Amazon SQS, Amazon SNS, and Amazon MQ, please read our previous blog posts

For a summary of the semantics of queues and topics:

  • A queue is like a buffer. You can put messages into a queue, and you can retrieve messages from a queue. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue.
  • A topic is like a broadcasting station. You can publish messages to a topic, and anyone interested in these messages can subscribe to the topic. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern).

Use-case

Consider a typical scenario illustrated in the diagram below. An end-user client (EUC) addresses an API resource of one of our services, through Amazon API Gateway in this example. From there, the request can potentially follow a path across the microservices landscape to get completely processed.

To provide the final result, there will be potentially cascading subsequent requests sent between other microservices. This example illustrates the complexity involved in processing a single end user request.

End User Client accessing a service using an API

Diagram 1: End-User Client accessing a service using an API

End-user clients (EUCs) often communicate with services via REST APIs in a synchronous manner. However, the communication can also be designed using an asynchronous approach. For instance, if an EUC submits a request that takes some time to process, the respective API resource can respond with HTTP status 202 Accepted, and a link to a resource that provides the current processing status. Downstream, the communication between the service that receives that request, and other services that are involved in processing the request, can happen asynchronously using messaging services.

There are situations where a communications model using asynchronous messaging can make your life easier than using REST APIs.

Infrastructure complexity

Start with looking at the infrastructure complexity for the backends of your services. Depending on your implementation paradigm, you have to include different components in your infrastructure that you don’t have to deal with when using messaging.

Imagine your services each expose a REST API. Typically, this means you add a load balancer in front of your compute layer, and your backend implementation includes an HTTP server. It is usually a good idea to decouple your services APIs from their concrete implementations, so you could also consider adding Amazon API Gateway in front of your load balancer.

For a serverless approach, you don’t need to worry about load balancing and scaling out infrastructure. Amazon API Gateway with AWS Lambda integration provides a fully managed solution for removing complexities around infrastructure management.

Using Amazon SQS as a cloud-native messaging service for queues, you don’t employ any of the above mentioned components. As described in a prior post, an SQS queue can act as a load balancer in itself. The consumers, or target services, don’t need an HTTP server, but simply ask a queue for available messages. If you use AWS Lambda for your consumers, this process is even simpler, as the Lambda functions are automatically invoked when messages appear in an SQS queue. See Using AWS Lambda with Amazon SQS to learn more.

The same applies to Serverless architectures implementing a publish/subscribe pattern. Lambda function executions can be directly triggered by SNS messages. Without AWS Lambda, you need load balancers and web servers in your backend implementations to receive SNS notifications, as those are injected via web hooks into your services. SNS also provides the fan-out functionality that you would otherwise have to build using an intermediary component to implement a recipient list of subscribers.

Reliability, resilience

For synchronous systems, if a service crashes while it processes the payload of an API request, the information is lost. A good way to prevent this on a microservice is to explicitly persist an incoming request immediately after receiving it. Then process and reprocess, until the request is finally marked as resolved.

This approach requires additional work, and it requires the microservice to not crash while persisting an incoming API request. The microservice sending a request must also resend if the target service doesn’t acknowledge receipt. For example, it doesn’t respond with a successful HTTP status code, or the connection drops.

When sending messages to a queue, this additional work is addressed by the messaging infrastructure. A message will remain in a queue unless a consumer explicitly states that processing is finished by acknowledging the message reception. As long as message reception is not acknowledged by a consumer, it will stay in the queue. Messages can be retained in an SQS queue for a maximum of 14 days.

Scale out latency

Under increased load, your services must scale out to process the requests. You must then consider scale-out latency, which may be managed for you with serverless implementations. It takes a few moments from when an Auto Scaling group triggers the launch of additional instances until these are ready to operate. Also launching new container tasks takes time. When your scaling threshold is not optimal and the scaling event occurs late, your available resources may be unable to serve all incoming requests. These requests may be lost or answered with HTTP status code 5xx.

Using message queues that buffer messages during a scaling event help prevent this. Even in use cases where the EUC is waiting for an immediate response, this is the more reliable architecture. If your infrastructure needs time to scale out and you are not able to process all requests in time, the requests are persisted.

When messaging is your only choice

What happens when your services must respond to peak loads at scale?

For many applications, the scale-out latency, including load balancer pre-warming, will eventually become too large to handle steeply ascending loads fast enough. With a serverless architecture, exposing your Lambda functions with API Gateway can handle steeply ascending loads. But you must still consider downstream systems, which may be easily overwhelmed.

In these scenarios, where rapid scaling without overwhelming downstream systems is important, messaging may be your best choice. Message queues help protect your downstream services by buffering incoming payloads for consumption at the pace of the consuming service. This helps not only for the communications between microservices, but also when peak loads flood your client-facing API. Often, the most important goal is to accept an incoming request, while the actual processing of that request can happen later. You decouple these steps from each other by using queues.

Serverless messaging systems like Amazon SQS and Amazon SNS can respond quickly to support high scale. These are often the best solution when scale is unpredictable.  While the instance-based messaging system, Amazon MQ, provides compatibility with open standards, it requires manual scaling for large workloads, unlike serverless messaging services.

Conclusion

We hope you got some inspiration to also employ asynchronous messaging for your microservices communications architecture. In blog XYZ we provide concrete examples of these patterns. For more information, feel free to consume the following resources:

  1. AWS whitepaper: Implementing Microservices on AWS
  2. AWS blog: Implementing enterprise integration patterns with AWS messaging services: point-to-point channels
  3. AWS blog: Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels
  4. AWS blog: Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox

Read the next blog in the series,  Application Integration Patterns for Microservices: Fan-out Strategies.

Designing durable serverless apps with DLQs for Amazon SNS, Amazon SQS, AWS Lambda

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/designing-durable-serverless-apps-with-dlqs-for-amazon-sns-amazon-sqs-aws-lambda/

This post is courtesy of Otavio Ferreira, Sr Manager, SNS.

In a postal system, a dead-letter office is a facility for processing undeliverable mail. In pub/sub messaging, a dead-letter queue (DLQ) is a queue to which messages published to a topic can be sent, in case those messages cannot be delivered to a subscribed endpoint.

Amazon SNS supports DLQs, making your applications more resilient and durable upon delivery failure modes.

Understanding message delivery failures and retries

The delivery of a message fails when it’s not possible for Amazon SNS to access the subscribed endpoint. There are two reasons why this might happen:

  • Client errors, where the client is SNS (the message sender).
  • Server errors, where the server is the system that hosts the subscription endpoint (the message receiver), such as Amazon SQS or AWS Lambda.

Client errors

Client errors happen when SNS has stale subscription metadata. One common cause of client errors is when you (the endpoint owner) delete the endpoint. For example, you might delete the SQS queue that is subscribed to your SNS topic, without also deleting the SNS subscription corresponding to the queue. Another common cause is when you change the resource policy attached to your endpoint in a way that prevents SNS from delivering messages to that endpoint.

These errors are considered client errors because the client has attempted the delivery of a message to a destination that, from the client’s perspective, is no longer accessible. SNS does not retry the delivery of messages that failed as the result of client errors.

Server errors

Server errors happen when the system that powers the subscribed endpoint is unavailable, or when it returns an exception response indicating that it failed to process a valid request from SNS.

When server errors occur, SNS retries the failed deliveries according to a backoff function, which can be either linear or exponential. When a server error occurs for an AWS managed endpoint, backed by either SQS or Lambda, then SNS retries the delivery for up to 100,015 times, over 23 days.

Server errors can also happen with customer managed endpoints, namely HTTP, SMS, email, and mobile push endpoints. SNS also retries the delivery for these types of endpoints. HTTP endpoints support customer-defined retry policies, while SNS sets an internal delivery retry policy for SMS, email, and mobile push endpoints to 50 times, over 6 hours.

Delivery retries

SNS may receive a client error, or continue to receive a server error for a message beyond the number of retries defined by the corresponding retry policy. In that event, SNS discards the message. Setting a DLQ to your SNS subscription enables you to keep this message, regardless of the type of error, either client or server. DLQs give you more control over messages that cannot be delivered.

For more information on the delivery retry policy for each delivery protocol supported by SNS, see Amazon SNS Message Delivery Retry.

Using DLQs for AWS services

SNS, SQS, and Lambda support DLQs, addressing different failure modes. All DLQs are regular queues powered by SQS.

In SNS, DLQs store the messages that failed to be delivered to subscribed endpoints. For more information, see Amazon SNS Dead-Letter Queues.

In SQS, DLQs store the messages that failed to be processed by your consumer application. This failure mode can happen when producers and consumers fail to interpret aspects of the protocol that they use to communicate. In that case, the consumer receives the message from the queue, but fails to process it, as the message doesn’t have the structure or content that the consumer expects. The consumer can’t delete the message from the queue either. After exhausting the receive count in the redrive policy, SQS can sideline the message to the DLQ. For more information, see Amazon SQS Dead-Letter Queues.

In Lambda, DLQs store the messages that resulted in failed asynchronous executions of your Lambda function. An execution can result in an error for several reasons. Your code might raise an exception, time out, or run out of memory. The runtime executing your code might encounter an error and stop. Your function might hit its concurrency limit and be throttled. Regardless of the error type, when the error occurs, your code might have run completely, partially, or not at all. By default, Lambda retries an asynchronous execution twice. After exhausting the retries, Lambda can sideline the message to the DQL. For more information, see AWS Lambda Dead-Letter Queues.

When you have a fan-out architecture, with SQS queues and Lambda functions subscribed to an SNS topic, we recommend that you set DLQs to your SNS subscriptions, and to your destination queues and functions as well. This approach gives your application resilience against message delivery failures, message processing failures, and function execution failures too.

Applying DLQs in a use case

Here’s how everything comes together. The following diagram shows a serverless backend architecture that supports a car rental application. This is a durable serverless architecture based on DLQs for SNS, SQS, and Lambda.

Dead Letter Queue - DLQ SNS use case with architecture diagram

When a customer places an order to rent a car, the application sends that request to an API, which is powered by Amazon API Gateway. The REST API is backed by an SNS topic named Rental-Orders, and deployed onto an Amazon VPC subnet. The topic then fans out that order to the following two subscribed endpoints, for parallel processing:

  • An SQS queue, named Rental-Fulfilment, which feeds the integration with an internal fulfilment system hosted on Amazon EC2.
  • A Lambda function, named Rental-Billing, which processes and loads the customer order into a third-party billing system, also hosted on Amazon EC2.

To increase the durability of this serverless backend API, the following DLQs have been set up:

  • Two SNS DLQs, namely Rental-Fulfilment-Fanout-DLQ and Rental-Billing-Fanout-DLQ, which store the order in case either the subscribed SQS queue or Lambda function ever becomes unreachable.
  • An SQS DLQ, named Rental-Fulfilment-DLQ, which stores the order when the fulfilment system fails to process the order.
  • A Lambda DLQ, named Rental-Billing-DLQ, which stores the order when the function fails to process and load the order into the billing system.

When the DLQ captures the message, you can inspect the message for troubleshooting purposes. After you address the error at hand, you can poll the DLQ to retry the processing of the message.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or AWS CloudFormation. You can use the SDK, CLI, and API for polling the DLQs as well.

Configuring DLQs for subscriptions

You can attach a DLQ to an SNS subscription by setting the subscription’s RedrivePolicy parameter. The policy is a JSON object that refers to the DLQ ARN. The ARN must point to an SQS queue in the same AWS account as that of the SNS subscription. Also, both the DLQ and the subscription must be in the same AWS Region.

Here’s how you can configure one of the SNS DLQs applied in the car rental application example, presented earlier.

The following JSON object is a CloudFormation template that subscribes the SQS queue Rental-Fulfilment to the SNS topic Rental-Orders. The template also sets a RedrivePolicy that targets Rental-Fulfilment-Fanout-DLQ as a DLQ.

Lastly, the template sets a FilterPolicy value. It makes SNS deliver a message to the subscribed queue only if the published message carries an attribute named order-status with value set to either confirmed or canceled. As Amazon SNS Message Filtering happens before message delivery, messages that are filtered out aren’t sent to that subscription’s DLQ.

Internally, the CloudFormation template uses the SNS Subscribe API action for deploying the subscription and setting both policies, all part of the same API request.

{  
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
            "RedrivePolicy": {
               "deadLetterTargetArn": 
                  "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
            },
            "FilterPolicy": { 
               "order-status": [ "confirmed", "canceled" ]
            }
         }
      }
   }
}

Maybe the SNS topic and subscription are already deployed. In that case, you can use the SNS SetSubscriptionAttributes API action to set the RedrivePolicy, as shown by the following code examples, based on the AWS CLI and the AWS SDK for Java.

$ aws sns set-subscription-attributes 
   --region us-east-1
   --subscription-arn arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2
   --attribute-name RedrivePolicy 
   --attribute-value '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"}'
AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();

String subscriptionArn = "arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2";

String redrivePolicy = "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}";

SetSubscriptionAttributesRequest request = new SetSubscriptionAttributesRequest(
  subscriptionArn, 
  "RedrivePolicy", 
  redrivePolicy
);

sns.setSubscriptionAttributes(request);

Monitoring DLQs

You can use Amazon CloudWatch metrics and alarms to monitor the DLQs associated with your SNS subscriptions. In the car rental example, you can monitor the DLQs to be notified when the API failed to distribute any car rental order to the fulfillment or billing systems.

As regular SQS queues, the DLQs in SNS emit a number of metrics to CloudWatch, in 5-minute data points, such as NumberOfMessagesSent, NumberOfMessagesReceived and NumberOfMessagesDeleted. You can use these SQS metrics to be notified upon activity in your DLQs in SNS, so you may trigger a message recovery protocol.

You might have a case where you expect the DLQ to be always empty. In that case, create an CloudWatch alarm on NumberOfMessagesSent, set the alarm threshold to zero, and provide a separate SNS topic to be notified when the alarm goes off. The SNS topic, in its turn, can delivery your alarm notification to any endpoint type that you choose, such as email address, phone number, or mobile pager app.

Additionally, SNS itself provides its own set of metrics that are relevant to DLQs. Specifically, SNS metrics include the following:

  • NumberOfNotificationsRedrivenToDlq – Used when sending the message to the DLQ succeeds.
  • NumberOfNotificationsFailedToRedriveToDlq – Used when sending the message to the DLQ fails. This can happen because the DLQ either doesn’t exist anymore or doesn’t have the required access permissions to allow SNS to send messages to it. For more information about setting up the required access policy, see Giving Permissions for Amazon SNS to Send Messages to Amazon SQS.

Debugging with DLQs

Use CloudWatch Logs to see the exceptions that caused your SNS deliveries to fail and your messages to be sidelined to DLQs. In the car rental example, you can inspect the rental orders in the DLQs, as well as the logs associated with these queues. Then you can understand why those orders failed to be fanned out to the fulfilment or billing systems.

SNS can log both successful and failed deliveries in CloudWatch. You can enable Amazon SNS Delivery Status Logging by setting three SNS topic attributes, which are delivery protocol-specific. As an example, for SNS deliveries to SQS queues, you must set the following topic attributes: SQSSuccessFeedbackRoleArn,  SQSFailureFeedbackRoleArn, and SQSSuccessFeedbackSampleRate.

The following JSON object represents a successful SNS delivery in an CloudWatch Logs entry. The status code logged is 200 (SUCCESS). The attribute RedrivePolicy shows that the SNS subscription in question had its DLQ set.

{
  "notification": {
    "messageMD5Sum": "7bb3327ac55e49485bad42e159ca4d4b",
    "messageId": "e8c2bb09-235c-5f5d-b583-efd8df0f7d74",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:13:55.876"
  },
  "delivery": {
    "deliveryId": "6adf232e-fb12-5062-a564-27ff3741051f",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"sqsRequestId\":\"b2608a46-ccc4-51cc-003d-de972097debc\",\"sqsMessageId\":\"05fecd22-60a1-4d7d-bb79-026d49700b5a\"}",
    "dwellTimeMs": 58,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

The following JSON object represents a failed SNS delivery in CloudWatch Logs. In the following code example, the subscribed queue doesn’t exist. As a client error, the status code logged is 400 (FAILURE). Again, the RedrivePolicy attribute refers to a DLQ.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:51.116"
  },
  "delivery": {
    "deliveryId": "be743821-4c2c-5acc-a586-6cf0807f6fb1",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"ErrorCode\":\"AWS.SimpleQueueService.NonExistentQueue\", \"ErrorMessage\":\"The specified queue does not exist or you do not have access to it.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 53,
    "attempts": 1,
    "statusCode": 400
  },
  "status": "FAILURE"
}

When the message delivery fails and there is a DLQ attached to the subscription, the message is sent to the DLQ and an additional entry is logged in CloudWatch. This new entry is specific to the delivery to the DLQ and refers to the DLQ ARN as the destination, as shown in the following JSON object.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "8959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:52.876"
  },
  "delivery": {
    "deliveryId": "a877c79f-a3ee-5105-9bbd-92596eae0232",
    "destination":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ",
    "providerResponse": "{\"sqsRequestId\":\"8cef1af5-e86a-519e-ad36-4f33252aa5ec\",\"sqsMessageId\":\"2b742c5c-0750-4ec5-a717-b95897adda8e\"}",
    "dwellTimeMs": 51,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

By analyzing Amazon CloudWatch Logs entries, you can understand why an SNS message was moved to a DLQ, and then take the required set of steps to recover the message. When you enable delivery status logging in SNS, you can configure the sample rate in which deliveries are logged, from 0% to 100%.

Encrypting DLQs

When your SNS subscription targets an SQS encrypted queue, then you probably want your DLQ to be an SQS encrypted queue as well. This configuration provides consistency in the form that your messages are encrypted at rest.

To follow this security recommendation, give the CMK you used to encrypt your DLQ a key policy that grants the SNS service principal access to AWS KMS API actions. For example, see the following sample key policy:

{
    "Sid": "GrantSnsAccessToKms",
    "Effect": "Allow",
    "Principal": { "Service": "sns.amazonaws.com" },
    "Action": [ "kms:Decrypt", "kms:GenerateDataKey*" ],
    "Resource": "*"
}

If you have an SNS encrypted topic, but a subscription in this topic points to a DLQ that isn’t an SQS encrypted queue, then messages sidelined to the DLQ aren’t encrypted at rest.

For more information, see Enabling Server-Side Encryption (SSE) for an Amazon SNS Topic with an Amazon SQS Encrypted Queue Subscribed.

Summary

DLQs for SNS, SQS, and Lambda increase the resiliency and durability of your applications. These DLQs address different failure modes, and can be used together.

  • SNS DLQs store messages that failed to be delivered to subscribed endpoints.
  • SQS DLQs store messages that the consumer system failed to process.
  • Lambda DLQs store the messages that resulted in failed asynchronous executions of your functions.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or CloudFormation. DLQs are available in all AWS Regions. Start today by running the tutorials:

Enriching Event-Driven Architectures with AWS Event Fork Pipelines

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/enriching-event-driven-architectures-with-aws-event-fork-pipelines/

This post is courtesy of Otavio Ferreira, Mgr, Amazon SNS, and James Hood, Sr. Software Dev Engineer

Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.

Customers often fork event processing into pipelines that address common event handling requirements, such as event storage, backup, search, analytics, or replay. To help you build event-driven applications even faster, AWS introduces Event Fork Pipelines, a collection of open-source event handling pipelines that you can subscribe to Amazon SNS topics in your AWS account.

Event Fork Pipelines is a suite of open-source nested applications, based on the AWS Serverless Application Model (AWS SAM). You can deploy it directly from the AWS Serverless Application Repository into your AWS account.

Event Fork Pipelines is built on top of serverless services, including Amazon SNS, Amazon SQS, and AWS Lambda. These services provide serverless building blocks that help you build fully managed, highly available, and scalable event-driven platforms. Lambda enables you to build event-driven microservices as serverless functions. SNS and SQS provide serverless topics and queues for integrating these microservices and other distributed systems in your architecture. These building blocks are at the core of the modern application development best practices.

Surfacing the event fork pattern

At AWS, we’ve worked closely with customers across market segments and geographies on event-driven architectures. For example:

  • Financial platforms that handle events related to bank transactions and stock ticks
  • Retail platforms that trigger checkout and fulfillment events

At scale, event-driven architectures often require a set of supporting services to address common requirements such as system auditability, data discoverability, compliance, business insights, and disaster recovery. Translated to AWS, customers often connect event-driven applications to services such as Amazon S3 for event storage and backup, and to Amazon Elasticsearch Service for event search and analytics. Also, customers often implement an event replay mechanism to recover from failure modes in their applications.

AWS created Event Fork Pipelines to encapsulate these common requirements, reducing the amount of effort required for you to connect your event-driven architectures to these supporting AWS services.

AWS then started sharing this pattern more broadly, so more customers could benefit. At the 2018 AWS re:Invent conference in Las Vegas, Amazon CTO Werner Vogels announced the launch of nested applications in his keynote. Werner shared the Event Fork Pipelines pattern with the audience as an example of common application logic that had been encapsulated as a set of nested applications.

The following reference architecture diagram shows an application supplemented by three nested applications:

Each pipeline is subscribed to the same SNS topic, and can process events in parallel as these events are published to the topic. Each pipeline is independent and can set its own subscription filter policy. That way, it processes only the subset of events that it’s interested in, rather than all events published to the topic.

Amazon SNS Fork pipelines reference architecture

Figure 1 – Reference architecture using Event Fork Pipelines

The three event fork pipelines are placed alongside your regular event processing pipelines, which are potentially already subscribed to your SNS topic. Therefore, you don’t have to change any portion of your current message publisher to take advantage of Event Fork Pipelines in your existing workloads. The following sections describe these pipelines and how to deploy them in your system architecture.

Understanding the catalog of event fork pipelines

In the abstract, Event Fork Pipelines is a serverless design pattern. Concretely, Event Fork Pipelines is also a suite of nested serverless applications, based on AWS SAM. You deploy the nested applications directly from the AWS Serverless Application Repository to your AWS account, to enrich your event-driven platforms. You can deploy them individually in your architecture, as needed.

Here’s more information about each nested application in the Event Fork Pipelines suite.

Event Storage & Backup pipeline

Event Fork Pipeline for Event Storage & Backup

Figure 2 – Event Fork Pipeline for Event Storage & Backup

The preceding diagram shows the Event Storage & Backup pipeline. You can subscribe this pipeline to your SNS topic to automatically back up the events flowing through your system. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that automatically polls for these events in the queue and pushes them into an Amazon Kinesis Data Firehose delivery stream
  • An S3 bucket that durably backs up the events loaded by the stream

You can configure this pipeline to fine-tune the behavior of your delivery stream. For example, you can configure your pipeline so that the underlying delivery stream buffers, transforms, and compresses your events before loading them into the bucket. As events are loaded, you can use Amazon Athena to query the bucket using standard SQL queries. Also, you can configure the pipeline to either reuse an existing S3 bucket or create a new one for you.

Event Search & Analytics pipeline

Event Fork Pipeline for Event Search & Analytics

Figure 3 – Event Fork Pipeline for Event Search & Analytics

The preceding diagram shows the Event Search & Analytics pipeline. You can subscribe this pipeline to your SNS topic to index in a search domain the events flowing through your system, and then run analytics on them. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and pushes them into a Data Firehose delivery stream
  • An Amazon ES domain that indexes the events loaded by the delivery stream
  • An S3 bucket that stores the dead-letter events that couldn’t be indexed in the search domain

You can configure this pipeline to fine-tune your delivery stream in terms of event buffering, transformation and compression. You can also decide whether the pipeline should reuse an existing Amazon ES domain in your AWS account or create a new one for you. As events are indexed in the search domain, you can use Kibana to run analytics on your events and update visual dashboards in real time.

Event Replay pipeline

Event Fork Pipeline for Event Replay

Figure 4 – Event Fork Pipeline for Event Replay

The preceding diagram shows the Event Replay pipeline. You can subscribe this pipeline to your SNS topic to record the events that have been processed by your system for up to 14 days. You can then reprocess them in case your platform is recovering from a failure or a disaster. This pipeline is composed of the following resources:

  • An SQS queue that buffers the events delivered by the SNS topic
  • A Lambda function that polls events from the queue and redrives them into your regular event processing pipeline, which is also subscribed to your topic

By default, the replay function is disabled, which means it isn’t redriving your events. If the events need to be reprocessed, your operators must enable the replay function.

Applying event fork pipelines in a use case

This is how everything comes together. The following scenario describes an event-driven, serverless ecommerce application that uses the Event Fork Pipelines pattern. This example ecommerce application is available in AWS Serverless Application Repository. You can deploy it to your AWS account using the Lambda console, test it, and look at its source code in GitHub.

Example ecommerce application using Event Fork Pipelines

Figure 5 – Example e-commerce application using Event Fork Pipelines

The ecommerce application takes orders from buyers through a RESTful API hosted by Amazon API Gateway and backed by a Lambda function named CheckoutFunction. This function publishes all orders received to an SNS topic named CheckoutEventsTopic, which in turn fans out the orders to four different pipelines. The first pipeline is the regular checkout-processing pipeline designed and implemented by you as the ecommerce application owner. This pipeline has the following resources:

  • An SQS queue named CheckoutQueue that buffers all orders received
  • A Lambda function named CheckoutFunction that polls the queue to process these orders
  • An Amazon DynamoDB table named CheckoutTable that securely saves all orders as they’re placed

The components of the system described thus far handle what you might think of as the core business logic. But in addition, you should address the set of elements necessary for making the system resilient, compliant, and searchable:

  • Backing up all orders securely. Compressed backups must be encrypted at rest, with sensitive payment details removed for security and compliance purposes.
  • Searching and running analytics on orders, if the amount is $100 or more. Analytics are needed for key ecommerce metrics, such as average ticket size, average shipping time, most popular products, and preferred payment options.
  • Replaying recent orders. If the fulfillment process is disrupted at any point, you should be able to replay the most recent orders from up to two weeks. This is a key requirement that guarantees the continuity of the ecommerce business.

Rather than implementing all the event processing logic yourself, you can choose to subscribe Event Fork Pipelines to your existing SNS topic CheckoutEventsTopic. The pipelines are configured as follows:

  • The Event Storage & Backup pipeline is configured to transform data as follows:
    • Remove credit card details
    • Buffer data for 60 seconds
    • Compress data using GZIP
    • Encrypt data using the default customer master key (CMK) for S3

This CMK is managed by AWS and powered by AWS Key Management Service (AWS KMS). For more information, see Choosing Amazon S3 for Your Destination, Data Transformation, and Configuration Settings in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Search & Analytics pipeline is configured with:
    • An index retry duration of 30 seconds
    • A bucket for storing orders that failed to be indexed in the search domain
    • A filter policy to restrict the set of orders that are indexed

For more information, see Choosing Amazon ES for Your Destination, in the Amazon Kinesis Data Firehose Developer Guide.

  • The Event Replay pipeline is configured with the SQS queue name that is part of the regular checkout processing pipeline. For more information, see Queue Name and URL in the Amazon SQS Developer Guide.

The filter policy, shown in JSON format, is set in the configuration for the Event Search & Analytics pipeline. This filter policy matches only incoming orders in which the total amount is $100 or more. For more information, see Message Filtering in the Amazon SNS Developer Guide.


{

    "amount": [

        { "numeric": [ ">=", 100 ] }

    ]

}

By using the Event Fork Pipelines pattern, you avoid the development overhead associated with coding undifferentiated logic for handling events.

Event Fork Pipelines can be deployed directly from AWS Serverless Application Repository into your AWS account.

Deploying event fork pipelines

Event Fork Pipelines is available as a set of public apps in the AWS Serverless Application Repository (to find the apps, select the ‘Show apps that create custom IAM roles or resource policies’ check box under the search bar). It can be deployed and tested manually via the Lambda console. In a production scenario, we recommend embedding fork pipelines within the AWS SAM template of your overall application. The nested applications feature enables you to do this by adding an AWS::Serverless::Application resource to your AWS SAM template. The resource references the ApplicationId and SemanticVersion values of the application to nest.

For example, you can include the Event Storage & Backup pipeline as a nested application by adding the following YAML snippet to the Resources section of your AWS SAM template:


Backup:

  Type: AWS::Serverless::Application

  Properties:

    Location:

      ApplicationId: arn:aws:serverlessrepo:us-east-1:012345678901:applications/fork-event-storage-backup-pipeline

      SemanticVersion: 1.0.0

    Parameters:

      # SNS topic ARN whose messages should be backed up to the S3 bucket.

      TopicArn: !Ref MySNSTopic

When specifying parameter values, you can use AWS CloudFormation intrinsic functions to reference other resources in your template. In the preceding example, the TopicArn parameter is filled in by referencing an AWS::SNS::Topic called MySNSTopic, defined elsewhere in the AWS SAM template. For more information, see Intrinsic Function Reference in the AWS CloudFormation User Guide.

To copy the YAML required for nesting, in the Lambda console page for an AWS Serverless Application Repository application, choose Copy as SAM Resource.

Authoring new event fork pipelines

We invite you to fork the Event Fork Pipelines repository in GitHub and submit pull requests for contributing with new pipelines. In addition to event storage and backup, event search and analytics, and event replay, what other common event handling requirements have you seen?

We look forward to seeing what you’ll come up with for extending the Event Fork Pipelines suite.

Summary

Event Fork Pipelines is a serverless design pattern and a suite of open-source nested serverless applications, based on AWS SAM. You can deploy it directly from AWS Serverless Application Repository to enrich your event-driven system architecture. Event Fork Pipelines lets you store, back up, replay, search, and run analytics on the events flowing through your system. There’s no need to write code, manually stitch resources together, or set up infrastructure.

You can deploy Event Fork Pipelines in any AWS Region that supports the underlying AWS services used in the pipelines. There are no additional costs associated with Event Fork Pipelines itself, and you pay only for using the AWS resources inside each nested application.

Get started today by deploying the example ecommerce application or searching for Event Fork Pipelines in AWS Serverless Application Repository.

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describes how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

Applying one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumed the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, it can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, it can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages that AWS provides an SDK and an Apache Active MQ client library is available for.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. In the following diagram we see the above explained components for the traditional messaging scenario: A sender sends messages into an Amazon MQ queue, a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, sender and receiver are using the Apache Active MQ client library and the standard Java messaging service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on every Amazon compute service, your on-premises data center, or your personal computer. For simplicity, the code launches sender and receiver in the same Java virtual machine (JVM).

public class PointToPointOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative.  In the following diagram we see the above explained components for the cloud-native messaging scenario: A sender sends messages into an Amazon SQS queue, a receiver consumes messages from that queue.

Point to point cloud-native messaging

 

In the sample code below, the example sender is using the AWS SDK for Java to send messages to an Amazon SQS queue, running in an endless loop. You can run the code on every Amazon compute service, your on-premises data center, or your personal computer.

public class PointToPointOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

public class Receiver implements RequestHandler<SQSEvent, Void> {

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we have a return channel option. We would now call the involved processes rather than the requester and responder. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response to. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. In the following diagram we see the above explained components for the traditional messaging scenario, using an Amazon MQ queue each for the request messages and for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

public class PointToPointRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage = consumer.receive(5000);
                        System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                        receivedMessage.acknowledge();
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. In the following diagram we see the above explained components for the cloud-native scenario, using an Amazon SQS queue each for the request messages and for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

public class PointToPointRequestResponseCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SQSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional resources

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use API protocols such JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages as published into an Amazon MQ topic and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache Active MQ client library, but we use topics instead of queues, as shown in the following code.

public class PublishSubscribeOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The below diagram illustrates that a publisher sends messages into an Amazon SNS topic which are consumed by subscribers of this topic.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic, running in an endless loop. You can run the following code on every Amazon compute service, your on-premises data center, or your personal computer.

public class PublishSubscribeOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Sender(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

public class Subscriber implements RequestHandler<SNSEvent, Void> {

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.

Traditional messaging

As we can see in the following diagram, a Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

public class PublishSubscribeRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                        receivedMessage2.acknowledge();
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example sender. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send the responses to, and the receiver of the responses can assign them accordingly.

public class PublishSubscribeReqRespCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example receiver. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional Resources

Migrating from RabbitMQ to Amazon MQ

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/migrating-from-rabbitmq-to-amazon-mq/

This post is courtesy of Sam Dengler, AWS Solutions Architect.

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Some AWS customers are using RabbitMQ today and would like to migrate to a managed service to reduce the overhead of operating their own message broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easier to operate and scale message brokers in the cloud. Amazon MQ provides compatibility with your existing workloads that use standard protocols such as OpenWire, AMQP, MQTT, and Stomp (all enabled with SSL). Amazon MQ automatically provisions infrastructure configured as a single-instance broker or as an active/standby broker for high availability.

In this post, I describe how to launch a new Amazon MQ instance. I review example Java code to migrate from a RabbitMQ to Amazon MQ message broker using clients for ActiveMQ, Apache Qpid JMS, and Spring JmsTemplates. I also review best practices for Amazon MQ and changes from RabbitMQ to Amazon MQ to support Publish/Subscribe message patterns.

Getting started with Amazon MQ

To start, open the Amazon MQ console. Enter a broker name and choose Next step.

Launch a new Amazon MQ instance, choosing the mq.t2.micro instance type and Single-instance broker deployment mode, creating a user name and password, and choosing Create broker.

After several minutes, your instance changes status from Creation in progress to Running.  You can visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your instance queues, etc. In the following code examples, you use the OpenWire and AMQP endpoints.

To be able to access your broker, you must configure one of your security groups to allow inbound traffic. For more information, see the link to Detailed instructions in the blue box in the Connections section.

Now that your Amazon MQ broker is running, let’s look at some code!

Dependencies

The following code examples have dependencies across a range of libraries in order to demonstrate RabbitMQ, ActiveMQ, Qpid, Spring JMS templates, and connection pooling. I’ve listed all the dependencies in a single Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MyGroup</groupId>
    <artifactId>MyArtifact</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
    
        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- Apache Connection Pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache QPid -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-client</artifactId>
            <version>6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.29.0</version>
        </dependency>
                
        <!-- Spring JmsTemplate -->
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.3.RELEASE</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>
</project>

RabbitMQ

Here’s an example using RabbitMQ to send and receive a message via queue. The installation and configuration of RabbitMQ is out of scope for this post. For instructions for downloading and installing RabbitMQ, see Downloading and Installing RabbitMQ.

RabbitMQ uses the AMQP 0-9-1 protocol by default, with support for AMQP 1.0 via a plugin. The RabbitMQ examples in this post use the AMQP 0-9

RabbitMQ queue example

To start, here’s some sample code to send and receive a message in RabbitMQ using a queue.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT;
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish("", QUEUE, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        Connection consumerConnection = connectionFactory.newConnection();

        // Create a channel for the consumer.
        Channel consumerChannel = consumerConnection.createChannel();

        // Create a queue named "MyQueue".
        consumerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Receive the message.
        GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE);
        String message = new String(response.getBody(), "UTF-8");
        System.out.println("Message received: " + message);

        // Clean up the consumer.
        consumerChannel.close();
        consumerConnection.close();
    }
}

In this example, you need to specify the ENDPOINT, USERNAME, and PASSWORD for your RabbitMQ message broker using environment variables or dependency injection.

This example uses the RabbitMQ client library to establish connectivity to the message broker and a channel for communication. In RabbitMQ, messages are sent over the channel to a named queue, which stores messages in a buffer, and from which consumers can receive and process messages. In this example, you publish a message using the Channel.basicPublish method, using the default exchange, identified by an empty string (“”).

To receive and process the messages in the queue, create a second connection, channel, and queue. Queue declaration is an idempotent operation, so there is no harm in declaring it twice. In this example, you receive the message using the Channel.basicGet method, automatically acknowledging message receipt to the broker.

This example demonstrates the basics of sending and receiving a message of one type. However, what if you wanted to publish messages of different types such that various consumers could subscribe only to pertinent message types (that is, pub/sub)?  Here’s a RabbitMQ example using topic exchanges to route messages to different queues.

RabbitMQ topic example

This example is similar to the one earlier. To enable topic publishing, specify two additional properties: EXCHANGE and ROUTING_KEY. RabbitMQ uses the exchange and routing key properties for routing messaging. Look at how these properties change the code to publish a message.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, Queue, Exhange, and Routing Key should
    // be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "amqp://localhost:5672"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";
    private static final String EXCHANGE = "MyExchange";
    private static final String ROUTING_KEY = "MyRoutingKey";
    
    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create an exchange named "MyExchange".
        producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        // Bind "MyQueue" to "MyExchange", using routing key "MyRoutingKey".
        producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();
        
        ...


As before, you establish a connection to the RabbitMQ message broker, a channel for communication, and a queue to buffer messages for consumption. In addition to these components, you declare an explicit exchange of type BuiltinExchangeType.TOPIC and bind the queue to the exchange using the ROUTING_KEY that filters messages to send to the queue.

Again, publish a message using the Channel.basicPublish method. This time, instead of publishing the message to a queue, specify the EXCHANGE and ROUTING_KEY values for the message. RabbitMQ uses these properties to route the message to the appropriate queue, from which a consumer receives the message using the same code from the first example.

JMS API

Now that you’ve seen examples for queue and topic publishing in RabbitMQ, look at code changes to support Amazon MQ, starting with the ActiveMQ client. But first, a quick review of the Java Messaging Service (JMS) API.

The remainder of the examples in this post use the JMS API, which abstracts messaging methods from underlying protocol and client implementations. The JMS API programming model uses a combination of connection factories, connections, sessions, destinations, message producers, and message consumers to send and receive messages. The following image (from The Java EE 6 Tutorial) shows the relationship between these components:

ActiveMQ OpenWire connectivity to Amazon MQ

Here’s how JMS is used with ActiveMQ to send and receive messages on a queue.

The ActiveMQ client uses the OpenWire protocol, supported by Amazon MQ. The OpenWire protocol can be found in your Amazon MQ broker’s endpoint list (screenshot). It requires that the security group for the Amazon MQ be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

ActiveMQ queue example

Next, here’s an example to send and receive messages to Amazon MQ using the ActiveMQ client. This example should look familiar, as it follows the same flow to send and receive messages via a queue. I’ve included the example in full and then highlighted the differences to consider when migrating from RabbitMQ.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, you use the ActiveMQ client to establish connectivity to AmazonMQ using the OpenWire protocol with the ActiveMQConnectionFactory class to specify the endpoint and user credentials. For this example, use the master user name and password chosen when creating the Amazon MQ broker earlier. However, it’s a best practice to create additional Amazon MQ users for brokers in non-sandbox environments.

You could use the ActiveMQConnectionFactory to establish connectivity to the Amazon MQ broker. However, it is a best practice in Amazon MQ to group multiple producer requests using the ActiveMQ PooledConnectionFactory to wrap the ActiveMQConnectionFactory.

Using the PooledConnectionFactory, you can create a connection to Amazon MQ and establish a session to send a message. Like the RabbitMQ queue example, create a message queue destination using the Session.createQueue method, and a message producer to send the message to the queue.

For the consumer, use the ActiveMQConnectionFactory, NOT the PooledConnectionFactory, to create a connection, session, queue destination, and message consumer to receive the message because pooling of consumers is not considered a best practice. For more information, see the ActiveMQ Spring Support page.

ActiveMQ virtual destinations on Amazon MQ

Here’s how topic publishing differs from RabbitMQ to Amazon MQ.

If you remember from the RabbitMQ topic example, you bound a queue to an exchange using a routing key to control queue destinations when sending messages using a key attribute.

You run into a problem if you try to implement topic subscription using the message consumer in the preceding ActiveMQ queue example. The following is an excerpt from Virtual Destinations, which provides more detail on this subject:

A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. To be JMS-compliant, only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. That is, only one thread can be actively consuming from a given logical topic subscriber.

To solve this, ActiveMQ supports the concept of a virtual destination, which provides a logical topic subscription access to a physical queue for consumption without breaking JMS compliance. To do so, ActiveMQ uses a simple convention for specifying the topic and queue names to configure message routing.

  • Topic names must use the “VirtualTopic.” prefix, followed by the topic name. For example, VirtualTopic.MyTopic.
  • Consumer names must use the “Consumer.” prefix, followed by the consumer name, followed by the topic name. For example, Consumer.MyConsumer.VirtualTopic.MyTopic.

ActiveMQ topic example

Next, here’s an example for the ActiveMQ client that demonstrates publishing messaging to topics. This example is similar to the ActiveMQ Queue Example. In this one, create a Topic destination instead of a queue destination.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a topic named "VirtualTopic.MyTopic".
        Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);

        // Create a producer from the session to the topic.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue called "Consumer.Consumer1.VirtualTopic.MyTopic".
        Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, the message producer uses the Session.createTopic method with the topic name, VirtualTopic.MyTopic, as the publishing destination. The message consumer code does not change, but the queue destination uses the virtual destination convention, Consumer.Consumer1.VirtualTopic.MyTopic. ActiveMQ uses these names for the topic and queue to route messages accordingly.

AMQP connectivity to Amazon MQ

Now that you’ve explored some examples using an ActiveMQ client, look at examples using the Qpid JMS client to connect to the Amazon MQ broker over the AMQP 1.0 protocol and see how they differ.

The Qpid client uses the Advanced Message Queuing Protocol (AMQP) 1.0 protocol, supported by Amazon MQ. The AMQP 1.0 protocol can be found in your Amazon MQ broker’s endpoint list (screenshot). It uses port 5671, which must be opened in the Security Group associated with the Amazon MQ broker.

The AMQP endpoint specifies a transport, amqp+ssl. For encrypted connections, Qpid expects the protocol name to be amqps, instead of amqp+ssl, however the rest of the connection address remains the same.

Qpid JMS queue example

Next, here’s an example to send and receive messages to Amazon MQ using the Qpid client. The Qpid JMS client is built using Apache Qpid Proton, an AMQP messaging toolkit.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class QpidClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException, NamingException {
        // Use JNDI to specify the AMQP endpoint
        Hashtable<Object, Object> env = new Hashtable<Object, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", ENDPOINT);
        javax.naming.Context context = new javax.naming.InitialContext(env);

        // Create a connection factory.
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Qpid Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

The Qpid queue example is similar to the ActiveMQ Queue Example. They both use the JMS API model to send and receive messages, but the difference is in how the ConnectionFactory and AMQP endpoint is specified. According to the Qpid client configuration documentation, the ConnectionFactory is specified using a JNDI InitialContext to look up JMS objects. The JNDI configuration is popularly specified in a file named jndi.properties on the Java Classpath. In this example, do it programmatically using a HashTable for simplicity.

NOTE: Although the Qpid client and Qpid JMS client are used to establish connectivity to Amazon MQ using the AMQP 1.0 protocol, the producer should still use the ActiveMQ PooledConnectionFactory to wrap the Qpid ConnectionFactory. This can be confusing because Qpid client provides a PooledConnectionFactory that should NOT be used for AMQP 1.0.

The Qpid topic example is identical to the earlier ActiveMQ topic example with the same substitution, which establishes the ConnectionFactory to the AMQP 1.0 endpoint via JNDI.

Spring JMS template queue example

Finally, here are examples using the Spring JmsTemplate to send and receive messages.

This example established connectivity to Amazon MQ using the same protocol and client library used in the ActiveMQ queue example. That example requires that the security group for the Amazon MQ be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

The Spring JmsTemplate provides a higher-level abstraction on top of JMS. Code using the JmsTemplate class only needs to implement handlers to process messages, while the management of connections, sessions, message producers, and message consumers is delegated to Spring. Look at the following code:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
        
        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}

Although Spring manages connections, sessions, and message producers, the grouping of producer connections is still a best practice. The ActiveMQ PooledConnectionFactory class is used in this example. However, the Spring CachingConnectionFactory object is another option.

Following the PooledConnectionFactory creation, a JmsTemplate is created for the producer and an ActiveMQQueue is created as the message destination. To use JmsTemplate to send a message, a MessageCreator callback is defined that generates a text message via the JmsTemplate.

A second JmsTemplate with an ActiveMQQueue is created for the consumer. In this example, a single message is received synchronously, however, asynchronous message reception is a popular alternative when using message-driven POJOs.

Unlike the ActiveMQ examples, the Spring JMS template example does not require the explicit cleanup of the connection, session, message producer, or message consumer resources, as that is managed by Spring. Make sure to call the PooledConnectionFactory.stop method to cleanly exit the main method.

Finally, here’s an example using a Spring JmsTemplate for topic publishing.

Spring JmsTemplate topic example

This example combines the Spring JmsTemplate queue example with the virtual destinations approach from the ActiveMQ topic example. Look at the following code.

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);

        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}
In this example, follow the ActiveMQ virtual destination naming convention for topics and queues:
  • When creating the producer JMS template, specify an ActiveMQTopic as the destination using the name VirtualTopic.MyTopic.
  • When creating the consumer JMS template, specify an ActiveMQQueue as the destination using the name Consumer.Consumer1.VirtualTopic.MyTopic.

ActiveMQ automatically handles routing messages from topic to queue.

Conclusion

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

If you’re thinking about integrating your existing apps with new serverless apps, see the related post, Invoking AWS Lambda from Amazon MQ.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year for new AWS accounts.

Message Filtering Operators for Numeric Matching, Prefix Matching, and Blacklisting in Amazon SNS

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/message-filtering-operators-for-numeric-matching-prefix-matching-and-blacklisting-in-amazon-sns/

This blog was contributed by Otavio Ferreira, Software Development Manager for Amazon SNS

Message filtering simplifies the overall pub/sub messaging architecture by offloading message filtering logic from subscribers, as well as message routing logic from publishers. The initial launch of message filtering provided a basic operator that was based on exact string comparison. For more information, see Simplify Your Pub/Sub Messaging with Amazon SNS Message Filtering.

Today, AWS is announcing an additional set of filtering operators that bring even more power and flexibility to your pub/sub messaging use cases.

Message filtering operators

Amazon SNS now supports both numeric and string matching. Specifically, string matching operators allow for exact, prefix, and “anything-but” comparisons, while numeric matching operators allow for exact and range comparisons, as outlined below. Numeric matching operators work for values between -10e9 and +10e9 inclusive, with five digits of accuracy right of the decimal point.

  • Exact matching on string values (Whitelisting): Subscription filter policy   {"sport": ["rugby"]} matches message attribute {"sport": "rugby"} only.
  • Anything-but matching on string values (Blacklisting): Subscription filter policy {"sport": [{"anything-but": "rugby"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"} and {"sport": "football"} but not {"sport": "rugby"}
  • Prefix matching on string values: Subscription filter policy {"sport": [{"prefix": "bas"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"}
  • Exact matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["=", 301.5]}]} matches message attributes {"balance": 301.500} and {"balance": 3.015e2}
  • Range matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["<", 0]}]} matches negative numbers only, and {"balance": [{"numeric": [">", 0, "<=", 150]}]} matches any positive number up to 150.

As usual, you may apply the “AND” logic by appending multiple keys in the subscription filter policy, and the “OR” logic by appending multiple values for the same key, as follows:

  • AND logic: Subscription filter policy {"sport": ["rugby"], "language": ["English"]} matches only messages that carry both attributes {"sport": "rugby"} and {"language": "English"}
  • OR logic: Subscription filter policy {"sport": ["rugby", "football"]} matches messages that carry either the attribute {"sport": "rugby"} or {"sport": "football"}

Message filtering operators in action

Here’s how this new set of filtering operators works. The following example is based on a pharmaceutical company that develops, produces, and markets a variety of prescription drugs, with research labs located in Asia Pacific and Europe. The company built an internal procurement system to manage the purchasing of lab supplies (for example, chemicals and utensils), office supplies (for example, paper, folders, and markers) and tech supplies (for example, laptops, monitors, and printers) from global suppliers.

This distributed system is composed of the four following subsystems:

  • A requisition system that presents the catalog of products from suppliers, and takes orders from buyers
  • An approval system for orders targeted to Asia Pacific labs
  • Another approval system for orders targeted to European labs
  • A fulfillment system that integrates with shipping partners

As shown in the following diagram, the company leverages AWS messaging services to integrate these distributed systems.

  • Firstly, an SNS topic named “Orders” was created to take all orders placed by buyers on the requisition system.
  • Secondly, two Amazon SQS queues, named “Lab-Orders-AP” and “Lab-Orders-EU” (for Asia Pacific and Europe respectively), were created to backlog orders that are up for review on the approval systems.
  • Lastly, an SQS queue named “Common-Orders” was created to backlog orders that aren’t related to lab supplies, which can already be picked up by shipping partners on the fulfillment system.

The company also uses AWS Lambda functions to automatically process lab supply orders that don’t require approval or which are invalid.

In this example, because different types of orders have been published to the SNS topic, the subscribing endpoints have had to set advanced filter policies on their SNS subscriptions, to have SNS automatically filter out orders they can’t deal with.

As depicted in the above diagram, the following five filter policies have been created:

  • The SNS subscription that points to the SQS queue “Lab-Orders-AP” sets a filter policy that matches lab supply orders, with a total value greater than $1,000, and that target Asia Pacific labs only. These more expensive transactions require an approver to review orders placed by buyers.
  • The SNS subscription that points to the SQS queue “Lab-Orders-EU” sets a filter policy that matches lab supply orders, also with a total value greater than $1,000, but that target European labs instead.
  • The SNS subscription that points to the Lambda function “Lab-Preapproved” sets a filter policy that only matches lab supply orders that aren’t as expensive, up to $1,000, regardless of their target lab location. These orders simply don’t require approval and can be automatically processed.
  • The SNS subscription that points to the Lambda function “Lab-Cancelled” sets a filter policy that only matches lab supply orders with total value of $0 (zero), regardless of their target lab location. These orders carry no actual items, obviously need neither approval nor fulfillment, and as such can be automatically canceled.
  • The SNS subscription that points to the SQS queue “Common-Orders” sets a filter policy that blacklists lab supply orders. Hence, this policy matches only office and tech supply orders, which have a more streamlined fulfillment process, and require no approval, regardless of price or target location.

After the company finished building this advanced pub/sub architecture, they were then able to launch their internal procurement system and allow buyers to begin placing orders. The diagram above shows six example orders published to the SNS topic. Each order contains message attributes that describe the order, and cause them to be filtered in a different manner, as follows:

  • Message #1 is a lab supply order, with a total value of $15,700 and targeting a research lab in Singapore. Because the value is greater than $1,000, and the location “Asia-Pacific-Southeast” matches the prefix “Asia-Pacific-“, this message matches the first SNS subscription and is delivered to SQS queue “Lab-Orders-AP”.
  • Message #2 is a lab supply order, with a total value of $1,833 and targeting a research lab in Ireland. Because the value is greater than $1,000, and the location “Europe-West” matches the prefix “Europe-“, this message matches the second SNS subscription and is delivered to SQS queue “Lab-Orders-EU”.
  • Message #3 is a lab supply order, with a total value of $415. Because the value is greater than $0 and less than $1,000, this message matches the third SNS subscription and is delivered to Lambda function “Lab-Preapproved”.
  • Message #4 is a lab supply order, but with a total value of $0. Therefore, it only matches the fourth SNS subscription, and is delivered to Lambda function “Lab-Cancelled”.
  • Messages #5 and #6 aren’t lab supply orders actually; one is an office supply order, and the other is a tech supply order. Therefore, they only match the fifth SNS subscription, and are both delivered to SQS queue “Common-Orders”.

Although each message only matched a single subscription, each was tested against the filter policy of every subscription in the topic. Hence, depending on which attributes are set on the incoming message, the message might actually match multiple subscriptions, and multiple deliveries will take place. Also, it is important to bear in mind that subscriptions with no filter policies catch every single message published to the topic, as a blank filter policy equates to a catch-all behavior.

Summary

Amazon SNS allows for both string and numeric filtering operators. As explained in this post, string operators allow for exact, prefix, and “anything-but” comparisons, while numeric operators allow for exact and range comparisons. These advanced filtering operators bring even more power and flexibility to your pub/sub messaging functionality and also allow you to simplify your architecture further by removing even more logic from your subscribers.

Message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). SNS filtering operators for numeric matching, prefix matching, and blacklisting are available now in all AWS Regions, for no extra charge.

To experiment with these new filtering operators yourself, and continue learning, try the 10-minute Tutorial Filter Messages Published to Topics. For more information, see Filtering Messages with Amazon SNS in the SNS documentation.

Invoking AWS Lambda from Amazon MQ

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/invoking-aws-lambda-from-amazon-mq/

Contributed by Josh Kahn, AWS Solutions Architect

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

In this post, I discuss one approach to invoking AWS Lambda from queues and topics managed by Amazon MQ brokers. This and other similar patterns can be useful in integrating legacy systems with serverless architectures. You could also integrate systems already migrated to the cloud that use common APIs such as JMS.

For example, imagine that you work for a company that produces training videos and which recently migrated its video management system to AWS. The on-premises system used to publish a message to an ActiveMQ broker when a video was ready for processing by an on-premises transcoder. However, on AWS, your company uses Amazon Elastic Transcoder. Instead of modifying the management system, Lambda polls the broker for new messages and starts a new Elastic Transcoder job. This approach avoids changes to the existing application while refactoring the workload to leverage cloud-native components.

This solution uses Amazon CloudWatch Events to trigger a Lambda function that polls the Amazon MQ broker for messages. Instead of starting an Elastic Transcoder job, the sample writes the received message to an Amazon DynamoDB table with a time stamp indicating the time received.

Getting started

To start, navigate to the Amazon MQ console. Next, launch a new Amazon MQ instance, selecting Single-instance Broker and supplying a broker name, user name, and password. Be sure to document the user name and password for later.

For the purposes of this sample, choose the default options in the Advanced settings section. Your new broker is deployed to the default VPC in the selected AWS Region with the default security group. For this post, you update the security group to allow access for your sample Lambda function. In a production scenario, I recommend deploying both the Lambda function and your Amazon MQ broker in your own VPC.

After several minutes, your instance changes status from “Creation Pending” to “Available.” You can then visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your broker, publish test messages, and so on. In this example, use the Stomp protocol to connect to your broker. Be sure to capture the broker host name, for example:

<BROKER_ID>.mq.us-east-1.amazonaws.com

You should also modify the Security Group for the broker by clicking on its Security Group ID. Click the Edit button and then click Add Rule to allow inbound traffic on port 8162 for your IP address.

Deploying and scheduling the Lambda function

To simplify the deployment of this example, I’ve provided an AWS Serverless Application Model (SAM) template that deploys the sample function and DynamoDB table, and schedules the function to be invoked every five minutes. Detailed instructions can be found with sample code on GitHub in the amazonmq-invoke-aws-lambda repository, with sample code. I discuss a few key aspects in this post.

First, SAM makes it easy to deploy and schedule invocation of our function:

SubscriberFunction:
	Type: AWS::Serverless::Function
	Properties:
		CodeUri: subscriber/
		Handler: index.handler
		Runtime: nodejs6.10
		Role: !GetAtt SubscriberFunctionRole.Arn
		Timeout: 15
		Environment:
			Variables:
				HOST: !Ref AmazonMQHost
				LOGIN: !Ref AmazonMQLogin
				PASSWORD: !Ref AmazonMQPassword
				QUEUE_NAME: !Ref AmazonMQQueueName
				WORKER_FUNCTIOn: !Ref WorkerFunction
		Events:
			Timer:
				Type: Schedule
				Properties:
					Schedule: rate(5 minutes)

WorkerFunction:
Type: AWS::Serverless::Function
	Properties:
		CodeUri: worker/
		Handler: index.handler
		Runtime: nodejs6.10
Role: !GetAtt WorkerFunctionRole.Arn
		Environment:
			Variables:
				TABLE_NAME: !Ref MessagesTable

In the code, you include the URI, user name, and password for your newly created Amazon MQ broker. These allow the function to poll the broker for new messages on the sample queue.

The sample Lambda function is written in Node.js, but clients exist for a number of programming languages.

stomp.connect(options, (error, client) => {
	if (error) { /* do something */ }

	let headers = {
		destination: ‘/queue/SAMPLE_QUEUE’,
		ack: ‘auto’
	}

	client.subscribe(headers, (error, message) => {
		if (error) { /* do something */ }

		message.readString(‘utf-8’, (error, body) => {
			if (error) { /* do something */ }

			let params = {
				FunctionName: MyWorkerFunction,
				Payload: JSON.stringify({
					message: body,
					timestamp: Date.now()
				})
			}

			let lambda = new AWS.Lambda()
			lambda.invoke(params, (error, data) => {
				if (error) { /* do something */ }
			})
		}
})
})

Sending a sample message

For the purpose of this example, use the Amazon MQ console to send a test message. Navigate to the details page for your broker.

About midway down the page, choose ActiveMQ Web Console. Next, choose Manage ActiveMQ Broker to launch the admin console. When you are prompted for a user name and password, use the credentials created earlier.

At the top of the page, choose Send. From here, you can send a sample message from the broker to subscribers. For this example, this is how you generate traffic to test the end-to-end system. Be sure to set the Destination value to “SAMPLE_QUEUE.” The message body can contain any text. Choose Send.

You now have a Lambda function polling for messages on the broker. To verify that your function is working, you can confirm in the DynamoDB console that the message was successfully received and processed by the sample Lambda function.

First, choose Tables on the left and select the table name “amazonmq-messages” in the middle section. With the table detail in view, choose Items. If the function was successful, you’ll find a new entry similar to the following:

If there is no message in DynamoDB, check again in a few minutes or review the CloudWatch Logs group for Lambda functions that contain debug messages.

Alternative approaches

Beyond the approach described here, you may consider other approaches as well. For example, you could use an intermediary system such as Apache Flume to pass messages from the broker to Lambda or deploy Apache Camel to trigger Lambda via a POST to API Gateway. There are trade-offs to each of these approaches. My goal in using CloudWatch Events was to introduce an easily repeatable pattern familiar to many Lambda developers.

Summary

I hope that you have found this example of how to integrate AWS Lambda with Amazon MQ useful. If you have expertise or legacy systems that leverage APIs such as JMS, you may find this useful as you incorporate serverless concepts in your enterprise architectures.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.

Building Loosely Coupled, Scalable, C# Applications with Amazon SQS and Amazon SNS

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/building-loosely-coupled-scalable-c-applications-with-amazon-sqs-and-amazon-sns/

 
Stephen Liedig, Solutions Architect

 

One of the many challenges professional software architects and developers face is how to make cloud-native applications scalable, fault-tolerant, and highly available.

Fundamental to your project success is understanding the importance of making systems highly cohesive and loosely coupled. That means considering the multi-dimensional facets of system coupling to support the distributed nature of the applications that you are building for the cloud.

By that, I mean addressing not only the application-level coupling (managing incoming and outgoing dependencies), but also considering the impacts of of platform, spatial, and temporal coupling of your systems. Platform coupling relates to the interoperability, or lack thereof, of heterogeneous systems components. Spatial coupling deals with managing components at a network topology level or protocol level. Temporal, or runtime coupling, refers to the ability of a component within your system to do any kind of meaningful work while it is performing a synchronous, blocking operation.

The AWS messaging services, Amazon SQS and Amazon SNS, help you deal with these forms of coupling by providing mechanisms for:

  • Reliable, durable, and fault-tolerant delivery of messages between application components
  • Logical decomposition of systems and increased autonomy of components
  • Creating unidirectional, non-blocking operations, temporarily decoupling system components at runtime
  • Decreasing the dependencies that components have on each other through standard communication and network channels

Following on the recent topic, Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox, in this post, I look at some of the ways you can introduce SQS and SNS into your architectures to decouple your components, and show how you can implement them using C#.

Walkthrough

To illustrate some of these concepts, consider a web application that processes customer orders. As good architects and developers, you have followed best practices and made your application scalable and highly available. Your solution included implementing load balancing, dynamic scaling across multiple Availability Zones, and persisting orders in a Multi-AZ Amazon RDS database instance, as in the following diagram.


In this example, the application is responsible for handling and persisting the order data, as well as dealing with increases in traffic for popular items.

One potential point of vulnerability in the order processing workflow is in saving the order in the database. The business expects that every order has been persisted into the database. However, any potential deadlock, race condition, or network issue could cause the persistence of the order to fail. Then, the order is lost with no recourse to restore the order.

With good logging capability, you may be able to identify when an error occurred and which customer’s order failed. This wouldn’t allow you to “restore” the transaction, and by that stage, your customer is no longer your customer.

As illustrated in the following diagram, introducing an SQS queue helps improve your ordering application. Using the queue isolates the processing logic into its own component and runs it in a separate process from the web application. This, in turn, allows the system to be more resilient to spikes in traffic, while allowing work to be performed only as fast as necessary in order to manage costs.


In addition, you now have a mechanism for persisting orders as messages (with the queue acting as a temporary database), and have moved the scope of your transaction with your database further down the stack. In the event of an application exception or transaction failure, this ensures that the order processing can be retired or redirected to the Amazon SQS Dead Letter Queue (DLQ), for re-processing at a later stage. (See the recent post, Using Amazon SQS Dead-Letter Queues to Control Message Failure, for more information on dead-letter queues.)

Scaling the order processing nodes

This change allows you now to scale the web application frontend independently from the processing nodes. The frontend application can continue to scale based on metrics such as CPU usage, or the number of requests hitting the load balancer. Processing nodes can scale based on the number of orders in the queue. Here is an example of scale-in and scale-out alarms that you would associate with the scaling policy.

Scale-out Alarm

aws cloudwatch put-metric-alarm --alarm-name AddCapacityToCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" 
--statistic Average --period 300 --threshold 3 --comparison-operator GreaterThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders
--evaluation-periods 2 --alarm-actions <arn of the scale-out autoscaling policy>

Scale-in Alarm

aws cloudwatch put-metric-alarm --alarm-name RemoveCapacityFromCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" 
 --statistic Average --period 300 --threshold 1 --comparison-operator LessThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders
 --evaluation-periods 2 --alarm-actions <arn of the scale-in autoscaling policy>

In the above example, use the ApproximateNumberOfMessagesVisible metric to discover the queue length and drive the scaling policy of the Auto Scaling group. Another useful metric is ApproximateAgeOfOldestMessage, when applications have time-sensitive messages and developers need to ensure that messages are processed within a specific time period.

Scaling the order processing implementation

On top of scaling at an infrastructure level using Auto Scaling, make sure to take advantage of the processing power of your Amazon EC2 instances by using as many of the available threads as possible. There are several ways to implement this. In this post, we build a Windows service that uses the BackgroundWorker class to process the messages from the queue.

Here’s a closer look at the implementation. In the first section of the consuming application, use a loop to continually poll the queue for new messages, and construct a ReceiveMessageRequest variable.

public static void PollQueue()
{
    while (_running)
    {
        Task<ReceiveMessageResponse> receiveMessageResponse;

        // Pull messages off the queue
        using (var sqs = new AmazonSQSClient())
        {
            const int maxMessages = 10;  // 1-10

            //Receiving a message
            var receiveMessageRequest = new ReceiveMessageRequest
            {
                // Get URL from Configuration
                QueueUrl = _queueUrl, 
                // The maximum number of messages to return. 
                // Fewer messages might be returned. 
                MaxNumberOfMessages = maxMessages, 
                // A list of attributes that need to be returned with message.
                AttributeNames = new List<string> { "All" },
                // Enable long polling. 
                // Time to wait for message to arrive on queue.
                WaitTimeSeconds = 5 
            };

            receiveMessageResponse = sqs.ReceiveMessageAsync(receiveMessageRequest);
        }

The WaitTimeSeconds property of the ReceiveMessageRequest specifies the duration (in seconds) that the call waits for a message to arrive in the queue before returning a response to the calling application. There are a few benefits to using long polling:

  • It reduces the number of empty responses by allowing SQS to wait until a message is available in the queue before sending a response.
  • It eliminates false empty responses by querying all (rather than a limited number) of the servers.
  • It returns messages as soon any message becomes available.

For more information, see Amazon SQS Long Polling.

After you have returned messages from the queue, you can start to process them by looping through each message in the response and invoking a new BackgroundWorker thread.

// Process messages
if (receiveMessageResponse.Result.Messages != null)
{
    foreach (var message in receiveMessageResponse.Result.Messages)
    {
        Console.WriteLine("Received SQS message, starting worker thread");

        // Create background worker to process message
        BackgroundWorker worker = new BackgroundWorker();
        worker.DoWork += (obj, e) => ProcessMessage(message);
        worker.RunWorkerAsync();
    }
}
else
{
    Console.WriteLine("No messages on queue");
}

The event handler, ProcessMessage, is where you implement business logic for processing orders. It is important to have a good understanding of how long a typical transaction takes so you can set a message VisibilityTimeout that is long enough to complete your operation. If order processing takes longer than the specified timeout period, the message becomes visible on the queue. Other nodes may pick it and process the same order twice, leading to unintended consequences.

Handling Duplicate Messages

In order to manage duplicate messages, seek to make your processing application idempotent. In mathematics, idempotent describes a function that produces the same result if it is applied to itself:

f(x) = f(f(x))

No matter how many times you process the same message, the end result is the same (definition from Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions, Hohpe and Wolf, 2004).

There are several strategies you could apply to achieve this:

  • Create messages that have inherent idempotent characteristics. That is, they are non-transactional in nature and are unique at a specified point in time. Rather than saying “place new order for Customer A,” which adds a duplicate order to the customer, use “place order <orderid> on <timestamp> for Customer A,” which creates a single order no matter how often it is persisted.
  • Deliver your messages via an Amazon SQS FIFO queue, which provides the benefits of message sequencing, but also mechanisms for content-based deduplication. You can deduplicate using the MessageDeduplicationId property on the SendMessage request or by enabling content-based deduplication on the queue, which generates a hash for MessageDeduplicationId, based on the content of the message, not the attributes.
var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = _queueUrl,
    MessageBody = JsonConvert.SerializeObject(order),
    MessageGroupId = Guid.NewGuid().ToString("N"),
    MessageDeduplicationId = Guid.NewGuid().ToString("N")
};
  • If using SQS FIFO queues is not an option, keep a message log of all messages attributes processed for a specified period of time, as an alternative to message deduplication on the receiving end. Verifying the existence of the message in the log before processing the message adds additional computational overhead to your processing. This can be minimized through low latency persistence solutions such as Amazon DynamoDB. Bear in mind that this solution is dependent on the successful, distributed transaction of the message and the message log.

Handling exceptions

Because of the distributed nature of SQS queues, it does not automatically delete the message. Therefore, you must explicitly delete the message from the queue after processing it, using the message ReceiptHandle property (see the following code example).

However, if at any stage you have an exception, avoid handling it as you normally would. The intention is to make sure that the message ends back on the queue, so that you can gracefully deal with intermittent failures. Instead, log the exception to capture diagnostic information, and swallow it.

By not explicitly deleting the message from the queue, you can take advantage of the VisibilityTimeout behavior described earlier. Gracefully handle the message processing failure and make the unprocessed message available to other nodes to process.

In the event that subsequent retries fail, SQS automatically moves the message to the configured DLQ after the configured number of receives has been reached. You can further investigate why the order process failed. Most importantly, the order has not been lost, and your customer is still your customer.

private static void ProcessMessage(Message message)
{
    using (var sqs = new AmazonSQSClient())
    {
        try
        {
            Console.WriteLine("Processing message id: {0}", message.MessageId);

            // Implement messaging processing here
            // Ensure no downstream resource contention (parallel processing)
            // <your order processing logic in here…>
            Console.WriteLine("{0} Thread {1}: {2}", DateTime.Now.ToString("s"), Thread.CurrentThread.ManagedThreadId, message.MessageId);
            
            // Delete the message off the queue. 
            // Receipt handle is the identifier you must provide 
            // when deleting the message.
            var deleteRequest = new DeleteMessageRequest(_queueName, message.ReceiptHandle);
            sqs.DeleteMessageAsync(deleteRequest);
            Console.WriteLine("Processed message id: {0}", message.MessageId);

        }
        catch (Exception ex)
        {
            // Do nothing.
            // Swallow exception, message will return to the queue when 
            // visibility timeout has been exceeded.
            Console.WriteLine("Could not process message due to error. Exception: {0}", ex.Message);
        }
    }
}

Using SQS to adapt to changing business requirements

One of the benefits of introducing a message queue is that you can accommodate new business requirements without dramatically affecting your application.

If, for example, the business decided that all orders placed over $5000 are to be handled as a priority, you could introduce a new “priority order” queue. The way the orders are processed does not change. The only significant change to the processing application is to ensure that messages from the “priority order” queue are processed before the “standard order” queue.

The following diagram shows how this logic could be isolated in an “order dispatcher,” whose only purpose is to route order messages to the appropriate queue based on whether the order exceeds $5000. Nothing on the web application or the processing nodes changes other than the target queue to which the order is sent. The rates at which orders are processed can be achieved by modifying the poll rates and scalability settings that I have already discussed.

Extending the design pattern with Amazon SNS

Amazon SNS supports reliable publish-subscribe (pub-sub) scenarios and push notifications to known endpoints across a wide variety of protocols. It eliminates the need to periodically check or poll for new information and updates. SNS supports:

  • Reliable storage of messages for immediate or delayed processing
  • Publish / subscribe – direct, broadcast, targeted “push” messaging
  • Multiple subscriber protocols
  • Amazon SQS, HTTP, HTTPS, email, SMS, mobile push, AWS Lambda

With these capabilities, you can provide parallel asynchronous processing of orders in the system and extend it to support any number of different business use cases without affecting the production environment. This is commonly referred to as a “fanout” scenario.

Rather than your web application pushing orders to a queue for processing, send a notification via SNS. The SNS messages are sent to a topic and then replicated and pushed to multiple SQS queues and Lambda functions for processing.

As the diagram above shows, you have the development team consuming “live” data as they work on the next version of the processing application, or potentially using the messages to troubleshoot issues in production.

Marketing is consuming all order information, via a Lambda function that has subscribed to the SNS topic, inserting the records into an Amazon Redshift warehouse for analysis.

All of this, of course, is happening without affecting your order processing application.

Summary

While I haven’t dived deep into the specifics of each service, I have discussed how these services can be applied at an architectural level to build loosely coupled systems that facilitate multiple business use cases. I’ve also shown you how to use infrastructure and application-level scaling techniques, so you can get the most out of your EC2 instances.

One of the many benefits of using these managed services is how quickly and easily you can implement powerful messaging capabilities in your systems, and lower the capital and operational costs of managing your own messaging middleware.

Using Amazon SQS and Amazon SNS together can provide you with a powerful mechanism for decoupling application components. This should be part of design considerations as you architect for the cloud.

For more information, see the Amazon SQS Developer Guide and Amazon SNS Developer Guide. You’ll find tutorials on all the concepts covered in this post, and more. To can get started using the AWS console or SDK of your choice visit:

Happy messaging!