Tag Archives: Amazon SQS

Improved failure recovery for Amazon EventBridge

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/improved-failure-recovery-for-amazon-eventbridge/

Today we’re announcing two new capabilities for Amazon EventBridge: dead letter queues and custom retry policies. Both of these give you greater flexibility in how to handle any failures in the processing of events with EventBridge. You can easily enable them on a per target basis and configure them uniquely for each.

Dead letter queues (DLQs) are a common capability in queuing and messaging systems that allow you to handle failures in event or message receiving systems. They provide a way for failed events or messages to be captured and sent to another system, which can store them for future processing. With DLQs, you can have greater resiliency and improved recovery from any failure that happens.

You can also now configure a custom retry policy that can be set on your event bus targets. Today, there are two attributes that can control how events are retried: maximum number of retries and maximum event age. With these two settings, you could send events to a DLQ sooner and reduce the retries attempted.

For example, this could allow you to recover more quickly if an event bus target is overwhelmed by the number of events received, causing throttling to occur. The events are placed in a DLQ and then processed later.

Failures in event processing

Currently, EventBridge can fail to deliver an event to a target in certain scenarios. Events that fail to be delivered to a target due to client-side errors are dropped immediately. Examples of this are when EventBridge does not have permission to a target AWS service or if the target no longer exists. This can happen if the target resource is misconfigured or is deleted by the resource owner.

For service-side issues, EventBridge retries delivery of events for up to 24 hours. This can happen if the target service is unavailable or the target resource is not provisioned to handle the incoming event traffic and the target service is throttling the requests.

EventBridge failures

Previously, when all attempts to deliver an event to the target were exhausted, EventBridge published a CloudWatch metric indicating a failed target invocation. However, this provided no visibility into which events failed to be delivered, and there was no way to recover the event that failed.

Dead letter queues

EventBridge’s DLQs are made possible today with Amazon Simple Queue Service (SQS) standard queues. With SQS, you get all of the benefits of a fully serverless queuing service: no servers to manage, automatic scalability, pay for what you consume, and high availability and security built in. You can configure the DLQs for your EventBridge bus and pay nothing until it is used, if and when a target experiences an issue. This makes it a great practice to follow and standardize on, and provides you with a safety net that’s active only when needed.

Optionally, you could later configure an AWS Lambda function to consume from that DLQ. The function is only invoked when messages exist in the queue, allowing you to maintain a serverless stack to recover from a potential failure.

DLQ configured per target

With DLQ configured, the queue receives the event that failed in the message with important metadata that you can use to troubleshoot the issue. This can include: Error Code, Error Message, Exhausted Retry Condition, Retry Attempts, Rule ARN, and the Target ARN.

You can use this data to more easily troubleshoot what went wrong with the original delivery attempt and take action to resolve or prevent such failures in the future. You could also use the information such as Exhausted Retry Condition and Retry Attempts to further tweak your custom retry policy.
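A sketch of the DLQ consumer described above, as an added example (not code from the original post or from AWS): it reads the failure metadata from each SQS record, sorts messages into redrive, investigate, or unexpected, and tallies which retry condition was exhausted per target. The attribute names follow AWS's EventBridge documentation rather than this page; the triage policy, function names, target ARN, and sample values are assumptions of this example.

```python
import json
from collections import defaultdict

# Message attribute names EventBridge sets on DLQ messages (from AWS docs;
# the page lists them only as "Error Code", "Rule ARN", and so on).
ATTRIBUTES = ("ERROR_CODE", "ERROR_MESSAGE", "EXHAUSTED_RETRY_CONDITION",
              "RETRY_ATTEMPTS", "RULE_ARN", "TARGET_ARN")

RULE_ARN = "arn:aws:events:us-east-1:123456789012:rule/MyTestRule"  # from the page
# Constructed placeholder target, not a value from the page.
TARGET_ARN = "arn:aws:lambda:us-east-1:123456789012:function:example-shipping-target"


def read_failure(record):
    """Turn one SQS record (Lambda event shape) into a flat failure dict."""
    attrs = record.get("messageAttributes") or {}
    failure = {n: (attrs.get(n) or {}).get("stringValue") for n in ATTRIBUTES}
    failure["RETRY_ATTEMPTS"] = int(failure["RETRY_ATTEMPTS"] or 0)
    try:
        failure["event"] = json.loads(record["body"])  # the event that failed
    except (KeyError, ValueError):
        failure["event"] = None  # keep triaging even if the body is unusable
    return failure


def classify(failure, expected_rules):
    """Triage policy assumed for this example:
    unexpected  - the message names a rule this DLQ was not set up for
    redrive     - the retry policy ran out; the target may only need time
    investigate - no retry condition was exhausted; suspect configuration
    """
    if failure["RULE_ARN"] not in expected_rules:
        return "unexpected"
    if failure["EXHAUSTED_RETRY_CONDITION"]:
        return "redrive"
    return "investigate"


def triage(sqs_event, expected_rules):
    """Return per-message decisions and a per-target retry-exhaustion tally."""
    decisions = []
    summary = defaultdict(lambda: {"count": 0, "max_attempts": 0})
    for record in sqs_event.get("Records", []):
        failure = read_failure(record)
        action = classify(failure, expected_rules)
        decisions.append((record.get("messageId"), action))
        if action == "redrive":
            key = (failure["TARGET_ARN"], failure["EXHAUSTED_RETRY_CONDITION"])
            summary[key]["count"] += 1
            summary[key]["max_attempts"] = max(summary[key]["max_attempts"],
                                               failure["RETRY_ATTEMPTS"])
    return decisions, dict(summary)


def make_record(message_id, rule_arn, **attrs):
    """Build a constructed SQS record in the shape Lambda hands to a handler."""
    attrs = {"RULE_ARN": rule_arn, "TARGET_ARN": TARGET_ARN, **attrs}
    return {
        "messageId": message_id,
        "body": json.dumps({"detail-type": "OrderShipped",
                            "detail": {"orderId": message_id}}),
        "messageAttributes": {k: {"stringValue": str(v), "dataType": "String"}
                              for k, v in attrs.items()},
    }


# Constructed sample batch; the error codes are placeholders, not real codes.
SAMPLE_EVENT = {"Records": [
    make_record("m-1", RULE_ARN, ERROR_CODE="EXAMPLE_THROTTLED",
                EXHAUSTED_RETRY_CONDITION="MaximumRetryAttempts", RETRY_ATTEMPTS=5),
    make_record("m-2", RULE_ARN, ERROR_CODE="EXAMPLE_THROTTLED",
                EXHAUSTED_RETRY_CONDITION="MaximumEventAgeInSeconds", RETRY_ATTEMPTS=12),
    make_record("m-3", RULE_ARN, ERROR_CODE="EXAMPLE_NO_PERMISSION", RETRY_ATTEMPTS=0),
    make_record("m-4", "arn:aws:events:us-east-1:123456789012:rule/ExampleOtherRule",
                ERROR_CODE="EXAMPLE_THROTTLED",
                EXHAUSTED_RETRY_CONDITION="MaximumRetryAttempts", RETRY_ATTEMPTS=3),
]}

if __name__ == "__main__":
    decisions, summary = triage(SAMPLE_EVENT, {RULE_ARN})
    for message_id, action in decisions:
        print(message_id, action)
    for (target, condition), stats in summary.items():
        print(target, condition, stats)
```

The tally shows whether a target's events run out of attempts or out of time, which is the input for adjusting that target's retry policy.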

You can configure a DLQ when creating or updating rules via the AWS Management Console and AWS Command Line Interface (AWS CLI). You can also use infrastructure as code (IaC) tools such as AWS CloudFormation.

In the console, select the queue to be used for your DLQ configuration from the drop-down as shown here:

DLQ configuration

When configured via API, AWS CLI, or IaC tools, you must specify the ARN of the queue:

arn:aws:sqs:us-east-1:123456789012:orders-bus-shipping-service-dlq

When you configure a DLQ, the target SQS queue requires a resource-based policy that grants EventBridge access. One is created and applied automatically via the console when you create or update an EventBridge rule with a DLQ that exists in your own account.

For any queues created in other accounts, or via API, AWS CLI, or IaC tools, you must add a policy that allows SQS’s SendMessage permission to the EventBridge rule ARN, as shown below:

{
  "Sid": "Dead-letter queue permissions",
  "Effect": "Allow",
  "Principal": {
     "Service": "events.amazonaws.com"
  },
  "Action": "sqs:SendMessage",
  "Resource": "arn:aws:sqs:us-east-1:123456789012:orders-bus-shipping-service-dlq",
  "Condition": {
    "ArnEquals": {
      "aws:SourceArn": "arn:aws:events:us-east-1:123456789012:rule/MyTestRule"
    }
  }
}

You can read more about setting permissions for the DLQ in the documentation for “Granting permissions to the dead-letter queue”.
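One way to check a DLQ's queue policy against the statement above, as an added example (not code from the original post or from AWS): it flags an events.amazonaws.com grant that has no aws:SourceArn condition or a wildcarded one, since such a grant accepts deliveries from EventBridge rules other than the one you named. The function names, finding codes, and the Version/Statement wrapper around the page's statement are assumptions of this example.

```python
import copy
from fnmatch import fnmatchcase

EVENTS_PRINCIPAL = "events.amazonaws.com"
SOURCE_ARN_OPERATORS = ("ArnEquals", "ArnLike", "StringEquals", "StringLike")

QUEUE_ARN = "arn:aws:sqs:us-east-1:123456789012:orders-bus-shipping-service-dlq"
RULE_ARN = "arn:aws:events:us-east-1:123456789012:rule/MyTestRule"

# The page's statement, wrapped in a policy document (the wrapper is constructed).
PAGE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "Dead-letter queue permissions",
        "Effect": "Allow",
        "Principal": {"Service": "events.amazonaws.com"},
        "Action": "sqs:SendMessage",
        "Resource": QUEUE_ARN,
        "Condition": {"ArnEquals": {"aws:SourceArn": RULE_ARN}},
    }],
}


def _as_list(value):
    """Policy fields may hold a single value or a list of values."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def _lets_eventbridge_send(statement):
    """True for an Allow statement giving events.amazonaws.com sqs:SendMessage."""
    if statement.get("Effect") != "Allow":
        return False
    principal = statement.get("Principal")
    services = _as_list(principal.get("Service")) if isinstance(principal, dict) else []
    if EVENTS_PRINCIPAL not in services:
        return False
    # Action names are case-insensitive and may use wildcards (sqs:*, sqs:Send*).
    return any(fnmatchcase("sqs:sendmessage", str(action).lower())
               for action in _as_list(statement.get("Action")))


def _source_arns(statement):
    """Collect every aws:SourceArn value pinned by the statement's Condition."""
    arns = []
    for operator, keys in (statement.get("Condition") or {}).items():
        if operator not in SOURCE_ARN_OPERATORS:
            continue
        for key, value in keys.items():
            if key.lower() == "aws:sourcearn":  # condition keys ignore case
                arns.extend(_as_list(value))
    return arns


def audit_dlq_policy(policy, queue_arn, rule_arn):
    """Return (sid, finding) tuples; an empty list means the grant is scoped."""
    findings = []
    seen_grant = False
    for statement in _as_list(policy.get("Statement")):
        if not _lets_eventbridge_send(statement):
            continue
        seen_grant = True
        sid = statement.get("Sid", "<no Sid>")
        if queue_arn not in _as_list(statement.get("Resource")):
            findings.append((sid, "RESOURCE_NOT_DLQ"))
        arns = _source_arns(statement)
        if not arns:
            findings.append((sid, "NO_SOURCE_ARN_CONDITION"))
        elif any("*" in arn or "?" in arn for arn in arns):
            findings.append((sid, "WILDCARD_SOURCE_ARN"))
        elif rule_arn not in arns:
            findings.append((sid, "RULE_NOT_ALLOWED"))
    if not seen_grant:
        # Without any grant EventBridge cannot deliver to the DLQ at all.
        findings.append(("<policy>", "NO_EVENTBRIDGE_GRANT"))
    return findings


def without_condition(policy):
    """Constructed weak variant: the same grant with its Condition removed."""
    weak = copy.deepcopy(policy)
    del weak["Statement"][0]["Condition"]
    return weak


if __name__ == "__main__":
    print("page policy:", audit_dlq_policy(PAGE_POLICY, QUEUE_ARN, RULE_ARN))
    print("no condition:",
          audit_dlq_policy(without_condition(PAGE_POLICY), QUEUE_ARN, RULE_ARN))
```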

Once configured, you can monitor CloudWatch metrics for the DLQ. This shows the successful delivery of messages via the InvocationsSentToDLQ metric, in addition to any failures via the InvocationsFailedToBeSentToDLQ metric. Note that these metrics do not exist if your queue is not considered “active”.

Retry policies

By default, EventBridge retries delivery of an event to a target so long as it does not receive a client-side error as described earlier. Retries occur with a back-off, for up to 185 attempts or for up to 24 hours, after which the event is dropped or sent to a DLQ, if configured. Due to the jitter of the back-off and retry process you may reach the 24-hour limit before reaching 185 retries.

For many workloads, this provides an acceptable way to handle momentary service issues or throttling that might occur. For some however, this model of back-off and retry can cause increased and on-going traffic to an already overloaded target system.

For example, consider an Amazon API Gateway target that has a resource constrained backend service behind it.

Constrained target service

Under a consistently high load, the bus could end up generating too many API requests, tripping the API Gateway’s throttling configuration. This would cause API Gateway to respond with throttling errors back to EventBridge.

Throttled API reply

You may decide that allowing the failed events to retry for 24 hours puts too much load into this system and it may not properly recover from the load. This could lead to potential data loss unless a DLQ was configured.

Added DLQ

With a DLQ, you could choose to process these events later, once the overwhelmed target service has recovered.

DLQ drained back to API

Or the events in question may no longer have the same value as they did previously. This can occur in systems where data loss is tolerated but the timeliness of data processing matters. In these situations, the DLQ would have less value and dropping the message is acceptable.

For either of these situations, configuring the maximum number of retries or the maximum age of the event could be useful.

Now with retry policies, you can configure per target the following two attributes:

  • MaximumEventAgeInSeconds: between 60 and 86400 seconds (86400, or 24 hours, is the default)
  • MaximumRetryAttempts: between 0 and 185 (185 is the default)

When either condition is met, the event fails. It’s then either dropped, which generates an increase to the FailedInvocations CloudWatch metric, or sent to a configured DLQ.

You can configure retry policy attributes when creating or updating rules via the AWS Management Console and AWS Command Line Interface (AWS CLI). You can also use infrastructure as code (IaC) tools such as AWS CloudFormation.

Retry policy

There is no additional cost for configuring either of these new capabilities. You only pay for the usage of the SQS standard queue configured as the dead letter queue during a failure and any application that handles the failed events. SQS pricing can be found here.

Conclusion

With dead letter queues and custom retry policies, you have improved handling and control over failure in distributed systems built with EventBridge. With DLQs you can capture failed events and then process them later, potentially saving yourself from data loss. With custom retry policies, you gain the improved ability to control the number of retries and for how long they can be retried.

I encourage you to explore how both of these new capabilities can help make your applications more resilient to failures, and to standardize on using them both in your infrastructure.

For more serverless learning resources, visit https://serverlessland.com.

Building storage-first serverless applications with HTTP APIs service integrations

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/building-storage-first-applications-with-http-apis-service-integrations/

Over the last year, I have been talking about “storage first” serverless patterns. With these patterns, data is stored persistently before any business logic is applied. The advantage of this pattern is increased application resiliency. By persisting the data before processing, the original data is still available, if or when errors occur.

Common pattern for serverless API backend

Using Amazon API Gateway as a proxy to an AWS Lambda function is a common pattern in serverless applications. The Lambda function handles the business logic and communicates with other AWS or third-party services to route, modify, or store the processed data. One option is to place the data in an Amazon Simple Queue Service (SQS) queue for processing downstream. In this pattern, the developer is responsible for handling errors and retry logic within the Lambda function code.

The storage first pattern flips this around. It uses native error handling with retry logic or dead-letter queues (DLQ) at the SQS layer before any code is run. By directly integrating API Gateway to SQS, developers can increase application reliability while reducing lines of code.

Storage first pattern for serverless API backend

Previously, direct integrations required REST APIs with transformation templates written in Velocity Template Language (VTL). However, developers tell us they would like to integrate directly with services in a simpler way without using VTL. As a result, HTTP APIs now offers the ability to directly integrate with five AWS services without needing a transformation template or code layer.

The first five service integrations

This release of HTTP APIs direct integrations includes Amazon EventBridge, Amazon Kinesis Data Streams, Simple Queue Service (SQS), AWS Systems Manager’s AppConfig, and AWS Step Functions. With these new integrations, customers can create APIs and webhooks for their business logic hosted in these AWS services. They can also take advantage of HTTP APIs features like authorizers, throttling, and enhanced observability for securing and monitoring these applications.

Amazon EventBridge

HTTP APIs service integration with Amazon EventBridge

The HTTP APIs direct integration for EventBridge uses the PutEvents API to enable client applications to place events on an EventBridge bus. Once the events are on the bus, EventBridge routes the event to specific targets based upon EventBridge filtering rules.

This integration is a storage first pattern because data is written to the bus before any routing or logic is applied. If the downstream target service has issues, then EventBridge implements a retry strategy with incremental back-off for up to 24 hours. Additionally, the integration helps developers reduce code by filtering events at the bus. It routes to downstream targets without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Different tasks are required based upon incoming event details
  • Only data ingestion is required
  • Payload size is less than 256 kb
  • Expected requests per second are less than the Region quotas.

Amazon Kinesis Data Streams

HTTP APIs service integration with Amazon Kinesis Data Streams

The HTTP APIs direct integration for Kinesis Data Streams offers the PutRecord integration action, enabling client applications to place events on a Kinesis data stream. Kinesis Data Streams are designed to handle up to 1,000 writes per second per shard, with payloads up to 1 mb in size. Developers can increase throughput by increasing the number of shards in the data stream. You can route the incoming data to targets like an Amazon S3 bucket as part of a data lake or a Kinesis data analytics application for real-time analytics.

This integration is a storage first option because data is stored on the stream for up to seven days until it is processed and routed elsewhere. When processing stream events with a Lambda function, errors are handled at the Lambda layer through a configurable error handling strategy.

Use this direct integration when:

  • Ingesting large amounts of data
  • Ingesting large payload sizes
  • Order is important
  • Routing the same data to multiple targets

Amazon SQS

HTTP APIs service integration with Amazon SQS

The HTTP APIs direct integration for Amazon SQS offers the SendMessage, ReceiveMessage, DeleteMessage, and PurgeQueue integration actions. This integration differs from the EventBridge and Kinesis integrations in that data flows both ways. Events can be created, read, and deleted from the SQS queue via REST calls through the HTTP API endpoint. Additionally, a full purge of the queue can be managed using the PurgeQueue action.

This pattern is a storage first pattern because the data remains on the queue for four days by default (configurable to 14 days), unless it is processed and removed. When the Lambda service polls the queue, the messages that are returned are hidden in the queue for a set amount of time. Once the calling service has processed these messages, it uses the DeleteMessage API to remove the messages permanently.

When triggering a Lambda function with an SQS queue, the Lambda service manages this process internally. However, HTTP APIs direct integration with SQS enables developers to move this process to client applications without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Data must be received as well as sent to the service
  • Downstream services need reduced concurrency
  • The queue requires custom management
  • Order is important (FIFO queues)

AWS AppConfig

HTTP APIs service integration with AWS Systems Manager AppConfig

The HTTP APIs direct integration for AWS AppConfig offers the GetConfiguration integration action and allows applications to check for application configuration updates. By exposing the systems parameter API through an HTTP APIs endpoint, developers can automate configuration changes for their applications. While this integration is not considered a storage first integration, it does enable direct communication from external services to AppConfig without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Access to AWS AppConfig is required.
  • Managing application configurations.

AWS Step Functions

HTTP APIs service integration with AWS Step Functions

The HTTP APIs direct integration for Step Functions offers the StartExecution and StopExecution integration actions. These actions allow for programmatic control of a Step Functions state machine via an API. When starting a Step Functions workflow, JSON data is passed in the request and mapped to the state machine. Error messages are also mapped to the state machine when stopping the execution.

This pattern provides a storage first integration because Step Functions maintains a persistent state during the life of the orchestrated workflow. Step Functions also supports service integrations that allow the workflows to send and receive data without needing a Lambda function as a transport layer.

Use this direct integration when:

  • Orchestrating multiple actions.
  • Order of action is required.

Building HTTP APIs direct integrations

HTTP APIs service integrations can be built using the AWS CLI, AWS SAM, or through the API Gateway console. The console walks through contextual choices to help you understand what is required for each integration. Each of the integrations also includes an Advanced section to provide additional information for the integration.

Creating an HTTP APIs service integration

Once you build an integration, you can export it as an OpenAPI template that can be used with infrastructure as code (IaC) tools like AWS SAM. The exported template can also include the API Gateway extensions that define the specific integration information.

Exporting the HTTP APIs configuration to OpenAPI

OpenAPI template

An example of a direct integration from HTTP APIs to SQS is located in the Sessions With SAM repository. This example includes the following architecture:

AWS SAM template resource architecture

The AWS SAM template creates the HTTP APIs, SQS queue, Lambda function, and both Identity and Access Management (IAM) roles required. This is all generated in 58 lines of code and looks like this:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: HTTP API direct integrations

Resources:
  MyQueue:
    Type: AWS::SQS::Queue
    
  MyHttpApi:
    Type: AWS::Serverless::HttpApi
    Properties:
      DefinitionBody:
        'Fn::Transform':
          Name: 'AWS::Include'
          Parameters:
            Location: './api.yaml'
          
  MyHttpApiRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "apigateway.amazonaws.com"
            Action: 
              - "sts:AssumeRole"
      Policies:
        - PolicyName: ApiDirectWriteToSQS
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              Action:
              - sqs:SendMessage
              Effect: Allow
              Resource:
                - !GetAtt MyQueue.Arn
                
  MyTriggeredLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt MyQueue.QueueName
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt MyQueue.Arn

Outputs:
  ApiEndpoint:
    Description: "HTTP API endpoint URL"
    Value: !Sub "https://${MyHttpApi}.execute-api.${AWS::Region}.amazonaws.com"

The OpenAPI template handles the route definitions for the HTTP API configuration and configures the service integration. The template looks like this:

openapi: "3.0.1"
info:
  title: "my-sqs-api"
paths:
  /:
    post:
      responses:
        default:
          description: "Default response for POST /"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-SendMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          MessageBody: "$request.body.MessageBody"
          QueueUrl:
            Ref: MyQueue
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
x-amazon-apigateway-importexport-version: "1.0"

Because the OpenAPI template is included in the AWS SAM template via a transform, the API Gateway integration can reference the roles and services created within the AWS SAM template.

Conclusion

This post covers the concept of storage first integration patterns and how the new HTTP APIs direct integrations can help. I cover the five current integrations and possible use cases for each. Additionally, I demonstrate how to use AWS SAM to build and manage the integrated applications using infrastructure as code.

Using the storage first pattern with direct integrations can help developers build serverless applications that are more durable with fewer lines of code. A Lambda function is no longer required to transport data from the API endpoint to the desired service. Instead, use Lambda function invocations for differentiating business logic.

To learn more join us for the HTTP API service integrations session of Sessions With SAM! 

#ServerlessForEveryone

ICYMI: Season one of Sessions with SAM

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-season-one-of-sessions-with-sam/

Developers tell us they want to know how to easily build and manage their serverless applications. In 2017 AWS announced AWS Serverless Application Model (SAM) to help with just that. To help developers learn more about SAM, I created a weekly Twitch series called Sessions with SAM. Each session focuses on a specific serverless task or service. It demonstrates deploying and managing that task using infrastructure as code (IaC) with SAM templates. This post recaps each session of the first season to prepare you for Sessions with SAM season two, starting August 13.

Sessions with SAM

What is SAM

AWS SAM is an open source framework designed for building serverless applications. The framework provides shorthand syntax to quickly declare AWS Lambda functions, Amazon DynamoDB tables and more. Additionally, SAM is not limited to serverless resources and can also declare any standard AWS CloudFormation resource. With around 20 lines of code, a developer can create an application with an API, logic, and database layer with the proper permissions in place.

Example of using SAM templates to generate infrastructure

20 Lines of code

By using infrastructure as code to manage and deploy serverless applications, developers gain several advantages. You can version the templates and rollback when necessary. They can be parameterized for flexibility across multiple environments. They can be shared with development teams for consistency across developer environments.

Sessions

The code and linked videos are listed with the session. See the YouTube playlist and GitHub repository for the entire season.

Session one: JWT authorizers on Amazon API Gateway

In this session, I cover building an application backend using JWT authorizers with the new Amazon API Gateway HTTP API. We also discussed building an application with multiple routes and the ability to change the authorization requirements per route.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/http-api

Video: https://youtu.be/klOScYEojzY

Session two: Amazon Cognito authentication

In this session, I cover building an Amazon Cognito template for authentication. This includes the user management component with user pools and user groups in addition to a hosted authentication workflow with an app client.

Building an Amazon Cognito authentication provider

We also discussed using custom pre-token Lambda functions to modify the JWT token issued by Amazon Cognito. This custom token allows you to insert custom scopes based on the Amazon Cognito user groups. These custom scopes are then used to customize the authorization requirements for the individual routes.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/cognito

Video: https://youtu.be/nBtWCjKd72M

Session three: Building a translation app with Amazon EventBridge

I covered using AWS SAM to build a basic translation and sentiment app centered around Amazon EventBridge. The SAM template created three Lambda functions, a custom EventBridge bus, and an HTTP API endpoint.

Architecture for serverless translation application

Requests from HTTP API endpoint are put into the custom EventBridge bus via the endpoint Lambda function. Based on the type of request, either the translate function or the sentiment function is invoked. The AWS SAM template manages all the infrastructure in addition to the permissions to invoke the Lambda functions and access Amazon Translate and Amazon Comprehend.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/eventbridge

Video: https://youtu.be/73R02KufLac

Session four: Building an Amazon Kinesis Data Firehose for ingesting website access logs

In this session, I covered building an Amazon Kinesis Data Firehose for ingesting large amounts of data. This particular application is designed for access logs generated from API Gateway. The logs are first stored to an Amazon DynamoDB database for immediate processing. Next, the logs are sent through a Kinesis Data Firehose and stored in an Amazon S3 bucket for later processing.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/kinesis-firehose

Video: https://youtu.be/jdTBtaxs0hA

Session five: Analyzing API Gateway logs with Amazon Kinesis Data Analytics

Continuing from session 4, I discussed configuring API Gateway access logs to use the Kinesis Data Firehose built in the previous session. I also demonstrate an Amazon Kinesis data analytics application for near-real-time analytics of your access logs.

Example of Kinesis Data Analytics in SAM

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/kinesis-firehose

Video: https://youtu.be/ce0v-q9EVTQ

Session six: Managing Amazon SQS with AWS SAM templates

I demonstrated configuring an Amazon Simple Queue Service (SQS) queue and the queue policy to control access to the queue. We also discuss allowing cross-account and external resources to access the queue. I show how to identify the proper principal resources for building the proper AWS IAM policy templates.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/SQS

Video: https://youtu.be/q2rbHMyJBDY

Session seven: Creating canary deploys for Lambda functions

In this session, I cover canary and linear deployments for serverless applications. We discuss how canary releases compare to linear releases and how they can be customized. We also spend time discussing pre-traffic and post-traffic tests and how rollbacks are handled when one of these tests fails.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/safe-deploy

Video: https://youtu.be/RE4r_6edaXc

Session eight: Configuring custom domains for Amazon API Gateway endpoints

In session eight I configured custom domains for API Gateway REST and HTTP APIs. The demonstration included the option to pass in an Amazon Route 53 zone ID or AWS Certificate Manager (ACM) certificate ARN. If either of these are missing, then the template built a zone or SSL cert respectively.

Working with Amazon Route 53 zones

We discussed how to use declarative and imperative methods in our templates. We also discussed how to use a single domain across multiple APIs, regardless of whether they are REST or HTTP APIs.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/custom-domains

Video: https://youtu.be/4uXEGNKU5NI

Session nine: Managing AWS Step Functions with AWS SAM

In this session I was joined by fellow Senior Developer Advocate, Rob Sutter. Rob and I demonstrated managing and deploying AWS Step Functions using the new Step Functions support built into SAM. We discussed how SAM offers definition substitutions to pass data from the template into the state machine configuration.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/step-functions

Video: https://youtu.be/BguUgdZwymQ

Session ten: Using Amazon EFS with Lambda functions in SAM

Joined by Senior Developer Advocate, James Beswick, we covered configuring Amazon Elastic File System (EFS) as a storage option for Lambda functions using AWS SAM. We discussed the Amazon VPC requirements in configuring for EFS. James also walked through using the AWS Command Line Interface (CLI) to aid in configuration of the VPC.

Code: https://github.com/aws-samples/aws-lambda-efs-samples

Video: https://youtu.be/up1op216trk

Session eleven: Ask the experts

This session introduced you to some of our SAM experts. Jeff Griffiths, Senior Product Manager, and Alex Woods, Software Development Engineer, joined me in answering live audience questions. We discussed best practices for local development and debugging, Docker networking, CORS configurations, roadmap features and more.

SAM experts panel

Video: https://youtu.be/2JRa8MugPCY

Session twelve: Managing .Net Lambda function in AWS SAM and Stackery

In this final session of the season, I was joined by Stackery CTO and serverless hero, Chase Douglas. Chase demonstrated using Stackery and AWS SAM to build and deploy .Net Core Lambda functions. We discuss how Stackery’s editor allows developers to visually design a serverless application and how it uses SAM templates under the hood.

Stackery visual editor

Code only examples

In addition to code examples with each video session, the repo includes developer-requested code examples. In this section, I demonstrate how to build an access log pipeline for HTTP API or use the SAM build command to compile Swift for Lambda functions.

Conclusion

Sessions with SAM helps developers bootstrap their serverless applications with instructional video and ready-made IaC templates. From JWT authorizers to EFS storage solutions, over 15 AWS services are represented in SAM templates. The first season of live videos supplements these templates with best practices explained and real developer questions answered.

Season two of Sessions with SAM starts August 13. The series will continue the pattern of explaining best practices, providing usable starter templates, and having some fun along the way.

#ServerlessForEveryone

 

Application Integration Using Queues and Messages

Post Syndicated from Mithun Mallick original https://aws.amazon.com/blogs/architecture/application-integration-using-queues-and-messages/

In previous blog posts in this messaging series, we provided an overview of messaging and we also explained the common characteristics to consider when evaluating messaging channel technologies. In this post, we will explain some of the semantics of queue-based processing, its use in designing flexible systems, and how to apply it to your use cases. AWS offers two queue-based services: Amazon Simple Queue Service (SQS) and Amazon MQ. We will focus on SQS in this blog.

Building Blocks

In the digital world, even the most basic design of web based systems requires the use of queues to integrate applications. SQS is a secure, serverless, durable, and highly available message queue service. It provides a simple REST API to create a queue as well as send, receive, and delete messages.

Producing Messages

Message producers are processes that call the SendMessage APIs of SQS. SQS supports two types of queues: standard and first-in-first-out (FIFO). Standard queues provide best effort ordering while FIFO provides first-in-first-out ordering. Messages can be sent either as single messages or in batches. Standard queues can support unlimited throughput by adding as many concurrent producers as needed, whereas FIFO queues can support up to 300 TPS without batching and 3000 TPS with batching.

Figure: Producing messages

In terms of message delivery, SQS standard supports “at-least-once” delivery, meaning that messages will be delivered at least once but occasionally, more than one copy of the message will be delivered. SQS FIFO provides “exactly once” delivery, meaning it can detect duplicate messages.

Consuming Messages

Message consumers are processes that make the ReceiveMessage API call on SQS. Messages from queues can be processed either in batch or as a single message at a time. Each approach has its advantages and disadvantages.

  • Batch processing: This is where each message can be processed independently, and an error on a single message is not required to disrupt the entire batch. Batch processing provides the most throughput for processing, and also provides the most optimizations for resources involved in reading messages.
  • Single message processing: Single message processing is commonly used in scenarios where each message may trigger multiple processes within the consumer. In case of errors, the retry is confined to the single message.

To maintain the order of processing, FIFO queues are typically consumed by a single process. Messages across message groups can still be processed in parallel. However, the overall throughput limit will still apply for FIFO queues.

SQS supports long and short polling on queues. Short polling will sample a subset of servers and will return the messages or provide an empty response if there are no messages in those servers. Long polling will query all servers and return immediately if there are messages. Long polling can reduce cost by reducing the number of calls in cases of empty responses. Get more details and sample code for sending and receiving messages.

It’s simple to poll for messages but it does have an overhead of running a process continuously. In some situations it may be hard to monitor such processes and troubleshoot in case of errors. SQS integration with AWS Lambda takes the undifferentiated heavy lifting of polling logic into a Lambda agent.

Figure: Consuming messages

Get more details for SQS as an event source for Lambda in this blog post: AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. This is supported for both standard as well as FIFO queues.

Benefits

Queue-based processing can protect backend systems like relational databases from unexpected surge in front-end traffic. You can decouple the processing logic by using an intermediate queue.

Consider a scenario that involves a web application to order products related to a TV show. It’s possible that the ordering and payment systems rely on a traditional relational database. During peak show seasons, queues can be used to control the traffic to the ordering and payment system. In the following diagram we show how the web application requests are staged in the queue, while the consumption of the users on the backend is based on the capacity of the database.

Figure: Benefits of queue-based processing

Error Handling

Asynchronous message processing presents unique challenges for error handling, since errors just manifest as log entries on the producer or consumer and create backlogs in processing. To handle errors elegantly, the first requirement is to address the nature of the error. If the error is transient, it may help to retry after a brief delay and eventually move the message to a dead letter queue if repeated attempts fail. The other error scenario is where the data itself is bad and the error won’t go away, even after repeated attempts. In such cases, the consumer process needs to make this determination and move the message to an error queue.

In some cases, due to its highly distributed architecture, Standard SQS can deliver a duplicate message, which can result in duplicate key errors for the consumer. The best way to mitigate this is to make the consumers idempotent so that processing the same message multiple times produces the same result.
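The error-handling rules above, as an added example that is not code from the original post: an in-memory consumer loop that retries transient errors, sends bad data straight to an error queue, moves messages to a dead letter queue once retries are exhausted, and stays idempotent under duplicate delivery. The in-memory queue, `MAX_RECEIVES`, `OrderStore` and the exception names are assumptions made for the sketch; no AWS calls are made.

```python
import json
from collections import deque

MAX_RECEIVES = 3  # assumed threshold, comparable to maxReceiveCount in a redrive policy


class TransientError(Exception):
    """Downstream dependency temporarily unavailable; retrying may help."""


class BadDataError(Exception):
    """Message body can never be processed; retrying will not help."""


class OrderStore:
    """Hypothetical stand-in for the relational database behind the consumer."""

    def __init__(self, fail_first_n=0):
        self.orders = {}
        self._failures_left = fail_first_n

    def insert(self, order_id, amount):
        if self._failures_left > 0:
            self._failures_left -= 1
            raise TransientError("database throttled")
        # Idempotent write: a duplicate delivery of the same order is a no-op
        # instead of a duplicate-key error.
        if order_id in self.orders:
            return False
        self.orders[order_id] = amount
        return True


def handle(body, store):
    """Parse one message and apply it; classify failures by their nature."""
    try:
        order = json.loads(body)
        order_id, amount = order["order_id"], int(order["amount"])
    except (ValueError, KeyError, TypeError) as exc:
        raise BadDataError(str(exc)) from exc
    if amount <= 0:
        raise BadDataError("amount must be positive")
    return store.insert(order_id, amount)


def drain(messages, store):
    """Process every message; return (dead_letter_queue, error_queue)."""
    queue = deque({"body": m, "receives": 0} for m in messages)
    dlq, error_queue = [], []
    while queue:
        msg = queue.popleft()
        msg["receives"] += 1
        try:
            handle(msg["body"], store)
        except BadDataError:
            error_queue.append(msg["body"])   # bad data: never retried
        except TransientError:
            if msg["receives"] >= MAX_RECEIVES:
                dlq.append(msg["body"])       # repeated attempts failed
            else:
                queue.append(msg)             # delivered again later
    return dlq, error_queue


# Constructed sample input: one duplicate delivery and two bad messages.
SAMPLE = [
    '{"order_id": "A1", "amount": 20}',
    '{"order_id": "A1", "amount": 20}',   # duplicate from at-least-once delivery
    '{"order_id": "B2"}',                 # missing field
    'not json',
    '{"order_id": "C3", "amount": 5}',
]

if __name__ == "__main__":
    store = OrderStore(fail_first_n=2)
    dead, errors = drain(SAMPLE, store)
    print("stored:", store.orders)
    print("dead letter queue:", dead)
    print("error queue:", errors)
```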

Conclusion

In this blog, we explained the importance of messaging in building distributed applications, various aspects of queue-based processing using SQS like sending and receiving messages, FIFO versus Standard SQS, and common error handling scenarios. We also covered using SQS as an event source for AWS Lambda. Please refer to the following blog posts for using messaging in different integration patterns:

ICYMI: Serverless pre:Invent 2019

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-serverless-preinvent-2019/

With Contributions from Chris Munns – Sr Manager – Developer Advocacy – AWS Serverless

The last two weeks have been a frenzy of AWS service and feature launches, building up to AWS re:Invent 2019. As there has been a lot announced we thought we’d ship an ICYMI post summarizing the serverless service specific features that have been announced. We’ve also dropped in some service announcements from services that are commonly used in serverless application architectures or development.

AWS re:Invent

AWS re:Invent 2019

We also want you to know that we’ll be talking about many of these features (as well as those coming) in sessions at re:Invent.

Here’s what’s new!

AWS Lambda

On September 3, AWS Lambda started rolling out a major improvement to how AWS Lambda functions work with your Amazon VPC networks. This change brings both scale and performance improvements, and addresses several of the limitations of the previous networking model with VPCs.

On November 25, Lambda announced that the rollout of this new capability has completed in 6 additional regions including US East (Virginia) and US West (Oregon).

New VPC to VPC NAT for Lambda functions


On November 18, Lambda announced three new runtime updates. Lambda now supports Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new language features and benefits so be sure to check out the linked release posts. These new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda has released a number of controls for both stream and async based invocations:

  • For Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams, it’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities will help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.

In addition to the above controls, Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set one destination for a success, and another for a failure. This unlocks really useful patterns for distributed event-based applications and can reduce code to send function results to a destination manually.

Lambda Destinations


Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Amazon Kinesis Data Streams and Amazon DynamoDB Streams. This allows for faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Lambda Parallelization Factor diagram


Lambda now supports Amazon SQS FIFO queues as an event source. FIFO queues guarantee the order of record processing compared to standard queues which are unordered. FIFO queues support messaging batching via a MessageGroupID attribute which allows for parallel Lambda consumers of a single FIFO queue. This allows for high throughput of record processing by Lambda.

Lambda now supports Environment Variables in AWS China (Beijing) Region, operated by Sinnet and the AWS China (Ningxia) Region, operated by NWCD.

Lastly, you can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics tell you the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, spot outliers, and find hard-to-spot situations that create a poor customer experience for a subset of your users.

AWS SAM CLI

AWS SAM CLI deploy command


The SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One of the powerful features of Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. Step Functions has expanded its integration with Amazon SageMaker to simplify machine learning workflows, and added a new integration with Amazon EMR, making it faster to build and easier to monitor EMR big data processing workflows.

Step Functions step with EMR


Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor and react to usage and spending trends on your AWS accounts.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

AWS Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates.

In US East (N. Virginia), US West (Oregon), and EU (Ireland), throughput has increased from 1,000 state transitions per second to 1,500 state transitions per second with bucket capacity of 5,000 state transitions. The default start rate for state machine executions has also increased from 200 per second to 300 per second, with bucket capacity of up to 1,300 starts in these regions.

In all other regions, throughput has increased from 400 state transitions per second to 500 state transitions per second with bucket capacity of 800 state transitions. The default start rate for AWS Step Functions state machine executions has also increased from 25 per second to 150 per second, with bucket capacity of up to 800 state machine executions.

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed and re-submit them or analyze to locate processing issues.

Amazon CloudWatch

CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

CloudWatch ServiceLens


CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

On November 18, CloudWatch launched Embedded Metric Format to help you ingest complex high-cardinality application data in the form of logs and easily generate actionable metrics from them. You can publish these metrics from your Lambda function by using the PutLogEvents API or for Node.js or Python based applications using an open source library.

Lastly, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

X-Ray announced trace maps, which enable you to map the end to end path of a single request. Identifiers will show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. X-Ray supports tracing canary scripts throughout the application providing metrics on performance or application issues.

X-Ray Service map with CloudWatch Synthetics


Amazon DynamoDB

DynamoDB announced support for customer managed customer master keys (CMKs) to encrypt data in DynamoDB. This lets you bring your own key (BYOK), giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Currently under preview, is another new DynamoDB capability to identify frequently accessed keys and database traffic trends. With this you can now more easily identify “hot keys” and understand usage of your DynamoDB tables.

CloudWatch Contributor Insights for DynamoDB


Last but far from least for DynamoDB is adaptive capacity, a feature which helps you handle imbalanced workloads by isolating frequently accessed items automatically and shifting data across partitions to rebalance them. This helps reduce cost by enabling you to provision throughput for a more balanced workload instead of over-provisioning for uneven data access patterns.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge will appear next to your name in the SAR and will deep-link to your GitHub profile.

SAR Verified developer badges


AWS Code Services

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has passed through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

CodeBuild test trends


AWS Amplify and AWS AppSync

Instead of trying to summarize all the awesome things that our peers over in the Amplify and AppSync teams have done recently we’ll instead link you to their own recent summary: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

AWS AppSync


Still looking for more?

We only covered a small bit of all the awesome new things that were recently announced. Keep your eyes peeled for more exciting announcements next week during re:Invent and for a future ICYMI Serverless Q4 roundup. We’ll also be kicking off a fresh series of Tech Talks in 2020 with new content helping to dive deeper on everything new coming out of AWS for serverless application developers.

Decoupled Serverless Scheduler To Run HPC Applications At Scale on EC2

Post Syndicated from Emma White original https://aws.amazon.com/blogs/compute/decoupled-serverless-scheduler-to-run-hpc-applications-at-scale-on-ec2/

This post is written by Ludvig Nordstrom and Mark Duffield | on November 27, 2019

In this blog post, we dive in to a cloud native approach for running HPC applications at scale on EC2 Spot Instances, using a decoupled serverless scheduler. This architecture is ideal for many workloads in the HPC and EDA industries, and can be used for any batch job workload.

At the end of this blog post, you will have two takeaways.

  1. A highly scalable environment that can run on hundreds of thousands of cores across EC2 Spot Instances.
  2. A fully serverless architecture for job orchestration.

We discuss deploying and running a pre-built serverless job scheduler that can run both Windows and Linux applications using any executable file format for your application. This environment provides high performance, scalability, cost efficiency, and fault tolerance. We introduce best practices and benefits to creating this environment, and cover the architecture, running jobs, and integration in to existing environments.

A quick note about the term cloud native: we use the term loosely in this blog. Here, cloud native means we use AWS Services (including serverless and microservices) to build out our compute environment, instead of a traditional lift-and-shift method.

Let’s get started!

 

Solution overview

This blog goes over the deployment process, which leverages AWS CloudFormation. This allows you to use infrastructure as code to automatically build out your environment. There are two parts to the solution: the Serverless Scheduler and Resource Automation. Below are quick summaries of each part of the solutions.

Part 1 – The serverless scheduler

This first part of the blog builds out a serverless workflow to get jobs from SQS and run them across EC2 instances. The CloudFormation template being used for Part 1 is serverless-scheduler-app.template, and here is the Reference Architecture:

 


    Figure 1: Serverless Scheduler Reference Architecture (grayed-out area is covered in Part 2).

Read the GitHub Repo if you want to look at the Step Functions workflow contained in preceding images. The walkthrough explains how the serverless application retrieves and runs jobs on its worker, updates DynamoDB job monitoring table, and manages the worker for its lifetime.

 

Part 2 – Resource automation with serverless scheduler


This part of the solution relies on the serverless scheduler built in Part 1 to run jobs on EC2.  Part 2 simplifies submitting and monitoring jobs, and retrieving results for users. Jobs are spread across our cost-optimized Spot Instances. AWS Autoscaling automatically scales up the compute resources when jobs are submitted, then terminates them when jobs are finished. Both of these save you money.

The CloudFormation template used in Part 2 is resource-automation.template. Building on Figure 1, the additional resources launched with Part 2 are noted in the following image: an S3 Bucket, an AWS Autoscaling Group, and two Lambda functions.


 

Figure 2: Resource Automation using Serverless Scheduler

                               

Introduction to decoupled serverless scheduling

HPC schedulers traditionally run in a classic master and worker node configuration. A scheduler on the master node orchestrates jobs on worker nodes. This design has been successful for decades, however many powerful schedulers are evolving to meet the demands of HPC workloads. This scheduler design evolved from a necessity to run orchestration logic on one machine, but there are now options to decouple this logic.

What are the possible benefits that decoupling this logic could bring? First, we avoid a number of shortfalls in the environment such as the need for all worker nodes to communicate with a single master node. This single source of communication limits scalability and creates a single point of failure. When we split the scheduler into decoupled components both these issues disappear.

Second, in an effort to work around these pain points, traditional schedulers had to create extremely complex logic to manage all workers concurrently in a single application. This stifled the ability to customize and improve the code – restricting changes to be made by the software provider’s engineering teams.

Serverless services, such as AWS Step Functions and AWS Lambda fix these major issues. They allow you to decouple the scheduling logic to have a one-to-one mapping with each worker, and instead share an Amazon Simple Queue Service (SQS) job queue. We define our scheduling workflow in AWS Step Functions. Then the workflow scales out to potentially thousands of “state machines.” These state machines act as wrappers around each worker node and manage each worker node individually.  Our code is less complex because we only consider one worker and its job.

We illustrate the differences between a traditional shared scheduler and decoupled serverless scheduler in Figures 3 and 4.

 


Figure 3: Traditional Scheduler Model

 


Figure 4: Decoupled Serverless Scheduler on each instance

 

Each decoupled serverless scheduler will:

  • Retrieve and pass jobs to its worker
  • Monitor its worker’s health and take action if needed
  • Confirm job success by checking output logs and retry jobs if needed
  • Terminate the worker when job queue is empty just before also terminating itself

With this new scheduler model, there are many benefits. Decoupling schedulers into smaller schedulers increases fault tolerance because any issue only affects one worker. Additionally, each scheduler consists of independent AWS Lambda functions, which maintains the state on separate hardware and builds retry logic into the service.  Scalability also increases, because jobs are not dependent on a master node, which enables the geographic distribution of jobs. This geographic distribution allows you to optimize use of low-cost Spot Instances. Also, when decoupling the scheduler, workflow complexity decreases and you can customize scheduler logic. You can leverage lower latency job monitoring and customize automated responses to job events as they happen.

 

Benefits

  • Fully managed –  With Part 2, Resource Automation deployed, resources for a job are managed. When a job is submitted, resources launch and run the job. When the job is done, worker nodes automatically shut down. This prevents you from incurring continuous costs.

 

  • Performance – Your application runs on EC2, which means you can choose any of the high performance instance types. Input files are automatically copied from Amazon S3 into local Amazon EC2 Instance Store for high performance storage during execution. Result files are automatically moved to S3 after each job finishes.

 

  • Scalability – A worker node combined with a scheduler state machine become a stateless entity. You can spin up as many of these entities as you want, and point them to an SQS queue. You can even distribute worker and state machine pairs across multiple AWS regions. These two components paired with fully managed services optimize your architecture for scalability to meet your desired number of workers.

 

  • Fault Tolerance – The solution is completely decoupled, which means each worker has its own state machine that handles scheduling for that worker. Likewise, each state machine is decoupled into Lambda functions that make up your state machine. Additionally, the scheduler workflow includes a Lambda function that confirms each successful job or resubmits jobs.

 

  • Cost Efficiency – This fault tolerant environment is perfect for EC2 Spot Instances. This means you can save up to 90% on your workloads compared to On-Demand Instance pricing. The scheduler workflow ensures little to no idle time of workers by closely monitoring and sending new jobs as jobs finish. Because the scheduler is serverless, you only incur costs for the resources required to launch and run jobs. Once the job is complete, all are terminated automatically.

 

  • Agility – You can use AWS fully managed Developer Tools to quickly release changes and customize workflows. The reduced complexity of a decoupled scheduling workflow means that you don’t have to spend time managing a scheduling environment, and can instead focus on your applications.

 

 

Part 1 – serverless scheduler as a standalone solution

 

If you use the serverless scheduler as a standalone solution, you can build clusters and leverage shared storage such as FSx for Lustre, EFS, or S3. Additionally, you can use AWS CloudFormation or to deploy more complex compute architectures that suit your application. So, the EC2 Instances that run the serverless scheduler can be launched in any number of ways. The scheduler only requires the instance id and the SQS job queue name.

 

Submitting Jobs Directly to serverless scheduler

The serverless scheduler app is a fully built AWS Step Functions workflow that pulls jobs from an SQS queue and runs them on an EC2 Instance. The jobs submitted to SQS consist of an AWS Systems Manager Run Command, and work with any SSM Document and command that you choose for your jobs. Examples of SSM Run Commands are ShellScript and PowerShell. Feel free to read more about Running Commands Using Systems Manager Run Command.

The following code shows the format of a job submitted to SQS in JSON.

  {
    "job_id": "jobId_0",
    "retry": "3",
    "job_success_string": " ",
    "ssm_document": "AWS-RunPowerShellScript",
    "commands":
        [
            "cd C:\\ProgramData\\Amazon\\SSM; mkdir Result",
            "Copy-S3object -Bucket my-bucket -KeyPrefix jobs/date/jobId_0 -LocalFolder .\\",
            "C:\\ProgramData\\Amazon\\SSM\\jobId_0.bat",
            "Write-S3object -Bucket my-bucket -KeyPrefix jobs/date/jobId_0 -Folder .\\Result\\"
        ]
  }

 

Any EC2 Instance associated with a serverless scheduler receives jobs picked up from a designated SQS queue until the queue is empty. Then, the EC2 resource automatically terminates. If the job fails, it retries until it reaches the number of retries specified in the job definition. You can include a specific string value that the scheduler searches for in the job execution output to confirm the successful completion of jobs.

 

Tagging EC2 workers to get a serverless scheduler state machine

In Part 1 of the deployment, you must manage your EC2 Instance launch and termination. When launching an EC2 Instance, tag it with a specific tag key that triggers a state machine to manage that instance. The tag value is the name of the SQS queue that you want your state machine to poll jobs from.

In the following example, “my-scheduler-cloudformation-stack-name” is the tag key that the serverless scheduler app looks for on any new EC2 instance that starts. Next, “my-sqs-job-queue-name” is the default job queue created with the scheduler. You can change this to any queue name you want to retrieve jobs from when an instance is launched.

{"my-scheduler-cloudformation-stack-name":"my-sqs-job-queue-name"}

 

Monitor jobs in DynamoDB

You can monitor job status in the following DynamoDB table. In the table you can find job_id, commands sent to Amazon EC2, job status, job output logs from Amazon EC2, and retries, among other things.

Alternatively, you can query DynamoDB for a given job_id via the AWS Command Line Interface:

aws dynamodb get-item --table-name job-monitoring \
                      --key '{"job_id": {"S": "/my-jobs/my-job-id.bat"}}'

 

Using the “job_success_string” parameter

For the prior DynamoDB table, we submitted two identical jobs using an example script that you can also use. The command sent to the instance is “echo Hello World.” The output from this job should be “Hello World.” We also specified three allowed job retries.  In the following image, there are two jobs in SQS queue before they ran.  Look closely at the different “job_success_strings” for each and the identical command sent to both:

Figure: Example DynamoDB CLI output with job information.

From the image we see that Job2 was successful and Job1 retried three times before being permanently labelled as failed. We forced this outcome to demonstrate how the job success string works by submitting Job1 with “job_success_string” as “Hello EVERYONE”, as that will not be in the job output “Hello World.” In Job2 we set “job_success_string” as “Hello” because we knew this string would be in the output log.

Job outputs commonly have text that only appears if job succeeded. You can also add this text yourself in your executable file. With “job_success_string,” you can confirm a job’s successful output, and use it to identify a certain value that you are looking for across jobs.
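The confirm-or-retry behaviour described above, as an added example that is not the scheduler project's own code: it validates a job message in the JSON format shown earlier and replays the Job1/Job2 experiment against a stand-in runner. The document allowlist, the `Success`/`Failed` labels, the treatment of a blank success string as "no output check", and all function names are assumptions.

```python
import json

# Assumed policy: SSM documents this scheduler is willing to dispatch.
ALLOWED_DOCUMENTS = {"AWS-RunShellScript", "AWS-RunPowerShellScript"}
REQUIRED_FIELDS = ("job_id", "retry", "job_success_string", "ssm_document", "commands")


def parse_job(message_body):
    """Validate an SQS message body against the job format shown above."""
    job = json.loads(message_body)
    if not isinstance(job, dict):
        raise ValueError("job must be a JSON object")
    missing = [f for f in REQUIRED_FIELDS if f not in job]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    if job["ssm_document"] not in ALLOWED_DOCUMENTS:
        raise ValueError(f"ssm_document not allowed: {job['ssm_document']!r}")
    if not isinstance(job["job_success_string"], str):
        raise ValueError("job_success_string must be a string")
    commands = job["commands"]
    if (not isinstance(commands, list) or not commands
            or not all(isinstance(c, str) and c.strip() for c in commands)):
        raise ValueError("commands must be a non-empty list of strings")
    try:
        job["retry"] = int(job["retry"])  # the format carries it as a string
    except (TypeError, ValueError) as exc:
        raise ValueError("retry must be an integer") from exc
    if job["retry"] < 0:
        raise ValueError("retry must be >= 0")
    return job


def job_succeeded(job, output):
    """Confirm success by searching the job output for the success string."""
    marker = job["job_success_string"]
    if not marker.strip():
        return True  # assumption: a blank marker disables the output check
    return marker in output


def run_with_retries(job, run_command):
    """Run a job until its output is confirmed or retries are used up.

    run_command(job) -> output text; it stands in for SSM Run Command.
    Returns (status, retries_used).
    """
    retries = 0
    while True:
        output = run_command(job)
        if job_succeeded(job, output):
            return "Success", retries
        if retries >= job["retry"]:
            return "Failed", retries
        retries += 1


def sample_job(job_id, success_string):
    """Constructed sample message mirroring the Job1/Job2 experiment."""
    return json.dumps({
        "job_id": job_id,
        "retry": "3",
        "job_success_string": success_string,
        "ssm_document": "AWS-RunShellScript",
        "commands": ["echo Hello World"],
    })


def echo_runner(job):
    """Stand-in worker: every attempt prints the same output."""
    return "Hello World\n"


if __name__ == "__main__":
    for body in (sample_job("Job1", "Hello EVERYONE"), sample_job("Job2", "Hello")):
        job = parse_job(body)
        print(job["job_id"], run_with_retries(job, echo_runner))
```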

 

Part 2 – Resource Automation with the serverless scheduler

The additional services we deploy in Part 2 integrate with existing architectures to launch resources for your serverless scheduler. These services allow you to submit jobs simply by uploading input files and executable files to an S3 bucket.

Likewise, these additional resources can use any executable file format you want, including proprietary application level scripts. The solution automates everything else. This includes creating and submitting jobs to SQS job queue, spinning up compute resources when new jobs come in, and taking them back down when there are no jobs to run. When jobs are done, result files are copied to S3 for the user to retrieve. Similar to Part 1, you can still view the DynamoDB table for job status.

This architecture makes it easy to scale out to different teams and departments, and you can submit potentially hundreds of thousands of jobs while you remain in control of resources and cost.

 

Deeper Look at the S3 Architecture

The following diagram shows how you can submit jobs, monitor progress, and retrieve results. To submit jobs, upload all the needed input files and an executable script to S3. The suffix of the executable file (uploaded last) triggers an S3 event to start the process, and this suffix is configurable.

The S3 key of the executable file acts as the job id, and is kept as a reference to that job in DynamoDB. The Lambda (#2 in diagram below) uses the S3 key of the executable to create three SSM Run Commands.

  1. Synchronize all files in the same S3 folder to a working directory on the EC2 Instance.
  2. Run the executable file on EC2 Instances within a specified working directory.
  3. Synchronize the EC2 Instance’s working directory back to the S3 bucket where newly generated result files are included.

This Lambda (#2) then places the job on the SQS queue using the scheduler’s JSON-formatted job definition seen above.

IMPORTANT: Each set of job files should be given a unique job folder in S3 or more files than needed might be moved to the EC2 Instance.
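One way to turn an executable's S3 key into the three commands and the job definition described above, as an added example that is not the project's Lambda code. It assumes a Linux worker using `AWS-RunShellScript` and `aws s3 sync`, a `.sh` trigger suffix, and a `/tmp/job` working directory; it also treats the S3 key as untrusted input and shell-quotes it before it reaches a command line, and it flags job folders that break the unique-folder rule.

```python
import json
import posixpath
import shlex
from collections import Counter

EXECUTABLE_SUFFIX = ".sh"   # assumed value of the configurable trigger suffix
WORKDIR = "/tmp/job"        # assumed working directory on the worker


def build_job(bucket, key, retry=3, success_string=""):
    """Turn the S3 key of an uploaded executable into a scheduler job."""
    if not key.endswith(EXECUTABLE_SUFFIX):
        raise ValueError("not an executable upload")
    folder, filename = posixpath.split(key)
    if not folder.strip("/"):
        # Without a job folder the sync step would copy the whole bucket.
        raise ValueError("executable must live in its own job folder")
    # Object keys are chosen by whoever uploads the file, so every value
    # placed on a command line is quoted for the shell.
    s3_uri = shlex.quote(f"s3://{bucket}/{folder}/")
    workdir = shlex.quote(WORKDIR)
    script = shlex.quote(posixpath.join(WORKDIR, filename))
    return {
        "job_id": key,   # the S3 key of the executable acts as the job id
        "retry": str(retry),
        "job_success_string": success_string,
        "ssm_document": "AWS-RunShellScript",
        "commands": [
            # 1. Synchronize the job folder to the working directory.
            f"mkdir -p {workdir} && aws s3 sync {s3_uri} {workdir}",
            # 2. Run the executable inside the working directory.
            f"cd {workdir} && sh {script}",
            # 3. Synchronize results back to the same S3 folder.
            f"aws s3 sync {workdir} {s3_uri}",
        ],
    }


def folders_with_multiple_jobs(keys):
    """Job folders holding more than one executable (breaks the rule above)."""
    counts = Counter(
        posixpath.dirname(k) for k in keys if k.endswith(EXECUTABLE_SUFFIX)
    )
    return sorted(folder for folder, n in counts.items() if n > 1)


# Constructed sample keys: jobA and jobB are fine, "shared" holds two jobs.
SAMPLE_KEYS = [
    "jobs/2019/jobA/input.dat",
    "jobs/2019/jobA/run.sh",
    "jobs/2019/jobB/run.sh",
    "jobs/2019/shared/first.sh",
    "jobs/2019/shared/second.sh",
]

if __name__ == "__main__":
    print(json.dumps(build_job("my-bucket", "jobs/2019/jobA/run.sh"), indent=2))
    print("folders to split up:", folders_with_multiple_jobs(SAMPLE_KEYS))
```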

 


Figure 5: Resource Automation using Serverless Scheduler – A deeper look

 

EC2 and Step Functions workflow use the Lambda function (#3 in prior diagram) and the Auto Scaling group to scale out based on the number of jobs in the queue to a maximum number of workers (plus state machine), as defined in the Auto Scaling Group. When the job queue is empty, the number of running instances scale down to 0 as they finish their remaining jobs.

 

Process Submitting Jobs and Retrieving Results

  1. Seen in step 1, upload input file(s) and an executable file into a unique job folder in S3 (such as /year/month/day/jobid/~job-files). Upload the executable file last because it automatically starts the job. You can also use a script to upload multiple files at a time, but each job will need a unique directory. There are many ways to make S3 buckets available to users, including AWS Storage Gateway, AWS Transfer for SFTP, AWS DataSync, the AWS Console, or any one of the AWS SDKs leveraging S3 API calls.
  2. You can monitor job status by accessing the DynamoDB table directly via the AWS Management Console or use the AWS CLI to call DynamoDB via an API call.
  3. Seen in step 5, you can retrieve result files for jobs from the same S3 directory where you left the input files. The DynamoDB table confirms when jobs are done. The SQS output queue can be used by applications that must automatically poll and retrieve results.

You no longer need to create or access compute nodes as compute resources. These automatically scale up from zero when jobs come in, and then back down to zero when jobs are finished.

 

Deployment

Read the GitHub Repo for deployment instructions. Below are CloudFormation templates to help:

AWS Region / Launch Stack
eu-north-1
ap-south-1
eu-west-3
eu-west-2
eu-west-1
ap-northeast-3
ap-northeast-2
ap-northeast-1
sa-east-1
ca-central-1
ap-southeast-1
ap-southeast-2
eu-central-1
us-east-1
us-east-2
us-west-1
us-west-2

 

 

Additional Points on Usage Patterns

 

  • While the two solutions in this blog are aimed at HPC applications, they can be used to run any batch jobs. Many customers that run large data processing batch jobs in their data lakes could use the serverless scheduler.

 

  • You can build pipelines of different applications when the output of one job triggers another to do something else – an example being pre-processing, meshing, simulation, post-processing. You simply deploy the Resource Automation template several times, and tailor it so that the output bucket for one step is the input bucket for the next step.

 

  • You might look to use the “job_success_string” parameter for iteration/verification in cases where a shot-gun approach is needed to run thousands of jobs, and only one has a chance of producing the right result. In this case the “job_success_string” would identify the successful job from potentially hundreds of thousands pushed to the SQS job queue.

 

Scale-out across teams and departments

Because all services used are serverless, you can deploy as many run environments as needed without increasing overall costs. Serverless workloads only accumulate cost when the services are used. So, you could deploy ten job environments and run one job in each, and your costs would be the same if you had one job environment running ten jobs.

 

All you need is an S3 bucket to upload jobs to and an associated AMI that has the right applications and license configuration. Because a job configuration is passed to the scheduler at each job start, you can add new teams by creating an S3 bucket and pointing S3 events to a default Lambda function that pulls configurations for each job start.

 

Set up a CI/CD pipeline to start continuous improvement of the scheduler

If you are advanced, we encourage you to clone the git repo and customize this solution. The serverless scheduler is less complex than other schedulers, because you only think about one worker and the process of one job’s run.

Ways you could tailor this solution:

  • Add intelligent job scheduling using AWS Sagemaker  – It is hard to find data as ready for ML as log data because every job you run has different run times and resource consumption. So, you could tailor this solution to predict the best instance to use with ML when workloads are submitted.
  • Add Custom Licensing Checkout Logic – Simply add one Lambda function to your Step Functions workflow to make an API call to a license server before continuing with one or more jobs. You can start a new worker when you have a license checked out, or, if a license is not available, the instance can terminate to remove any costs of waiting for licenses.
  • Add Custom Metrics to DynamoDB – You can easily add metrics to DynamoDB because the solution already has baseline logging and monitoring capabilities.
  • Run on other AWS Services – There is a Lambda function in the Step Functions workflow called “Start_Job”. You can tailor this Lambda to run your jobs on AWS Sagemaker, AWS EMR, AWS EKS or AWS ECS instead of EC2.

 

Conclusion

 

Although HPC workloads and EDA flows may still be dependent on current scheduling technologies, we illustrated the possibilities of decoupling your workloads from your existing shared scheduling environments. This post went deep into decoupled serverless scheduling, and we understand that it is difficult to unwind decades of dependencies. However, leveraging numerous AWS Services encourages you to think completely differently about running workloads.

But more importantly, it encourages you to Think Big. With this solution you can get up and running quickly, fail fast, and iterate. You can do this while scaling to your required number of resources, when you want them, and only pay for what you use.

Serverless computing  catalyzes change across all industries, but that change is not obvious in the HPC and EDA industries. This solution is an opportunity for customers to take advantage of the nearly limitless capacity that AWS.

Please reach out with questions about HPC and EDA on AWS. You now have the architecture and the instructions to build your Serverless Decoupled Scheduling environment.  Go build!


About the Authors and Contributors

Authors 

 

Ludvig Nordstrom is a Senior Solutions Architect at AWS

 

 

 

 

Mark Duffield is a Tech Lead in Semiconductors at AWS

 

 

 

Contributors

 

Steve Engledow is a Senior Solutions Builder at AWS

 

 

 

 

Arun Thomas is a Senior Solutions Builder at AWS

 

 

Running Cost-effective queue workers with Amazon SQS and Amazon EC2 Spot Instances

Post Syndicated from peven original https://aws.amazon.com/blogs/compute/running-cost-effective-queue-workers-with-amazon-sqs-and-amazon-ec2-spot-instances/

This post is contributed by Ran Sheinberg | Sr. Solutions Architect, EC2 Spot & Chad Schmutzer | Principal Developer Advocate, EC2 Spot | Twitter: @schmutze

Introduction

Amazon Simple Queue Service (SQS) is used by customers to run decoupled workloads in the AWS Cloud as a best practice, in order to increase their applications’ resilience. You can use a worker tier to do background processing of images, audio, documents and so on, as well as offload long-running processes from the web tier. This blog post covers the benefits of pairing Amazon SQS and Spot Instances to maximize cost savings in the worker tier, and a customer success story.

Solution Overview

Amazon SQS is a fully managed message queuing service that enables customers to decouple and scale microservices, distributed systems, and serverless applications. It is a common best practice to use Amazon SQS with decoupled applications. Amazon SQS increases application resilience by decoupling the direct communication between the frontend application and the worker tier that does data processing. If a worker node fails, the jobs that were running on that node return to the Amazon SQS queue for a different node to pick up.

Both the frontend and worker tier can run on Spot Instances, which offer spare compute capacity at steep discounts compared to On-Demand Instances. Spot Instances optimize your costs on the AWS Cloud and scale your application’s throughput up to 10 times for the same budget. Spot Instances can be interrupted with two minutes of notification when EC2 needs the capacity back. You can use Spot Instances for various fault-tolerant and flexible applications. These can include analytics, containerized workloads, high performance computing (HPC), stateless web servers, rendering, CI/CD, and queue worker nodes—which is the focus of this post.

Worker tiers of a decoupled application are typically fault-tolerant, so they are prime candidates for running on interruptible capacity. Pairing Amazon SQS with Spot Instances allows for more robust, cost-optimized applications.

By using EC2 Auto Scaling groups with multiple instance types that you configured as suitable for your application (for example, m4.xlarge, m5.xlarge, c5.xlarge, and c4.xlarge, in multiple Availability Zones), you can spread the worker tier’s compute capacity across many Spot capacity pools (a combination of instance type and Availability Zone). This increases the chance of achieving the scale that’s required for the worker tier to ingest messages from the queue, and of keeping that scale when Spot Instance interruptions occur, while selecting the lowest-priced Spot Instances in each availability zone.

You can also choose the capacity-optimized allocation strategy for the Spot Instances in your Auto Scaling group. This strategy automatically selects instances that have a lower chance of interruption, which decreases the chances of restarting jobs due to Spot interruptions. When Spot Instances are interrupted, your Auto Scaling group automatically replenishes the capacity from a different Spot capacity pool in order to achieve your desired capacity. Read the blog post “Introducing the capacity-optimized allocation strategy for Amazon EC2 Spot Instances” for more details on how to choose the suitable allocation strategy.

We focus on three main points in this blog:

  1. Best practices for using Spot Instances with Amazon SQS
  2. A customer example that uses these components
  3. An example solution that can help you get started quickly

Application of Amazon SQS with Spot Instances

Amazon SQS eliminates the complexity of managing and operating message-oriented middleware. Using Amazon SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. Amazon SQS is a fully managed service which allows you to set up a queue in seconds. It also allows you to use your preferred SDK to start writing and reading to and from the queue within minutes.

In the following example, we describe an AWS architecture that brings together the Amazon SQS queue and an EC2 Auto Scaling group running Spot Instances. The architecture is used for decoupling the worker tier from the web tier by using Amazon SQS. The example uses the Protect feature (which we will explain later in this post) to ensure that an instance currently processing a job does not get terminated by the Auto Scaling group when it detects that a scale-in activity is required due to a Dynamic Scaling Policy.

AWS reference architecture used for decoupling the worker tier from the web tier by using Amazon SQS

Customer Example: How Trax Retail uses Auto Scaling groups with Spot Instances in their Amazon SQS application

Trax decided to run its queue worker tier exclusively on Spot Instances due to the fault-tolerant nature of its architecture and for cost-optimization purposes. The company digitizes the physical world of retail using Computer Vision. Their ‘Trax Factory’ transforms individual shelves into data and insights about retail store conditions.

Built using asynchronous event-driven architecture, Trax Factory is a cluster of microservices in which the completion of one service triggers the activation of another service. The worker tier uses Auto Scaling groups with dynamic scaling policies to increase and decrease the number of worker nodes in the worker tier.

You can create a Dynamic Scaling Policy by doing the following:

  1. Observe an Amazon CloudWatch metric. Watch the metric for the current number of messages in the Amazon SQS queue (ApproximateNumberOfMessagesVisible).
  2. Create a CloudWatch alarm. This alarm should be based on the metric from the prior step.
  3. Use your CloudWatch alarm in a Dynamic Scaling Policy. Use this policy to increase and decrease the number of EC2 Instances in the Auto Scaling group.

In Trax’s case, the number of messages in the queue is highly variable, so they opted to enhance this approach in order to minimize the time it takes to scale. They built a service that calls the SQS API to find the current number of messages in the queue more frequently, instead of waiting for the 5-minute metric refresh interval in CloudWatch.

Trax ensures that its applications are always scaled to meet the demand by leveraging the inherent elasticity of Amazon EC2 instances. This elasticity ensures that end users are never affected and/or service-level agreements (SLA) are never violated.

With a Dynamic Scaling Policy, the Auto Scaling group can detect when the number of messages in the queue has decreased, so that it can initiate a scale-in activity. The Auto Scaling group uses its configured termination policy for selecting the instances to be terminated. However, this policy poses the risk that the Auto Scaling group might select an instance for termination while that instance is currently processing an image. That instance’s work would be lost (although the image would eventually be processed by reappearing in the queue and getting picked up by another worker node).

To decrease this risk, you can use Auto Scaling groups instance protection. This means that every time an instance fetches a job from the queue, it also sends an API call to EC2 to protect itself from scale-in. The Auto Scaling group does not select the protected, working instance for termination until the instance finishes processing the job and calls the API to remove the protection.

Handling Spot Instance interruptions

This instance-protection solution ensures that no work is lost during scale-in activities. However, protecting from scale-in does not work when an instance is marked for termination due to Spot Instance interruptions. These interruptions occur when there’s increased demand for On-Demand Instances in the same capacity pool (a combination of an instance type in an Availability Zone).

Applications can minimize the impact of a Spot Instance interruption. To do so, an application catches the two-minute interruption notification (available in the instance’s metadata), and instructs itself to stop fetching jobs from the queue. If there’s an image still being processed when the two minutes expire and the instance is terminated, the application does not delete the message from the queue after finishing the process. Instead, the message simply becomes visible again for another instance to pick up and process after the Amazon SQS visibility timeout expires.

Alternatively, you can release any ongoing job back to the queue upon receiving a Spot Instance interruption notification by setting the visibility timeout of the specific message to 0. Doing so potentially decreases the total time it takes to process the message, because another worker can pick it up immediately.
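The worker behavior from the last two sections (scale-in protection around each job, no new jobs after an interruption notice, and release of an unfinished job with a visibility timeout of 0) can be put together as follows. This is an added example, not the code of the convert-worker.sh script used later in this post. It assumes AWS SDK for JavaScript v2 clients, Node.js 18+ for the global fetch, and hypothetical names for cfg, processJob and getInterruptionNotice.

```javascript
// Added example, not the post's convert-worker.sh.
// Assumed names: the cfg fields, processJob(message, shouldStop) and the
// getInterruptionNotice callback. sqs and autoscaling are AWS SDK for
// JavaScript v2 clients (or test doubles exposing the same methods).

// Reads the Spot interruption notice from instance metadata (IMDSv2).
// Resolves to null while no interruption is scheduled (the path returns 404).
async function imdsInterruptionNotice() {
  const base = 'http://169.254.169.254/latest';
  const tokenRes = await fetch(`${base}/api/token`, {
    method: 'PUT',
    headers: { 'X-aws-ec2-metadata-token-ttl-seconds': '60' },
  });
  const token = await tokenRes.text();
  const res = await fetch(`${base}/meta-data/spot/instance-action`, {
    headers: { 'X-aws-ec2-metadata-token': token },
  });
  return res.status === 200 ? res.json() : null;
}

// Turns scale-in protection on or off for this instance.
function setProtection(autoscaling, cfg, protect) {
  return autoscaling.setInstanceProtection({
    InstanceIds: [cfg.instanceId],
    AutoScalingGroupName: cfg.asgName,
    ProtectedFromScaleIn: protect,
  }).promise();
}

// One poll cycle. Resolves to 'draining', 'idle', 'done' or 'released'.
async function workOnce({ sqs, autoscaling, cfg, processJob, getInterruptionNotice }) {
  // An interruption notice means at most two minutes remain: take no new job.
  if (await getInterruptionNotice()) return 'draining';

  const res = await sqs.receiveMessage({
    QueueUrl: cfg.queueUrl,
    MaxNumberOfMessages: 1,
    WaitTimeSeconds: 20,
  }).promise();
  const message = (res.Messages || [])[0];
  if (!message) return 'idle';

  const handle = { QueueUrl: cfg.queueUrl, ReceiptHandle: message.ReceiptHandle };
  await setProtection(autoscaling, cfg, true);
  try {
    // processJob polls shouldStop between steps and resolves to false if it
    // abandoned the job because an interruption notice arrived.
    const shouldStop = async () => Boolean(await getInterruptionNotice());
    const finished = await processJob(message, shouldStop);
    if (finished) {
      await sqs.deleteMessage(handle).promise();
      return 'done';
    }
    // Release the job now instead of waiting for the visibility timeout.
    await sqs.changeMessageVisibility({ ...handle, VisibilityTimeout: 0 }).promise();
    return 'released';
  } finally {
    // Runs after success, after a release and after a thrown error. After an
    // error the message is neither deleted nor released, so it reappears on
    // its own once the visibility timeout expires.
    await setProtection(autoscaling, cfg, false);
  }
}
```

On an instance, call workOnce in a loop with getInterruptionNotice set to imdsInterruptionNotice and stop looping once it resolves to 'draining' or 'released'.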

Testing the solution

If you’re not currently using Spot Instances in your queue worker tier, we suggest testing the approach described in this post.

For that purpose, we built a simple solution to demonstrate the capabilities mentioned in this post, using an AWS CloudFormation template. The stack includes an Amazon Simple Storage Service (S3) bucket with a CloudWatch trigger to push notifications to an SQS queue after an image is uploaded to the Amazon S3 bucket. Once the message is in the queue, it is picked up by the application running on the EC2 instances in the Auto Scaling group. Then, the image is converted to PDF, and the instance is protected from scale-in for as long as it has an active processing job.

To see the solution in action, deploy the CloudFormation template. Then upload an image to the Amazon S3 bucket. In the Auto Scaling Groups console, check the instance protection status on the Instances tab. The protection status is shown in the following screenshot.

instance protection status in console

You can also see the application logs using CloudWatch Logs:

/usr/local/bin/convert-worker.sh: Found 1 messages in https://sqs.us-east-1.amazonaws.com/123456789012/qtest-sqsQueue-1CL0NYLMX64OB

/usr/local/bin/convert-worker.sh: Found work to convert. Details: INPUT=Capture1.PNG, FNAME=capture1, FEXT=png

/usr/local/bin/convert-worker.sh: Running: aws autoscaling set-instance-protection --instance-ids i-0a184c5ae289b2990 --auto-scaling-group-name qtest-autoScalingGroup-QTGZX5N70POL --protected-from-scale-in

/usr/local/bin/convert-worker.sh: Convert done. Copying to S3 and cleaning up

/usr/local/bin/convert-worker.sh: Running: aws s3 cp /tmp/capture1.pdf s3://qtest-s3bucket-18fdpm2j17wxx

/usr/local/bin/convert-worker.sh: Running: aws sqs --output=json delete-message --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/qtest-sqsQueue-1CL0NYLMX64OB --receipt-handle

/usr/local/bin/convert-worker.sh: Running: aws autoscaling set-instance-protection --instance-ids i-0a184c5ae289b2990 --auto-scaling-group-name qtest-autoScalingGroup-QTGZX5N70POL --no-protected-from-scale-in

Conclusion

This post helps you architect fault tolerant worker tiers in a cost optimized way. If your queue worker tiers are fault tolerant and use the built-in Amazon SQS features, you can increase your application’s resilience and take advantage of Spot Instances to save up to 90% on compute costs.

In this post, we emphasized several best practices to help get you started saving money using Amazon SQS and Spot Instances. The main best practices are:

  • Diversifying your Spot Instances using Auto Scaling groups, and selecting the right Spot allocation strategy
  • Protecting instances from scale-in activities while they process jobs
  • Using the Spot interruption notification so that the application stops polling the queue for new jobs before the instance is terminated

We hope you found this post useful. If you’re not using Spot Instances in your queue worker tier, we suggest testing the approach described here. Finally, we would like to thank the Trax team for sharing its architecture and best practices. If you want to learn more, watch the “This is my architecture” video featuring Trax and their solution.

We’d love your feedback—please comment and let me know what you think.


About the authors

 

Ran Sheinberg is a specialist solutions architect for EC2 Spot Instances with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

 

 

 

 

As a Principal Developer Advocate for EC2 Spot at AWS, Chad’s job is to make sure our customers are saving at scale by using EC2 Spot Instances to take advantage of the most cost-effective way to purchase compute capacity. Follow him on Twitter here! @schmutze

 

Introducing AWS Lambda Destinations

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/

Today we’re announcing AWS Lambda Destinations for asynchronous invocations. This is a feature that provides visibility into Lambda function invocations and routes the execution results to AWS services, simplifying event-driven applications and reducing code complexity.

Asynchronous invocations

When a function is invoked asynchronously, Lambda sends the event to an internal queue. A separate process reads events from the queue and executes your Lambda function. When the event is added to the queue, Lambda previously only returned a 2xx status code to confirm that the queue has received this event. There was no additional information to confirm whether the event had been processed successfully.

A common event-driven microservices architectural pattern is to use a queue or message bus for communication. This helps with resilience and scalability. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), or Amazon EventBridge for further processing. Previously, you needed to write the SQS/SNS/EventBridge handling code within your Lambda function and manage retries and failures yourself.

With Destinations, you can route asynchronous function results as an execution record to a destination resource without writing additional code. An execution record contains details about the request and response in JSON format including version, timestamp, request context, request payload, response context, and response payload. For each execution status such as Success or Failure you can choose one of four destinations: another Lambda function, SNS, SQS, or EventBridge. Lambda can also be configured to route different execution results to different destinations.

Asynchronous Function Execution Result

Success

When a function is invoked successfully, Lambda routes the record to the destination resource for every successful invocation. You can use this to monitor the health of your serverless applications via execution status or build workflows based on the invocation result.

You no longer need to chain long-running Lambda functions together synchronously. Previously you needed to complete the entire workflow within the Lambda 15-minute function timeout, pay for idle time, and wait for a response. Destinations allows you to return a Success response to the calling function and then handle the remaining chaining functions asynchronously.

Failure

Alongside today’s announcement of Maximum Event Age and Maximum Retry Attempt for asynchronous invocations, Destinations gives you the ability to handle the Failure of function invocations along with their Success. When a function invocation fails, such as when retries are exhausted or the event age has been exceeded (hitting its TTL), Destinations routes the record to the destination resource for every failed invocation for further investigation or processing.

Dead Letter Queues (DLQ) have been available since 2016 and are a great way to handle asynchronous failure situations. Destinations provide more useful capabilities by passing additional function execution information, including code exception stack traces, to more destination services.

Destinations and DLQs can be used together and at the same time, although Destinations should be considered the preferred solution. If you already have DLQs set up, existing functionality does not change and Destinations does not replace existing DLQ configurations. If both Destinations and DLQ are used for Failure notifications, function invoke errors are sent to both DLQ and Destinations targets.

How to configure Destinations

Adding Destinations is a straightforward process. This walkthrough uses the AWS Management Console but you can also use the AWS CLI, AWS SAM, AWS CloudFormation, or language-specific SDKs for Lambda.

  1. Open the Lambda console Functions page. Choose an existing Lambda function, or create a new one. In this example, I create a new Lambda function. Choose Create Function.
  2. Enter a Function name, select Node.js 12.x for Runtime, and Choose or create an execution role. Ensure that your Lambda function execution role includes access to the destination resource.
  3. Choose Create function.
  4. Within the Function code pane, paste the following Lambda function code. The code generates a function execution result of either Success or Failure depending on a JSON input ("Success": true or "Success": false).
    // Lambda Destinations tester, Success returns a json blob, Failure throws an error
    
    exports.handler = function(event, context, callback) {
        var event_received_at = new Date().toISOString();
        console.log('Event received at: ' + event_received_at);
        console.log('Received event:', JSON.stringify(event, null, 2));
    
        if (event.Success) {
            console.log("Success");
            context.callbackWaitsForEmptyEventLoop = false;
            callback(null);
        } else {
            console.log("Failure");
            context.callbackWaitsForEmptyEventLoop = false;
            callback(new Error("Failure from event, Success = false, I am failing!"), 'Destination Function Error Thrown');
        }
    };
    
  5. Choose Save.
  6. To configure Destinations, within the Designer pane, choose Add destination.
  7. Select the Source as Asynchronous invocation. Select the Condition as On failure or On success, depending on your use case. In this example, I select On Success.
  8. Enter the Amazon Resource Name (ARN) for the Destination SQS queue, SNS topic, Lambda function, or EventBridge event bus. In this example, I use the ARN of an SNS topic I have already configured.
  9. Choose Save. The Destination is added to SNS for On Success.
  10. Add another Destination for Failure to Lambda. Within the Designer pane, choose Add destination.
  11. Select the Source as Asynchronous invocation, the Condition as On failure and Enter a Destination Lambda function ARN, then choose Save.
  12. The Destination is added to Lambda for On Failure.

Success testing

To test invoking the asynchronous Lambda function to generate a Success result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": true }' response.json

The Lambda function is invoked successfully with a response "StatusCode": 202.

And an SNS notification email is received, showing the invocation details with "condition":"Success" and the requestPayload.

{
	"version": "1.0",
	"timestamp": "2019-11-24T23:08:25.651Z",
	"requestContext": {
		"requestId": "c2a6f2ae-7dbb-4d22-8782-d0485c9877e2",
		"functionArn": "arn:aws:lambda:sa-east-1:123456789123:function:event-destinations:$LATEST",
		"condition": "Success",
		"approximateInvokeCount": 1
	},
	"requestPayload": {
		"Success": true
	},
	"responseContext": {
		"statusCode": 200,
		"executedVersion": "$LATEST"
	},
	"responsePayload": null
}

Failure testing

The Lambda function can be set to Failure by throwing an exception within the code. To test invoking the asynchronous Lambda function to generate a Failure result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": false }' response.json

The Lambda function is executed and reports a successful invoke on the Lambda processing queue. If Lambda is not able to add the event to the queue, the error message appears in the command output.

However, due to the exception error within the code, the function invocation will fail. Destinations then routes the invoke failure to the configured destination Lambda function. You can see the failed function invocation information in the Amazon CloudWatch Logs for the Destination function including "condition": "RetriesExhausted", along with the requestPayload, errorMessage, and stackTrace.

2019-11-24T21:52:47.855Z	d123456-c0dd-4871-a123-a356cb1b3ba6	EVENT
{
    "version": "1.0",
    "timestamp": "2019-11-24T21:52:47.333Z",
    "requestContext": {
        "requestId": "8ea123e4-1db7-4aca-ad10-d9ca1234c1fd",
        "functionArn": "arn:aws:lambda:sa-east-1:123456678912:function:event-destinations:$LATEST",
        "condition": "RetriesExhausted",
        "approximateInvokeCount": 3
    },
    "requestPayload": {
        "Success": false
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Handled"
    },
    "responsePayload": {
        "errorMessage": "Failure from event, Success = false, I am failing!",
        "errorType": "Error",
        "stackTrace": [ "exports.handler (/var/task/index.js:18:18)" ]
    }
}

Destination-specific JSON format

  • For SNS/SQS, the JSON object is passed as the Message to the destination.
  • For Lambda, the JSON is passed as the payload to the function. The destination function cannot be the same as the source function. For example, if LambdaA has a Destination configuration attached for Success, LambdaA is not a valid destination ARN. This prevents recursive functions.
  • For EventBridge, the JSON is passed as the Detail in the PutEvents call. The source is lambda, and detail type is either Lambda Function Invocation Result - Success or Lambda Function Invocation Result - Failure. The resource fields contain the function and destination ARNs.
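A consumer of Destinations output has to unwrap the execution record differently for each destination type listed above. The following added example (not AWS code) does that for a Lambda function that sits behind an SNS topic, an SQS queue, an EventBridge rule, or a direct Lambda destination. The function names are hypothetical, and the sample events are constructed from the two execution records shown earlier in this post.

```javascript
// Added example, not AWS code. Function names are hypothetical.

// Returns the execution records carried by an event, whichever destination
// type delivered it to the consumer.
function extractExecutionRecords(event) {
  if (Array.isArray(event.Records)) {
    // SNS and SQS deliver the record as a JSON string in the message.
    return event.Records.map((r) => {
      if (r.Sns) return JSON.parse(r.Sns.Message);
      if (r.eventSource === 'aws:sqs') return JSON.parse(r.body);
      throw new Error('Unsupported record source');
    });
  }
  // EventBridge: the record is the event detail and the source is "lambda".
  if (event.source === 'lambda' && event.detail) return [event.detail];
  // Lambda destination: the record is the invocation payload itself.
  if (event.requestContext && event.requestContext.condition) return [event];
  throw new Error('Not a Lambda Destinations execution record');
}

// Reduces a record to the fields an operator needs for alerting or replay.
function summarize(record) {
  const ctx = record.requestContext;
  const response = record.responsePayload || {};
  return {
    functionArn: ctx.functionArn,
    requestId: ctx.requestId,
    condition: ctx.condition,
    failed: ctx.condition !== 'Success',
    attempts: ctx.approximateInvokeCount,
    error: response.errorMessage || null,
    replayPayload: record.requestPayload,
  };
}

// Entry point for a destination consumer: one summary per record.
function handleDestinationEvent(event) {
  return extractExecutionRecords(event).map(summarize);
}

// Sample records, taken from the Success and Failure tests in this post.
const successRecord = {
  version: '1.0',
  timestamp: '2019-11-24T23:08:25.651Z',
  requestContext: {
    requestId: 'c2a6f2ae-7dbb-4d22-8782-d0485c9877e2',
    functionArn: 'arn:aws:lambda:sa-east-1:123456789123:function:event-destinations:$LATEST',
    condition: 'Success',
    approximateInvokeCount: 1,
  },
  requestPayload: { Success: true },
  responseContext: { statusCode: 200, executedVersion: '$LATEST' },
  responsePayload: null,
};
const failureRecord = {
  version: '1.0',
  timestamp: '2019-11-24T21:52:47.333Z',
  requestContext: {
    requestId: '8ea123e4-1db7-4aca-ad10-d9ca1234c1fd',
    functionArn: 'arn:aws:lambda:sa-east-1:123456678912:function:event-destinations:$LATEST',
    condition: 'RetriesExhausted',
    approximateInvokeCount: 3,
  },
  requestPayload: { Success: false },
  responseContext: { statusCode: 200, executedVersion: '$LATEST', functionError: 'Handled' },
  responsePayload: {
    errorMessage: 'Failure from event, Success = false, I am failing!',
    errorType: 'Error',
    stackTrace: ['exports.handler (/var/task/index.js:18:18)'],
  },
};

// Constructed wrappers: the same records as each kind of consumer sees them.
const snsEvent = { Records: [{ EventSource: 'aws:sns', Sns: { Message: JSON.stringify(successRecord) } }] };
const sqsEvent = { Records: [{ eventSource: 'aws:sqs', body: JSON.stringify(failureRecord) }] };
const eventBridgeEvent = {
  source: 'lambda',
  'detail-type': 'Lambda Function Invocation Result - Failure',
  detail: failureRecord,
};
```

Treating every condition other than Success as a failure keeps the consumer correct for failure conditions beyond the RetriesExhausted case shown here.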

AWS CloudFormation configuration

Destinations CloudFormation configuration is created via the following YAML.

Resources:
  EventInvokeConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
        FunctionName: "YourLambdaFunctionWithEventInvokeConfig"
        Qualifier: "$LATEST"
        MaximumEventAgeInSeconds: 600
        MaximumRetryAttempts: 0
        DestinationConfig:
            OnSuccess:
                Destination: "arn:aws:sns:us-east-1:123456789012:YourSNSTopicOnSuccess"
            OnFailure:
                Destination: "arn:aws:lambda:us-east-1:123456789012:function:YourLambdaFunctionOnFailure"

Conclusion

AWS Lambda Destinations gives you more visibility and control of function execution results. This helps you build better event-driven applications by reducing code and using Lambda’s native failure handling controls.

There are no additional costs for enabling Lambda Destinations. However, calls made to destination target services may be charged.

To learn more, see Lambda Destinations in the AWS Lambda Developer Guide.

New for AWS Lambda – SQS FIFO as an event source

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/new-for-aws-lambda-sqs-fifo-as-an-event-source/

AWS Lambda first announced support for Amazon SQS standard queues as an event source in April 2018. This allows builders to develop serverless applications using queues to directly invoke Lambda functions. Today, we have expanded this feature to include SQS FIFO queues. This makes it easier to create serverless applications using queues where the order of messages is important.

Ordered events and operations are critical for some types of event-driven application. For example, in stock market trading applications, the order of events for a trade is essential to determine the time, price, and parties in a transaction. Similarly, in applications managing pricing or inventory, the order of events is key for maintaining the data integrity of the system.

Using SQS FIFO with Lambda is straightforward, with only minor differences in configuration compared with standard SQS queues. This blog post explains how serverless developers can get started using SQS FIFO queues in their Lambda functions.

How it works

SQS FIFO queues are similar to standard queues but use two additional attributes in the SendMessage API. These attributes are also returned in the event payload to the consuming Lambda function:

  • MessageGroupId: this required field enables multiple message groups within a single queue. If you do not need this functionality, provide the same MessageGroupId value for all messages. Messages within a single group are processed in a FIFO fashion.
  • MessageDeduplicationId: this token is used to deduplicate messages within a 5-minute interval. If two messages with the same MessageDeduplicationId are sent, both messages are accepted successfully but only one is delivered. Learn more about the best practices for using this attribute.

You can submit a message into an SQS FIFO queue using the AWS Management Console, AWS CLI, AWS SAM, or AWS SDK. This SDK example uses Node.js to submit a single message:

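const AWS = require('aws-sdk')
AWS.config.update({region: 'us-west-2'})
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})

// Send one message to the FIFO queue. MessageGroupId sets the ordering scope;
// MessageDeduplicationId suppresses duplicates within the 5-minute interval.
const send = async (groupId, messageId) => {
  return await sqs.sendMessage({
    MessageGroupId: `group-${groupId}`,
    MessageDeduplicationId: `m-${groupId}-${messageId}`,
    MessageBody: `${messageId}`,
    QueueUrl: "https://sqs.us-west-2.amazonaws.com/123456789012/myTestQueue.fifo"
  }).promise()
}

const main = async () => {
  await send('A', '1')
}
// Report a failed send instead of leaving an unhandled promise rejection.
main().catch(console.error)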

When the Lambda function is triggered by the SQS FIFO queue, these two attributes are delivered as part of the event payload:

SQS FIFO payload

When configured this way, the SQS FIFO queue sends as many messages as possible to the consuming Lambda function, subject to the Batch size configured in the trigger. The Batch size determines the number of items to read from the queue, up to a maximum of 10 items, though a single batch is smaller if the queue has fewer items.

For example, if you submit six messages to a queue with a Batch size of 10 using two separate MessageGroupId values, the consuming Lambda function would receive all six items:

 

SQS FIFO example #1

As all the messages are processed in a single batch, only one Lambda function is invoked. A more complex example shows how the Lambda service uses concurrency to process a busy SQS FIFO queue more efficiently.

SQS FIFO example #2

In this example, there are three message groups, and a Lambda function with an SQS FIFO trigger. The Batch size for the Lambda trigger is 5, and there are 53 messages in total:

  • Message Group A consists of 18 messages, A1-A18
  • Message Group B consists of 12 messages, B1-B12
  • Message Group C consists of 23 messages, C1-C23

There are three rules governing the order of messages leaving a FIFO queue, to help understand the processing behavior:

  1. Return the oldest message where no other message with the same MessageGroupId is in flight.
  2. Return as many messages with the same MessageGroupId as possible.
  3. If a message batch is still not full, go back to the first rule. As a result, it’s possible for a single batch to contain messages from multiple MessageGroupIds.

In this case, the SQS FIFO queue receives these messages, and orders the items within each message group. In processing, the following events occur:

  1. Consumer #1 receives 5 messages in batch #1 (C1-C5).
  2. Consumer #1 receives 5 messages in batch #2 (B1-B5).
  3. As there are messages in flight for group B, and there are items in the queue from other groups ready for processing, Lambda scales up. Consumer #2 now receives 5 messages in batch #2 (C6-C10).
  4. Consumer #3 receives batch #3 (A1-A5).
  5. Each Lambda instance will continue receiving batches of 5 messages until the queue is empty.

Lambda automatically scales out horizontally to consume the messages in the queue. It will try to consume the queue as quickly and efficiently as possible by maximizing concurrency within the bounds of each service. As queue traffic fluctuates, the Lambda service scales the polling operations based on the number of inflight messages.

In SQS FIFO queues, using more than one MessageGroupId enables Lambda to scale up and process more items in the queue using a greater concurrency limit. Total concurrency is equal to or less than the number of unique MessageGroupIds in the SQS FIFO queue. Learn more about AWS Lambda Function Scaling and how it applies to your event source.

Amazon SQS FIFO queues ensure that the order of processing follows the message order within a message group. However, they do not guarantee only-once delivery when used as a Lambda trigger. If only-once delivery is important in your serverless application, it’s recommended to make your function idempotent. You could achieve this by tracking a unique attribute of the message using a scalable, low-latency control database like Amazon DynamoDB.
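One way to make the consuming function idempotent, as an added example that is not part of the original post. InMemoryLedger, makeHandler and the work callback are hypothetical names; the ledger stands in for a DynamoDB table written with a conditional put, and the key assumes the producer never reuses a MessageDeduplicationId within a message group.

```javascript
// Hypothetical stand-in for a DynamoDB table written with a conditional put
// (the write fails if the key already exists). Not an AWS API.
class InMemoryLedger {
  constructor (leaseMs = 60000, now = Date.now) {
    this.items = new Map()
    this.leaseMs = leaseMs
    this.now = now
  }

  // Put-if-absent. A claim left IN_PROGRESS by a crashed invocation expires
  // after leaseMs, so a redelivered message is not skipped forever.
  async claim (key) {
    const held = this.items.get(key)
    const stale = held && held.state === 'IN_PROGRESS' &&
      this.now() - held.at > this.leaseMs
    if (held && !stale) return false
    this.items.set(key, { state: 'IN_PROGRESS', at: this.now() })
    return true
  }

  async complete (key) { this.items.set(key, { state: 'DONE', at: this.now() }) }

  // Drop the claim so the redelivered message can be processed again.
  async release (key) { this.items.delete(key) }
}

// Build the idempotency key from the two FIFO attributes in the event record.
// Assumption: dedup IDs are unique per business event within a group.
const recordKey = (record) => {
  const { MessageGroupId, MessageDeduplicationId } = record.attributes || {}
  if (!MessageGroupId || !MessageDeduplicationId) {
    throw new Error(`record ${record.messageId} has no FIFO attributes`)
  }
  return `${MessageGroupId}#${MessageDeduplicationId}`
}

// Returns a Lambda-style handler; `work` is the business logic with side effects.
const makeHandler = (ledger, work) => async (event) => {
  const summary = { processed: [], skipped: [] }
  for (const record of event.Records) { // keep the order Lambda delivered
    const key = recordKey(record)
    if (!(await ledger.claim(key))) {
      summary.skipped.push(key) // seen before: do not repeat the side effect
      continue
    }
    try {
      await work(record)
      await ledger.complete(key)
      summary.processed.push(key)
    } catch (err) {
      await ledger.release(key)
      throw err // fail the invocation so the batch is delivered again
    }
  }
  return summary
}

// Constructed sample event in the shape Lambda passes for an SQS FIFO trigger;
// the third record is a repeat of the first.
const fifoRecord = (group, id, body) => ({
  messageId: `constructed-${group}-${id}`,
  body,
  attributes: {
    MessageGroupId: `group-${group}`,
    MessageDeduplicationId: `m-${group}-${id}`
  }
})
const sampleEvent = {
  Records: [
    fifoRecord('A', '1', 'debit 10'),
    fifoRecord('A', '2', 'debit 5'),
    fifoRecord('A', '1', 'debit 10')
  ]
}
```

Records that were completed before a failed batch is redelivered are skipped on the retry, so only the unfinished work runs again.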

Triggering Lambda from SQS

1. Navigate to the SQS console and choose Create New Queue.

2. Names for FIFO queues must end in .fifo. For Queue Name, enter myTestQueue.fifo and select FIFO Queue. Choose Quick-Create Queue.

Create New Queue

3. From the Lambda console, choose Create function.

4. Enter mySQStest for Function name, and leave the Node.js 12.x runtime selected. Open the Choose or create an execution role panel and select Create a new role from AWS policy templates.

5. Enter mySQStest for Role name and select Amazon SQS poller permissions from the Policy templates drop-down. Choose Create function.

Create Lambda function

6. Once the function is created, you can configure the trigger. Choose Add trigger.

Add trigger

7. In the Select a trigger drop-down, choose SQS. In the SQS queue drop-down, select the FIFO queue created earlier. Leave Batch size as the default value, then choose Add.

Add trigger dialog

8. After a few seconds, the SQS trigger is ready.

SQS FIFO trigger ready

The Lambda function is now invoked when new messages are available in the SQS FIFO queue.

Conclusion

Now, you can process messages from Amazon SQS FIFO queues with Lambda, helping bring scalable serverless processing to applications needing first-in, first-out message processing.

You can get started with SQS FIFO queue as a Lambda event source via AWS Management Console, AWS CLI, AWS SAM, AWS CloudFormation, or AWS SDK for Lambda. It is available in all AWS Regions where AWS Lambda is available. You pay only for the SQS API operations performed by the Lambda service on your behalf, and the Lambda requests and duration used to process your messages.

For more information on where AWS Lambda is available, see the AWS Region Table. To learn more, see Using AWS Lambda with Amazon SQS FIFO in the AWS Lambda Developer Guide.

 

 

Designing durable serverless apps with DLQs for Amazon SNS, Amazon SQS, AWS Lambda

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/designing-durable-serverless-apps-with-dlqs-for-amazon-sns-amazon-sqs-aws-lambda/

This post is courtesy of Otavio Ferreira, Sr Manager, SNS.

In a postal system, a dead-letter office is a facility for processing undeliverable mail. In pub/sub messaging, a dead-letter queue (DLQ) is a queue to which messages published to a topic can be sent, in case those messages cannot be delivered to a subscribed endpoint.

Amazon SNS supports DLQs, making your applications more resilient and durable when deliveries fail.

Understanding message delivery failures and retries

The delivery of a message fails when it’s not possible for Amazon SNS to access the subscribed endpoint. There are two reasons why this might happen:

  • Client errors, where the client is SNS (the message sender).
  • Server errors, where the server is the system that hosts the subscription endpoint (the message receiver), such as Amazon SQS or AWS Lambda.

Client errors

Client errors happen when SNS has stale subscription metadata. One common cause of client errors is when you (the endpoint owner) delete the endpoint. For example, you might delete the SQS queue that is subscribed to your SNS topic, without also deleting the SNS subscription corresponding to the queue. Another common cause is when you change the resource policy attached to your endpoint in a way that prevents SNS from delivering messages to that endpoint.

These errors are considered client errors because the client has attempted the delivery of a message to a destination that, from the client’s perspective, is no longer accessible. SNS does not retry the delivery of messages that failed as the result of client errors.

Server errors

Server errors happen when the system that powers the subscribed endpoint is unavailable, or when it returns an exception response indicating that it failed to process a valid request from SNS.

When server errors occur, SNS retries the failed deliveries according to a backoff function, which can be either linear or exponential. When a server error occurs for an AWS managed endpoint, backed by either SQS or Lambda, SNS retries the delivery up to 100,015 times, over 23 days.

Server errors can also happen with customer managed endpoints, namely HTTP, SMS, email, and mobile push endpoints. SNS also retries the delivery for these types of endpoints. HTTP endpoints support customer-defined retry policies, while SNS sets an internal delivery retry policy for SMS, email, and mobile push endpoints to 50 times, over 6 hours.

Delivery retries

SNS may receive a client error, or continue to receive a server error for a message beyond the number of retries defined by the corresponding retry policy. In that event, SNS discards the message. Setting a DLQ to your SNS subscription enables you to keep this message, regardless of the type of error, either client or server. DLQs give you more control over messages that cannot be delivered.

For more information on the delivery retry policy for each delivery protocol supported by SNS, see Amazon SNS Message Delivery Retry.

Using DLQs for AWS services

SNS, SQS, and Lambda support DLQs, addressing different failure modes. All DLQs are regular queues powered by SQS.

In SNS, DLQs store the messages that failed to be delivered to subscribed endpoints. For more information, see Amazon SNS Dead-Letter Queues.

In SQS, DLQs store the messages that failed to be processed by your consumer application. This failure mode can happen when producers and consumers fail to interpret aspects of the protocol that they use to communicate. In that case, the consumer receives the message from the queue, but fails to process it, as the message doesn’t have the structure or content that the consumer expects. The consumer can’t delete the message from the queue either. After exhausting the receive count in the redrive policy, SQS can sideline the message to the DLQ. For more information, see Amazon SQS Dead-Letter Queues.

In Lambda, DLQs store the messages that resulted in failed asynchronous executions of your Lambda function. An execution can result in an error for several reasons. Your code might raise an exception, time out, or run out of memory. The runtime executing your code might encounter an error and stop. Your function might hit its concurrency limit and be throttled. Regardless of the error type, when the error occurs, your code might have run completely, partially, or not at all. By default, Lambda retries an asynchronous execution twice. After exhausting the retries, Lambda can sideline the message to the DLQ. For more information, see AWS Lambda Dead-Letter Queues.

When you have a fan-out architecture, with SQS queues and Lambda functions subscribed to an SNS topic, we recommend that you set DLQs to your SNS subscriptions, and to your destination queues and functions as well. This approach gives your application resilience against message delivery failures, message processing failures, and function execution failures too.

Applying DLQs in a use case

Here’s how everything comes together. The following diagram shows a serverless backend architecture that supports a car rental application. This is a durable serverless architecture based on DLQs for SNS, SQS, and Lambda.

Dead Letter Queue - DLQ SNS use case with architecture diagram

When a customer places an order to rent a car, the application sends that request to an API, which is powered by Amazon API Gateway. The REST API is backed by an SNS topic named Rental-Orders, and deployed onto an Amazon VPC subnet. The topic then fans out that order to the following two subscribed endpoints, for parallel processing:

  • An SQS queue, named Rental-Fulfilment, which feeds the integration with an internal fulfilment system hosted on Amazon EC2.
  • A Lambda function, named Rental-Billing, which processes and loads the customer order into a third-party billing system, also hosted on Amazon EC2.

To increase the durability of this serverless backend API, the following DLQs have been set up:

  • Two SNS DLQs, namely Rental-Fulfilment-Fanout-DLQ and Rental-Billing-Fanout-DLQ, which store the order in case either the subscribed SQS queue or Lambda function ever becomes unreachable.
  • An SQS DLQ, named Rental-Fulfilment-DLQ, which stores the order when the fulfilment system fails to process the order.
  • A Lambda DLQ, named Rental-Billing-DLQ, which stores the order when the function fails to process and load the order into the billing system.

When the DLQ captures the message, you can inspect the message for troubleshooting purposes. After you address the error at hand, you can poll the DLQ to retry the processing of the message.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or AWS CloudFormation. You can use the SDK, CLI, and API for polling the DLQs as well.

Configuring DLQs for subscriptions

You can attach a DLQ to an SNS subscription by setting the subscription’s RedrivePolicy parameter. The policy is a JSON object that refers to the DLQ ARN. The ARN must point to an SQS queue in the same AWS account as that of the SNS subscription. Also, both the DLQ and the subscription must be in the same AWS Region.

Here’s how you can configure one of the SNS DLQs applied in the car rental application example, presented earlier.

The following JSON object is a CloudFormation template that subscribes the SQS queue Rental-Fulfilment to the SNS topic Rental-Orders. The template also sets a RedrivePolicy that targets Rental-Fulfilment-Fanout-DLQ as a DLQ.

Lastly, the template sets a FilterPolicy value. It makes SNS deliver a message to the subscribed queue only if the published message carries an attribute named order-status with value set to either confirmed or canceled. As Amazon SNS Message Filtering happens before message delivery, messages that are filtered out aren’t sent to that subscription’s DLQ.

Internally, the CloudFormation template uses the SNS Subscribe API action for deploying the subscription and setting both policies, all part of the same API request.

{  
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
            "RedrivePolicy": {
               "deadLetterTargetArn": 
                  "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
            },
            "FilterPolicy": { 
               "order-status": [ "confirmed", "canceled" ]
            }
         }
      }
   }
}

Maybe the SNS topic and subscription are already deployed. In that case, you can use the SNS SetSubscriptionAttributes API action to set the RedrivePolicy, as shown by the following code examples, based on the AWS CLI and the AWS SDK for Java.

$ aws sns set-subscription-attributes \
   --region us-east-1 \
   --subscription-arn arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2 \
   --attribute-name RedrivePolicy \
   --attribute-value '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"}'

AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();

String subscriptionArn = "arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2";

String redrivePolicy = "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}";

SetSubscriptionAttributesRequest request = new SetSubscriptionAttributesRequest(
  subscriptionArn, 
  "RedrivePolicy", 
  redrivePolicy
);

sns.setSubscriptionAttributes(request);

Monitoring DLQs

You can use Amazon CloudWatch metrics and alarms to monitor the DLQs associated with your SNS subscriptions. In the car rental example, you can monitor the DLQs to be notified when the API fails to distribute any car rental order to the fulfilment or billing systems.

As regular SQS queues, the DLQs in SNS emit a number of metrics to CloudWatch, in 5-minute data points, such as NumberOfMessagesSent, NumberOfMessagesReceived and NumberOfMessagesDeleted. You can use these SQS metrics to be notified upon activity in your DLQs in SNS, so you may trigger a message recovery protocol.

You might have a case where you expect the DLQ to be always empty. In that case, create a CloudWatch alarm on NumberOfMessagesSent, set the alarm threshold to zero, and provide a separate SNS topic to be notified when the alarm goes off. The SNS topic, in turn, can deliver your alarm notification to any endpoint type that you choose, such as email address, phone number, or mobile pager app.

Additionally, SNS itself provides its own set of metrics that are relevant to DLQs. Specifically, SNS metrics include the following:

  • NumberOfNotificationsRedrivenToDlq – Used when sending the message to the DLQ succeeds.
  • NumberOfNotificationsFailedToRedriveToDlq – Used when sending the message to the DLQ fails. This can happen because the DLQ either doesn’t exist anymore or doesn’t have the required access permissions to allow SNS to send messages to it. For more information about setting up the required access policy, see Giving Permissions for Amazon SNS to Send Messages to Amazon SQS.

Debugging with DLQs

Use CloudWatch Logs to see the exceptions that caused your SNS deliveries to fail and your messages to be sidelined to DLQs. In the car rental example, you can inspect the rental orders in the DLQs, as well as the logs associated with these queues. Then you can understand why those orders failed to be fanned out to the fulfilment or billing systems.

SNS can log both successful and failed deliveries in CloudWatch. You can enable Amazon SNS Delivery Status Logging by setting three SNS topic attributes, which are delivery protocol-specific. As an example, for SNS deliveries to SQS queues, you must set the following topic attributes: SQSSuccessFeedbackRoleArn, SQSFailureFeedbackRoleArn, and SQSSuccessFeedbackSampleRate.

The following JSON object represents a successful SNS delivery in a CloudWatch Logs entry. The status code logged is 200 (SUCCESS). The attribute RedrivePolicy shows that the SNS subscription in question had its DLQ set.

{
  "notification": {
    "messageMD5Sum": "7bb3327ac55e49485bad42e159ca4d4b",
    "messageId": "e8c2bb09-235c-5f5d-b583-efd8df0f7d74",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:13:55.876"
  },
  "delivery": {
    "deliveryId": "6adf232e-fb12-5062-a564-27ff3741051f",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"sqsRequestId\":\"b2608a46-ccc4-51cc-003d-de972097debc\",\"sqsMessageId\":\"05fecd22-60a1-4d7d-bb79-026d49700b5a\"}",
    "dwellTimeMs": 58,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

The following JSON object represents a failed SNS delivery in CloudWatch Logs. In the following code example, the subscribed queue doesn’t exist. As a client error, the status code logged is 400 (FAILURE). Again, the RedrivePolicy attribute refers to a DLQ.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:51.116"
  },
  "delivery": {
    "deliveryId": "be743821-4c2c-5acc-a586-6cf0807f6fb1",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"ErrorCode\":\"AWS.SimpleQueueService.NonExistentQueue\", \"ErrorMessage\":\"The specified queue does not exist or you do not have access to it.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 53,
    "attempts": 1,
    "statusCode": 400
  },
  "status": "FAILURE"
}

When the message delivery fails and there is a DLQ attached to the subscription, the message is sent to the DLQ and an additional entry is logged in CloudWatch. This new entry is specific to the delivery to the DLQ and refers to the DLQ ARN as the destination, as shown in the following JSON object.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "8959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:52.876"
  },
  "delivery": {
    "deliveryId": "a877c79f-a3ee-5105-9bbd-92596eae0232",
    "destination":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ",
    "providerResponse": "{\"sqsRequestId\":\"8cef1af5-e86a-519e-ad36-4f33252aa5ec\",\"sqsMessageId\":\"2b742c5c-0750-4ec5-a717-b95897adda8e\"}",
    "dwellTimeMs": 51,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

By analyzing Amazon CloudWatch Logs entries, you can understand why an SNS message was moved to a DLQ, and then take the required set of steps to recover the message. When you enable delivery status logging in SNS, you can configure the sample rate at which deliveries are logged, from 0% to 100%.
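The analysis described above can be scripted. The following is an added example, not code from the original post: it sorts failed deliveries into those stored in a DLQ, those discarded because no DLQ was set, and those with a DLQ set but no logged DLQ delivery. The sample lines are constructed from the field names of the entries shown above, and the example assumes that a DLQ delivery entry carries the same notification.messageId as the failed delivery and that successful deliveries are logged at a 100% sample rate.

```javascript
// Constructed sample log lines. Message IDs and the Rental-Audit queue are made
// up for this example; field names follow the CloudWatch Logs entries above.
const ARN = 'arn:aws:sqs:us-east-1:123456789012:'
const DLQ_POLICY = JSON.stringify({
  deadLetterTargetArn: ARN + 'Rental-Fulfilment-Fanout-DLQ'
})
const GONE = { ErrorCode: 'AWS.SimpleQueueService.NonExistentQueue' }
const sample = (messageId, queue, statusCode, providerResponse, redrivePolicy) =>
  JSON.stringify({
    notification: {
      messageId,
      topicArn: 'arn:aws:sns:us-east-1:123456789012:Rental-Orders'
    },
    delivery: {
      destination: ARN + queue,
      providerResponse: JSON.stringify(providerResponse),
      redrivePolicy, // omitted from the JSON when undefined
      attempts: 1,
      statusCode
    },
    status: statusCode === 200 ? 'SUCCESS' : 'FAILURE'
  })
const SAMPLE_LOG_LINES = [
  sample('m-1', 'Rental-Fulfilment', 200, {}, DLQ_POLICY),
  sample('m-2', 'Rental-Fulfilment', 400, GONE, DLQ_POLICY),
  sample('m-2', 'Rental-Fulfilment-Fanout-DLQ', 200, {}),
  sample('m-3', 'Rental-Audit', 400, GONE), // subscription without a DLQ
  sample('m-4', 'Rental-Fulfilment', 500, {}, DLQ_POLICY),
  '{"notification": truncated' // damaged line
]

const parseJson = (text) => {
  try { return JSON.parse(text) } catch (err) { return null }
}

// redrivePolicy is itself a JSON string inside the log entry.
const dlqOf = (delivery) => {
  const policy = delivery.redrivePolicy ? parseJson(delivery.redrivePolicy) : null
  return (policy && policy.deadLetterTargetArn) || null
}

function triageDeliveries (lines) {
  const report = { malformed: 0, delivered: 0, recoverable: [], discarded: [], unaccounted: [] }
  const entries = []
  for (const line of lines) {
    const entry = parseJson(line)
    if (entry && entry.notification && entry.delivery) entries.push(entry)
    else report.malformed++
  }

  // Pass 1: every redrivePolicy names a DLQ; deliveries to those ARNs are redrives.
  const dlqArns = new Set()
  for (const { delivery } of entries) {
    const arn = dlqOf(delivery)
    if (arn) dlqArns.add(arn)
  }

  // Pass 2: which (message, DLQ) pairs were stored successfully?
  const redriven = new Set()
  for (const { notification, delivery, status } of entries) {
    if (dlqArns.has(delivery.destination) && status === 'SUCCESS') {
      redriven.add(`${notification.messageId}|${delivery.destination}`)
    }
  }

  // Pass 3: classify deliveries to the subscribed endpoints themselves.
  for (const { notification, delivery, status } of entries) {
    if (dlqArns.has(delivery.destination)) continue
    if (status === 'SUCCESS') { report.delivered++; continue }
    const response = parseJson(delivery.providerResponse || '') || {}
    const finding = {
      messageId: notification.messageId,
      destination: delivery.destination,
      errorKind: delivery.statusCode >= 400 && delivery.statusCode < 500 ? 'client' : 'server',
      errorCode: response.ErrorCode || null
    }
    const dlq = dlqOf(delivery)
    if (!dlq) report.discarded.push(finding) // SNS had nowhere to keep it
    else if (redriven.has(`${finding.messageId}|${dlq}`)) report.recoverable.push(finding)
    else report.unaccounted.push(finding) // check NumberOfNotificationsFailedToRedriveToDlq
  }
  return report
}
```

With a success sample rate below 100%, the unaccounted bucket can contain messages that did reach the DLQ but whose DLQ delivery was not logged.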

Encrypting DLQs

When your SNS subscription targets an SQS encrypted queue, then you probably want your DLQ to be an SQS encrypted queue as well. This configuration provides consistency in how your messages are encrypted at rest.

To follow this security recommendation, give the CMK you used to encrypt your DLQ a key policy that grants the SNS service principal access to AWS KMS API actions. For example, see the following sample key policy:

{
    "Sid": "GrantSnsAccessToKms",
    "Effect": "Allow",
    "Principal": { "Service": "sns.amazonaws.com" },
    "Action": [ "kms:Decrypt", "kms:GenerateDataKey*" ],
    "Resource": "*"
}

If you have an SNS encrypted topic, but a subscription in this topic points to a DLQ that isn’t an SQS encrypted queue, then messages sidelined to the DLQ aren’t encrypted at rest.

For more information, see Enabling Server-Side Encryption (SSE) for an Amazon SNS Topic with an Amazon SQS Encrypted Queue Subscribed.

Summary

DLQs for SNS, SQS, and Lambda increase the resiliency and durability of your applications. These DLQs address different failure modes, and can be used together.

  • SNS DLQs store messages that failed to be delivered to subscribed endpoints.
  • SQS DLQs store messages that the consumer system failed to process.
  • Lambda DLQs store the messages that resulted in failed asynchronous executions of your functions.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or CloudFormation. DLQs are available in all AWS Regions. Start today by running the tutorials:

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/simple-two-way-messaging-using-the-amazon-sqs-temporary-queue-client/

This post is contributed by Robin Salkeld, Sr. Software Development Engineer

Amazon SQS is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Asynchronous workflows have always been the primary use case for SQS. Using queues ensures one component can keep running smoothly without losing data when another component is unavailable or slow.

We were surprised, then, to discover that many customers use SQS in synchronous workflows. For example, many applications use queues to communicate between frontends and backends when processing a login request from a user.

Why would anyone use SQS for this? The service stores messages for up to 14 days with high durability, but messages in a synchronous workflow often must be processed within a few minutes, or even seconds. Why not just set up an HTTPS endpoint?

The more we talked to customers, the more we understood. Here’s what we learned:

  • Creating a queue is often easier and faster than creating an HTTPS endpoint and the infrastructure necessary to ensure the endpoint’s scalability.
  • Queues are safe by default because they are locked down to the AWS account that created them. In addition, any DDoS attempt on your service is absorbed by SQS instead of loading down your own servers.
  • There is generally no need to create firewall rules for the communication between microservices if they use queues.
  • Although SQS provides durable storage (which isn’t necessary for short-lived messages), it is still a cost-effective solution for this use case. This is especially true when you consider all the messaging broker maintenance that is done for you.

However, setting up efficient two-way communication through one-way queues requires some non-trivial client-side code. In our previous two-part post series on implementing enterprise integration patterns with AWS messaging services, Point-to-point channels and Publish-subscribe channels, we discussed the Request-Response Messaging Pattern. In this pattern, each requester creates a temporary destination to receive each response message.

The simplest approach is to create a new queue for each response, but this is like building a road just so a single car can drive on it before tearing it down. Technically, this can work (and SQS can create and delete queues quickly), but we can definitely make it faster and cheaper.

To better support short-lived, lightweight messaging destinations, we are pleased to present the Amazon SQS Temporary Queue Client. This client makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill.

Virtual queues

The key concept behind the client is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS and no costs associated with creating a virtual queue.

The Temporary Queue Client includes the AmazonSQSVirtualQueuesClient class for creating and managing virtual queues. This class implements the AmazonSQS interface and adds support for attributes related to virtual queues. You can create a virtual queue using this client by calling the CreateQueue API action and including the HostQueueURL queue attribute. This attribute specifies the existing SQS queue on which to host the virtual queue. The queue URL for a virtual queue is in the form <host queue URL>#<virtual queue name>. For example:

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName

When you call the SendMessage or SendMessageBatch API actions on AmazonSQSVirtualQueuesClient with a virtual queue URL, the client first extracts the virtual queue name. It then attaches this name as an additional message attribute to each message, and sends the messages to the host queue. When you call the ReceiveMessage API action on a virtual queue, the calling thread waits for messages to appear in the in-memory buffer for the virtual queue. Meanwhile, a background thread polls the host queue and dispatches messages to these buffers according to the additional message attribute.

This mechanism is similar to how the AmazonSQSBufferedAsyncClient prefetches messages, and the benefits are similar. A single call to SQS can provide messages for up to 10 virtual queues, reducing the API calls that you pay for by up to a factor of ten. Deleting a virtual queue simply removes the client-side resources used to implement them, again without making API calls to SQS.

The diagram below illustrates the end-to-end process for sending messages through virtual queues:

Sending messages through virtual queues

Virtual queues are similar to virtual machines. Just as a virtual machine provides the same experience as a physical machine, a virtual queue divides the resources of a single SQS queue into smaller logical queues. This is ideal for temporary queues, since they frequently only receive a handful of messages in their lifetime. Virtual queues are currently implemented entirely within the Temporary Queue Client, but additional support and optimizations might be added to SQS itself in the future.
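The send and receive path described above, modelled in memory as an added example; this is not the Temporary Queue Client's code. HostQueue, VirtualQueueClient and the attribute name are hypothetical, and parking messages for unknown virtual queues in an orphans list is an assumption of this model.

```javascript
// Hypothetical attribute name for this model; the page does not name the real one.
const VQ_ATTRIBUTE = 'virtualQueueName'

// In-memory stand-in for the single host SQS queue; counts billable API calls.
class HostQueue {
  constructor (url) { this.url = url; this.messages = []; this.apiCalls = 0 }
  sendMessage (message) { this.apiCalls++; this.messages.push(message) }
  receiveMessage (max = 10) {
    this.apiCalls++
    return this.messages.splice(0, Math.min(max, 10))
  }
}

class VirtualQueueClient {
  constructor (host) { this.host = host; this.buffers = new Map(); this.orphans = [] }

  // Split "<host queue URL>#<virtual queue name>" and check it targets our host.
  parse (url) {
    const at = url.indexOf('#')
    const hostUrl = at < 0 ? url : url.slice(0, at)
    const name = at < 0 ? '' : url.slice(at + 1)
    if (hostUrl !== this.host.url || name === '') {
      throw new Error(`not a virtual queue URL on this host: ${url}`)
    }
    return name
  }

  // Creating a virtual queue only allocates a local buffer: no call to the host.
  createQueue (name) {
    if (name === '' || name.includes('#')) throw new Error('invalid virtual queue name')
    if (!this.buffers.has(name)) this.buffers.set(name, [])
    return `${this.host.url}#${name}`
  }

  // Deleting releases the local buffer, again without a call to the host.
  deleteQueue (url) { this.buffers.delete(this.parse(url)) }

  // Sending tags the message with the virtual queue name and uses the host queue.
  sendMessage (url, body) {
    this.host.sendMessage({ body, attributes: { [VQ_ATTRIBUTE]: this.parse(url) } })
  }

  // One iteration of the background poller: a single receive call on the host
  // feeds up to 10 virtual queues. The routing attribute is set by the sender,
  // so a name with no local buffer is parked, not delivered to another queue.
  pollHostOnce () {
    const batch = this.host.receiveMessage(10)
    for (const message of batch) {
      const buffer = this.buffers.get(message.attributes && message.attributes[VQ_ATTRIBUTE])
      if (buffer) buffer.push(message)
      else this.orphans.push(message)
    }
    return batch.length
  }

  // Non-blocking in this model; the real client waits on the buffer.
  receiveMessage (url) {
    const buffer = this.buffers.get(this.parse(url))
    if (!buffer) throw new Error(`no such virtual queue: ${url}`)
    return buffer.shift() || null
  }
}
```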

In most cases, you don’t have to manage virtual queues yourself. The library also includes the AmazonSQSTemporaryQueuesClient class. This class automatically creates virtual queues when the CreateQueue API action is called and creates host queues on demand for all queues with the same queue attributes. To optimize existing application code that creates and deletes queues, you can use this class as a drop-in replacement implementation of the AmazonSQS interface.

The client also includes the AmazonSQSRequester and AmazonSQSResponder interfaces, which enable two-way communication through SQS queues. The following is an example of an RPC implementation for a web application’s login process.

/**
 * This class handles a user's login request on the client side.
 */
public class LoginClient {

    // The SQS queue to send the requests to.
    private final String requestQueueUrl;

    // The AmazonSQSRequester creates a temporary queue for each response.
    private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient();

    public LoginClient(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Send a login request to the server.
     */
    public String login(String body) throws TimeoutException {
        SendMessageRequest request = new SendMessageRequest()
                .withMessageBody(body)
                .withQueueUrl(requestQueueUrl);

        // This:
        //  - creates a temporary queue
        //  - attaches its URL as an attribute on the message
        //  - sends the message
        //  - receives the response from the temporary queue
        //  - deletes the temporary queue
        //  - returns the response
        //
        // If something goes wrong and the server's response never shows up, this method throws a
        // TimeoutException.
        Message response = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS);
        
        return response.getBody();
    }
}

/**
 * This class processes users' login requests on the server side.
 */
public class LoginServer {

    // The SQS queue to poll for login requests.
    // Assume that on construction a thread is created to poll this queue and call
    // handleLoginRequest() below for each message.
    private final String requestQueueUrl;

    // The AmazonSQSResponder sends responses to the correct response destination.
    private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient();

    public LoginServer(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Handle a login request sent from the client above.
     */
    public void handleLoginRequest(Message message) {
        // Assume doLogin does the actual work, and returns a serialized result
        String response = doLogin(message.getBody());

        // This extracts the URL of the temporary queue from the message attribute and sends the
        // response to that queue.
        sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response));  
    }
}

Cleaning up

As with any other AWS SDK client, your code should call the shutdown() method when the temporary queue client is no longer needed. The AmazonSQSRequester interface also provides a shutdown() method, which shuts down its internal temporary queue client. This ensures that the in-memory resources needed for any virtual queues are reclaimed, and that the host queue that the client automatically created is also deleted automatically.

However, in the world of distributed systems things are a little more complex. Processes can run out of memory and crash, and hosts can reboot suddenly and unexpectedly. There are even cases where you don’t have the opportunity to run custom code on shutdown.

The Temporary Queue Client client addresses this issue as well. For each host queue with recent API calls, the client periodically uses the TagQueue API action to attach a fresh tag value that indicates the queue is still being used. The tagging process serves as a heartbeat to keep the queue alive. According to a configurable time period (by default, 5 minutes), a background thread uses the ListQueues API action to obtain the URLs of all queues with the configured prefix. Then, it deletes each queue that has not been tagged recently. The mechanism is similar to how the Amazon DynamoDB Lock Client expires stale lock leases.

If you use the AmazonSQSTemporaryQueuesClient directly, you can customize how long queues must be idle before they are deleted by configuring the IdleQueueRetentionPeriodSeconds queue attribute. The client supports setting this attribute on both host queues and virtual queues. For virtual queues, setting this attribute ensures that the in-memory resources do not become a memory leak in the presence of application bugs.

Any API call to a queue marks it as non-idle, including ReceiveMessage calls that don’t return any messages. The only reason to increase the retention period attribute is to give the client more time when it can’t send heartbeats—for example, due to garbage collection pauses or networking issues.

But what if you want to use this client in a fleet of a thousand EC2 instances? Won’t every client spend a lot of time checking every queue for idleness? Doesn’t that imply duplicate work that increases as the fleet is scaled up?

We thought of this too. The Temporary Queue Client creates a shared queue for all clients using the same queue prefix, and uses this queue as a work queue for the distributed task. Instead of every client calling the ListQueues API action every five minutes, a new seed message (which triggers the sweeping process) is sent to this queue every five minutes.

When one of the clients receives this message, it calls the ListQueues API action and sends each queue URL in the result as another kind of message to the same shared work queue. The work of actually checking each queue for idleness is distributed roughly evenly to the active clients, ensuring scalability. There is even a mechanism that works around the fact that the ListQueues API action currently returns no more than 1,000 queue URLs at a time.

Summary

We are excited about how the new Amazon SQS Temporary Queue Client makes more messaging patterns easier and cheaper for you. Download the code from GitHub, have a look at Temporary Queues in the Amazon SQS Developer Guide, try out the client, and let us know what you think!

ICYMI: Serverless Q4 2018

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2018/

This post is courtesy of Eric Johnson, Senior Developer Advocate – AWS Serverless

Welcome to the fourth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

This edition of ICYMI includes all announcements from AWS re:Invent 2018!

If you didn’t see them, check our Q1 ICYMI, Q2 ICYMI, and Q3 ICYMI posts for what happened then.

So, what might you have missed this past quarter? Here’s the recap.

New features

AWS Lambda introduced the Lambda runtime API and Lambda layers, which enable developers to bring their own runtime and share common code across Lambda functions. With the release of the runtime API, we can now support runtimes from AWS partners such as the PHP runtime from Stackery and the Erlang and Elixir runtimes from Alert Logic. Using layers, partners such as Datadog and Twistlock have also simplified the process of using their Lambda code libraries.

To meet the demand of larger Lambda payloads, Lambda doubled the payload size of asynchronous calls to 256 KB.

In early October, Lambda also lengthened the runtime limit by enabling Lambda functions that can run up to 15 minutes.

Lambda also rolled out native support for Ruby 2.5 and Python 3.7.

You can now process Amazon Kinesis streams up to 68% faster with AWS Lambda support for Kinesis Data Streams enhanced fan-out and HTTP/2 for faster streaming.

Lambda also released a new Application view in the console. It’s a high-level view of all of the resources in your application. It also gives you a quick view of deployment status with the ability to view service metrics and custom dashboards.

Application Load Balancers added support for targeting Lambda functions. ALBs can provide a simple HTTP/S front end to Lambda functions. ALB features such as host- and path-based routing are supported to allow flexibility in triggering Lambda functions.

Amazon API Gateway added support for AWS WAF. You can use AWS WAF for your Amazon API Gateway APIs to protect from attacks such as SQL injection and cross-site scripting (XSS).

API Gateway has also improved parameter support by adding support for multi-value parameters. You can now pass multiple values for the same key in the header and query string when calling the API. Returning multiple headers with the same name in the API response is also supported. For example, you can send multiple Set-Cookie headers.

In October, API Gateway relaunched the Serverless Developer Portal. It provides a catalog of published APIs and associated documentation that enable self-service discovery and onboarding. You can customize it for branding through either custom domain names or logo/styling updates. In November, we made it easier to launch the developer portal from the Serverless Application Repository.

In the continuous effort to decrease customer costs, API Gateway introduced tiered pricing. The tiered pricing model reduces the cost of API Gateway at scale, with an API Requests price as low as $1.51 per million requests at the highest tier.

Last but definitely not least, API Gateway released support for WebSocket APIs in mid-December as a final holiday gift. With this new feature, developers can build bidirectional communication applications without having to provision and manage any servers. This has been a long-awaited and highly anticipated announcement for the serverless community.

AWS Step Functions added eight new service integrations. With this release, the steps of your workflow can exist on Amazon ECS, AWS Fargate, Amazon DynamoDB, Amazon SNS, Amazon SQS, AWS Batch, AWS Glue, and Amazon SageMaker. This is in addition to the services that Step Functions already supports: AWS Lambda and Amazon EC2.

Step Functions expanded by announcing availability in the EU (Paris) and South America (São Paulo) Regions.

The AWS Serverless Application Repository increased its functionality by supporting more resources in the repository. The Serverless Application Repository now supports Application Auto Scaling, Amazon Athena, AWS AppSync, AWS Certificate Manager, Amazon CloudFront, AWS CodeBuild, AWS CodePipeline, AWS Glue, AWS Identity and Access Management, Amazon SNS, Amazon SQS, AWS Systems Manager, and AWS Step Functions.

The Serverless Application Repository also released support for nested applications. Nested applications enable you to build highly sophisticated serverless architectures by reusing services that are independently authored and maintained but easily composed using AWS SAM and the Serverless Application Repository.

AWS SAM made authorization simpler by introducing SAM support for authorizers. Enabling authorization for your APIs is as simple as defining an Amazon Cognito user pool or an API Gateway Lambda authorizer as a property of your API in your SAM template.

AWS SAM CLI introduced two new commands. First, you can now build locally with the sam build command. This functionality allows you to compile deployment artifacts for Lambda functions written in Python. Second, the sam publish command allows you to publish your SAM application to the Serverless Application Repository.

Our SAM tooling team also released the AWS Toolkit for PyCharm, which provides an integrated experience for developing serverless applications in Python.

The AWS Toolkits for Visual Studio Code (Developer Preview) and IntelliJ (Developer Preview) are still in active development and will include similar features when they become generally available.

AWS SAM and the AWS SAM CLI implemented support for Lambda layers. Using a SAM template, you can manage your layers, and using the AWS SAM CLI, you can develop and debug Lambda functions that are dependent on layers.

Amazon DynamoDB added support for transactions, allowing developers to enforce all-or-nothing operations. In addition to transaction support, Amazon DynamoDB Accelerator also added support for DynamoDB transactions.

Amazon DynamoDB also announced Amazon DynamoDB on-demand, a flexible new billing option for DynamoDB capable of serving thousands of requests per second without capacity planning. DynamoDB on-demand offers simple pay-per-request pricing for read and write requests so that you only pay for what you use, making it easy to balance costs and performance.

AWS Amplify released the Amplify Console, which is a continuous deployment and hosting service for modern web applications with serverless backends. Modern web applications include single-page app frameworks such as React, Angular, and Vue and static-site generators such as Jekyll, Hugo, and Gatsby.

Amazon SQS announced support for Amazon VPC Endpoints using PrivateLink.

Tech talks

We hold several Serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. Here are the three tech talks that we delivered in Q4:

Twitch

We’ve been so busy livestreaming on Twitch that you’re most certainly missing out if you aren’t following along!

For information about upcoming broadcasts and recent livestreams, keep an eye on AWS on Twitch for more Serverless videos and on the Join us on Twitch AWS page.

New Home for SAM Docs

This quarter, we moved all SAM docs to https://docs.aws.amazon.com/serverless-application-model. Everything you need to know about SAM is there. If you don’t find what you’re looking for, let us know!

In other news

The schedule is out for 2019 AWS Global Summits in cities around the world. AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Summits are held in major cities around the world. They attract technologists from all industries and skill levels who want to discover how AWS can help them innovate quickly and deliver flexible, reliable solutions at scale. Get notified when to register and learn more at the AWS Global Summits website.

Still looking for more?

The Serverless landing page has lots of information. The resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. Check it out!

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

Applying one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumed the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, they can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, they can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. The following diagram shows the components described above for the traditional messaging scenario: a sender sends messages into an Amazon MQ queue, and a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches sender and receiver in the same Java virtual machine (JVM).

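import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {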

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative. The following diagram shows the components described above for the cloud-native messaging scenario: a sender sends messages into an Amazon SQS queue, and a receiver consumes messages from that queue.

Point to point cloud-native messaging

 

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue, running in an endless loop. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer.

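import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {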

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

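import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {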

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we have a return channel option. We would now rather call the involved processes requester and responder. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response to. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. The following diagram shows the components described above for the traditional messaging scenario, using one Amazon MQ queue for the request messages and one for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

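import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {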

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
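                    try {
                        // receive() returns null when no response arrives within the timeout.
                        Message receivedMessage = consumer.receive(5000);
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        } else {
                            System.out.println(String.format("no response for correlation id '%s' within 5 seconds", correlationId));
                        }
                    } finally {
                        consumer.close();
                    }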
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. The following diagram shows the components described above for the cloud-native scenario, using one Amazon SQS queue for the request messages and one for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

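import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {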

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

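                    // Standard queues deliver at least once, and a response may carry a missing or
                    // unknown correlation ID, so check before dereferencing.
                    MessageAttributeValue correlationAttribute = receivedMessage.getMessageAttributes().get("CorrelationID");
                    String receivedCorrelationId = correlationAttribute == null ? null : correlationAttribute.getStringValue();
                    SendMessageRequest originalRequest = receivedCorrelationId == null ? null : inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    } else {
                        System.out.println("No in-flight request matches this response; discarding it");
                    }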

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

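import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {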

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use API protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages are published into an Amazon MQ topic and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache Active MQ client library, but we use topics instead of queues, as shown in the following code.

public class PublishSubscribeOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
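        // "user" and "password" are placeholders: load the broker credentials from a secret store or the environment, not from source code
        Connection conn = connFact.createConnection("user", "password");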
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages to an Amazon SNS topic, where they are consumed by the subscribers of this topic.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic, running in an endless loop. You can run the following code on any Amazon compute service, your on-premises data center, or your personal computer.

public class PublishSubscribeOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

public class Subscriber implements RequestHandler<SNSEvent, Void> {

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

public class PublishSubscribeRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
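                    try {
                        // receive() returns null when the timeout expires, so check before casting
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        if (receivedMessage1 == null || receivedMessage2 == null) {
                            System.out.println(String.format("timed out waiting for both responses to correlation id '%s'", correlationId));
                        } else {
                            System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                            // With CLIENT_ACKNOWLEDGE, this acknowledges every message consumed so far in the session
                            receivedMessage2.acknowledge();
                        }
                    } finally {
                        consumer.close();
                    }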
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publish-subscribe one-way cloud-native example sender. The requester also specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send the responses to, and the receiver of the responses can assign them accordingly.

public class PublishSubscribeReqRespCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

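                    // A reply may lack the attribute, and a second responder's reply finds its entry already removed
                    String receivedCorrelationId = receivedMessage.getMessageAttributes().containsKey("CorrelationID")
                        ? receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue()
                        : null;
                    PublishRequest originalRequest = receivedCorrelationId == null ? null : inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest == null) {
                        System.out.println("No in-flight request matches this response");
                    } else {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    }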

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the publish-subscribe one-way cloud-native example receiver. It also creates a message, enriches it with the correlation ID, and sends it back to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
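            // ReplyTo is taken from the message itself: scope the function's IAM permissions to the expected reply queue
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();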

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Monitoring your Amazon SNS message filtering activity with Amazon CloudWatch

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/monitoring-your-amazon-sns-message-filtering-activity-with-amazon-cloudwatch/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS message filtering provides a set of string and numeric matching operators that allow each subscription to receive only the messages of interest. Hence, SNS message filtering can simplify your pub/sub messaging architecture by offloading the message filtering logic from your subscriber systems, as well as the message routing logic from your publisher systems.

After you set the subscription attribute that defines a filter policy, the subscribing endpoint receives only the messages that carry attributes matching this filter policy. Other messages published to the topic are filtered out for this subscription. In this way, the native integration between SNS and Amazon CloudWatch provides visibility into the number of messages delivered, as well as the number of messages filtered out.

CloudWatch metrics are captured automatically for you. To get started with SNS message filtering, see Filtering Messages with Amazon SNS.

Message Filtering Metrics

The following six CloudWatch metrics are relevant to understanding your SNS message filtering activity:

  • NumberOfMessagesPublished – Inbound traffic to SNS. This metric tracks all the messages that have been published to the topic.
  • NumberOfNotificationsDelivered – Outbound traffic from SNS. This metric tracks all the messages that have been successfully delivered to endpoints subscribed to the topic. A delivery takes place either when the incoming message attributes match a subscription filter policy, or when the subscription has no filter policy at all, which results in a catch-all behavior.
  • NumberOfNotificationsFilteredOut – This metric tracks all the messages that were filtered out because they carried attributes that didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-NoMessageAttributes – This metric tracks all the messages that were filtered out because they didn’t carry any attributes at all and, consequently, didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-InvalidAttributes – This metric keeps track of messages that were filtered out because they carried invalid or malformed attributes and, thus, didn’t match the subscription filter policy.
  • NumberOfNotificationsFailed – This last metric tracks all the messages that failed to be delivered to subscribing endpoints, regardless of whether a filter policy had been set for the endpoint. This metric is emitted after the message delivery retry policy is exhausted, and SNS stops attempting to deliver the message. At that moment, the subscribing endpoint is likely no longer reachable. For example, the subscribing SQS queue or Lambda function has been deleted by its owner. You may want to closely monitor this metric to address message delivery issues quickly.

Message filtering graphs

Through the AWS Management Console, you can compose graphs to display your SNS message filtering activity. The graph shows the number of messages published, delivered, and filtered out within the timeframe you specify (1h, 3h, 12h, 1d, 3d, 1w, or custom).

SNS message filtering for CloudWatch Metrics

To compose an SNS message filtering graph with CloudWatch:

  1. Open the CloudWatch console.
  2. Choose Metrics, SNS, All Metrics, and Topic Metrics.
  3. Select all metrics to add to the graph, such as:
    • NumberOfMessagesPublished
    • NumberOfNotificationsDelivered
    • NumberOfNotificationsFilteredOut
  4. Choose Graphed metrics.
  5. In the Statistic column, switch from Average to Sum.
  6. Title your graph with a descriptive name, such as “SNS Message Filtering”.

After you have your graph set up, you may want to copy the graph link for bookmarking, emailing, or sharing with co-workers. You may also want to add your graph to a CloudWatch dashboard for easy access in the future. Both actions are available to you on the Actions menu, which is found above the graph.

Summary

SNS message filtering defines how SNS topics behave in terms of message delivery. By using CloudWatch metrics, you gain visibility into the number of messages published, delivered, and filtered out. This enables you to validate the operation of filter policies and more easily troubleshoot during development phases.

SNS message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). CloudWatch metrics for SNS message filtering are available now, in all AWS Regions.

For information about pricing, see the CloudWatch pricing page.

Solving Complex Ordering Challenges with Amazon SQS FIFO Queues

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/solving-complex-ordering-challenges-with-amazon-sqs-fifo-queues/

Contributed by Shea Lutton, AWS Cloud Infrastructure Architect

Amazon Simple Queue Service (Amazon SQS) is a fully managed queuing service that helps decouple applications, distributed systems, and microservices to increase fault tolerance. SQS queues come in two distinct types:

  • Standard SQS queues are able to scale to enormous throughput with at-least-once delivery.
  • FIFO queues are designed to guarantee that messages are processed exactly once in the exact order that they are received and have a default rate of 300 transactions per second.

As customers explore SQS FIFO queues, they often have questions about how the behavior works when messages arrive and are consumed. This post walks through some common situations to identify the exact behavior that you can expect. It also covers the behavior of message groups in depth and explains why message groups are key to understanding how FIFO queues work.

The simple case

Suppose that you run a major auction platform where people buy and sell a wide range of products. Your platform requires that transactions from buyers and sellers get processed in exactly the order received. Here’s how a FIFO queue helps you keep all your transactions in one straight flow.

A seller currently is holding an auction for a laptop, and three different bids are received for the same price. Ties are awarded to the first bidder at that price so it is important to track which arrived first. Your auction platform receives the three bids and sends them to a FIFO queue before they are processed.

Now observe how messages leave the queue. When your consumer asks for a batch of up to 10 messages, SQS starts filling the batch with the oldest message (bid A1). It keeps filling until either the batch is full or the queue is empty. In this case, the batch contains the three messages and the queue is now empty. After a batch has left the queue, SQS considers that batch of messages to be “in-flight” until the consumer either deletes them or the batch’s visibility timer expires.

 

When you have a single consumer, this is easy to envision. The consumer gets a batch of messages (now in-flight), does its processing, and deletes the messages. That consumer is then ready to ask for the next batch of messages.

The critical thing to keep in mind is that SQS won’t release the next batch of messages until the first batch has been deleted. By adding more messages to the queue, you can see more interesting behaviors. Imagine that a burst of 11 bids is sent to your FIFO queue, with two bids for Auction A arriving last.

The FIFO queue now has at least two batches of messages in it. When your single consumer requests the first batch of 10 messages, it receives a batch starting with B1 and ending with A1. Later, after the first batch has been deleted, the consumer can get the second batch of messages containing the final A2 message from the queue.

Adding complexity with multiple message groups

A new challenge arises. Your auction platform is getting busier and your dev team added a number of new features. The combination of increased messages and extra processing time for the new features means that a single consumer is too slow. The solution is to scale to have more consumers and process messages in parallel.

To work in parallel, your team realized that only the messages related to a single auction must be kept in order. All transactions for Auction A need to be kept in order and so do all transactions for Auction B. But the two auctions are independent and it does not matter which auction's transactions are processed first.

FIFO can handle that case with a feature called message groups. Each transaction related to Auction A is placed by your producer into message group A, and so on. In the diagram below, Auction A and Auction B each received three bid transactions, with bid B1 arriving first. The FIFO queue always keeps transactions within a message group in the order in which they arrived.

How is this any different than earlier examples? The consumer now gets the messages ordered by message groups, all the B group messages followed by all the A group messages. Multiple message groups create the possibility of using multiple consumers, which I explain in a moment. If FIFO can’t fill up a batch of messages with a single message group, FIFO can place more than one message group in a batch of messages. But whenever possible, the queue gives you a full batch of messages from the same group.

The order of messages leaving a FIFO queue is governed by three rules:

  1. Return the oldest message where no other message in the same message group is currently in-flight.
  2. Return as many messages from the same message group as possible.
  3. If a message batch is still not full, go back to rule 1.

To see this behavior, add a second consumer and insert many more messages into the queue. For simplicity, the delete message action has been omitted in these diagrams but it is assumed that all messages in a batch are processed successfully by the consumer and the batch is properly deleted immediately after.

In this example, there are 11 Group A and 11 Group B transactions arriving in interleaved order and a second consumer has been added. Consumer 1 asks for a group of 10 messages and receives 10 Group A messages. Consumer 2 then asks for 10 messages but SQS knows that Group A is in flight, so it releases 10 Group B messages. The two consumers are now processing two batches of messages in parallel, speeding up throughput and then deleting their batches. When Consumer 1 requests the next batch of messages, it receives the remaining two messages, one from Group A and one from Group B.

Consider this nuanced detail from the example above. What would happen if Consumer 1 was on a faster server and processed its first batch of messages before Consumer 2 could mark its messages for deletion? See if you can predict the behavior before looking at the answer.

If Consumer 2 has not deleted its Group B messages yet when Consumer 1 asks for the next batch, then the FIFO queue considers Group B to still be in flight. It does not release any more Group B messages. Consumer 1 gets only the remaining Group A message. Later, after Consumer 2 has deleted its first batch, the remaining Group B message is released.
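The three ordering rules and the two-consumer walkthrough above, as a small Python model added here (not part of the original post and not how SQS is implemented). The class name `FifoQueueModel`, its methods and the per-batch receipt are assumptions made for illustration; `visibility_timeout` models an expired visibility timer returning a batch to the queue.

```python
"""Simplified model of FIFO batch selection with message groups (illustration only)."""
import itertools


class FifoQueueModel:
    def __init__(self):
        self._seq = itertools.count(1)
        self._receipts = itertools.count(1)
        self._queue = []       # waiting messages as (sequence, group, body), oldest first
        self._in_flight = {}   # receipt -> batch handed to a consumer and not yet deleted

    def send(self, group, body):
        self._queue.append((next(self._seq), group, body))

    def _locked_groups(self):
        # A group is locked while any of its messages is in flight
        return {group for batch in self._in_flight.values() for _, group, _ in batch}

    def receive(self, max_messages=10):
        """Return (receipt, bodies); receipt is None when nothing can be released."""
        locked = self._locked_groups()
        batch = []
        while len(batch) < max_messages:
            # Rule 1: oldest message whose group has nothing in flight
            group = next((g for _, g, _ in self._queue if g not in locked), None)
            if group is None:
                break
            # Rule 2: as many messages from that group as the batch still holds
            room = max_messages - len(batch)
            taken = [m for m in self._queue if m[1] == group][:room]
            self._queue = [m for m in self._queue if m not in taken]
            batch.extend(taken)
            locked.add(group)
            # Rule 3: batch not full yet, so loop back to rule 1
        if not batch:
            return None, []
        receipt = next(self._receipts)
        self._in_flight[receipt] = batch
        return receipt, [body for _, _, body in batch]

    def delete(self, receipt):
        """Consumer finished the batch: its groups are unlocked."""
        self._in_flight.pop(receipt)

    def visibility_timeout(self, receipt):
        """Consumer never deleted the batch: messages return in their original order."""
        self._queue = sorted(self._queue + self._in_flight.pop(receipt))


def interleaved_queue(per_group=11):
    """Constructed input: A1, B1, A2, B2, ... as in the two-consumer example."""
    queue = FifoQueueModel()
    for i in range(1, per_group + 1):
        queue.send("A", f"A{i}")
        queue.send("B", f"B{i}")
    return queue


if __name__ == "__main__":
    q = interleaved_queue()
    r1, batch1 = q.receive(10)      # Consumer 1
    r2, batch2 = q.receive(10)      # Consumer 2
    print("consumer 1:", batch1)
    print("consumer 2:", batch2)
    q.delete(r1)                    # Consumer 1 is faster; group B is still in flight
    print("consumer 1 again:", q.receive(10)[1])
    q.delete(r2)
    print("after consumer 2 deletes:", q.receive(10)[1])
```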

Conclusion

I hope this post answered your questions about how Amazon SQS FIFO queues work and why message groups are helpful. If you’re interested in exploring SQS FIFO queues further, here are a few ideas to get you started:

Message Filtering Operators for Numeric Matching, Prefix Matching, and Blacklisting in Amazon SNS

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/message-filtering-operators-for-numeric-matching-prefix-matching-and-blacklisting-in-amazon-sns/

This blog was contributed by Otavio Ferreira, Software Development Manager for Amazon SNS

Message filtering simplifies the overall pub/sub messaging architecture by offloading message filtering logic from subscribers, as well as message routing logic from publishers. The initial launch of message filtering provided a basic operator that was based on exact string comparison. For more information, see Simplify Your Pub/Sub Messaging with Amazon SNS Message Filtering.

Today, AWS is announcing an additional set of filtering operators that bring even more power and flexibility to your pub/sub messaging use cases.

Message filtering operators

Amazon SNS now supports both numeric and string matching. Specifically, string matching operators allow for exact, prefix, and “anything-but” comparisons, while numeric matching operators allow for exact and range comparisons, as outlined below. Numeric matching operators work for values between -10e9 and +10e9 inclusive, with five digits of accuracy right of the decimal point.

  • Exact matching on string values (Whitelisting): Subscription filter policy {"sport": ["rugby"]} matches message attribute {"sport": "rugby"} only.
  • Anything-but matching on string values (Blacklisting): Subscription filter policy {"sport": [{"anything-but": "rugby"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"} and {"sport": "football"} but not {"sport": "rugby"}.
  • Prefix matching on string values: Subscription filter policy {"sport": [{"prefix": "bas"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"}.
  • Exact matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["=", 301.5]}]} matches message attributes {"balance": 301.500} and {"balance": 3.015e2}
  • Range matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["<", 0]}]} matches negative numbers only, and {"balance": [{"numeric": [">", 0, "<=", 150]}]} matches any positive number up to 150.

As usual, you may apply the “AND” logic by appending multiple keys in the subscription filter policy, and the “OR” logic by appending multiple values for the same key, as follows:

  • AND logic: Subscription filter policy {"sport": ["rugby"], "language": ["English"]} matches only messages that carry both attributes {"sport": "rugby"} and {"language": "English"}
  • OR logic: Subscription filter policy {"sport": ["rugby", "football"]} matches messages that carry either the attribute {"sport": "rugby"} or {"sport": "football"}

Message filtering operators in action

Here’s how this new set of filtering operators works. The following example is based on a pharmaceutical company that develops, produces, and markets a variety of prescription drugs, with research labs located in Asia Pacific and Europe. The company built an internal procurement system to manage the purchasing of lab supplies (for example, chemicals and utensils), office supplies (for example, paper, folders, and markers) and tech supplies (for example, laptops, monitors, and printers) from global suppliers.

This distributed system is composed of the four following subsystems:

  • A requisition system that presents the catalog of products from suppliers, and takes orders from buyers
  • An approval system for orders targeted to Asia Pacific labs
  • Another approval system for orders targeted to European labs
  • A fulfillment system that integrates with shipping partners

As shown in the following diagram, the company leverages AWS messaging services to integrate these distributed systems.

  • Firstly, an SNS topic named “Orders” was created to take all orders placed by buyers on the requisition system.
  • Secondly, two Amazon SQS queues, named “Lab-Orders-AP” and “Lab-Orders-EU” (for Asia Pacific and Europe respectively), were created to backlog orders that are up for review on the approval systems.
  • Lastly, an SQS queue named “Common-Orders” was created to backlog orders that aren’t related to lab supplies, which can already be picked up by shipping partners on the fulfillment system.

The company also uses AWS Lambda functions to automatically process lab supply orders that don’t require approval or which are invalid.

In this example, because different types of orders have been published to the SNS topic, the subscribing endpoints have had to set advanced filter policies on their SNS subscriptions, to have SNS automatically filter out orders they can’t deal with.

As depicted in the above diagram, the following five filter policies have been created:

  • The SNS subscription that points to the SQS queue “Lab-Orders-AP” sets a filter policy that matches lab supply orders, with a total value greater than $1,000, and that target Asia Pacific labs only. These more expensive transactions require an approver to review orders placed by buyers.
  • The SNS subscription that points to the SQS queue “Lab-Orders-EU” sets a filter policy that matches lab supply orders, also with a total value greater than $1,000, but that target European labs instead.
  • The SNS subscription that points to the Lambda function “Lab-Preapproved” sets a filter policy that only matches lab supply orders that aren’t as expensive, up to $1,000, regardless of their target lab location. These orders simply don’t require approval and can be automatically processed.
  • The SNS subscription that points to the Lambda function “Lab-Cancelled” sets a filter policy that only matches lab supply orders with total value of $0 (zero), regardless of their target lab location. These orders carry no actual items, obviously need neither approval nor fulfillment, and as such can be automatically canceled.
  • The SNS subscription that points to the SQS queue “Common-Orders” sets a filter policy that blacklists lab supply orders. Hence, this policy matches only office and tech supply orders, which have a more streamlined fulfillment process, and require no approval, regardless of price or target location.

After the company finished building this advanced pub/sub architecture, they were then able to launch their internal procurement system and allow buyers to begin placing orders. The diagram above shows six example orders published to the SNS topic. Each order contains message attributes that describe the order, and cause them to be filtered in a different manner, as follows:

  • Message #1 is a lab supply order, with a total value of $15,700 and targeting a research lab in Singapore. Because the value is greater than $1,000, and the location “Asia-Pacific-Southeast” matches the prefix “Asia-Pacific-”, this message matches the first SNS subscription and is delivered to SQS queue “Lab-Orders-AP”.
  • Message #2 is a lab supply order, with a total value of $1,833 and targeting a research lab in Ireland. Because the value is greater than $1,000, and the location “Europe-West” matches the prefix “Europe-”, this message matches the second SNS subscription and is delivered to SQS queue “Lab-Orders-EU”.
  • Message #3 is a lab supply order, with a total value of $415. Because the value is greater than $0 and less than $1,000, this message matches the third SNS subscription and is delivered to Lambda function “Lab-Preapproved”.
  • Message #4 is a lab supply order, but with a total value of $0. Therefore, it only matches the fourth SNS subscription, and is delivered to Lambda function “Lab-Cancelled”.
  • Messages #5 and #6 are not actually lab supply orders; one is an office supply order, and the other is a tech supply order. Therefore, they only match the fifth SNS subscription, and are both delivered to SQS queue “Common-Orders”.

Although each message only matched a single subscription, each was tested against the filter policy of every subscription in the topic. Hence, depending on which attributes are set on the incoming message, the message might actually match multiple subscriptions, and multiple deliveries will take place. Also, it is important to bear in mind that subscriptions with no filter policies catch every single message published to the topic, as a blank filter policy equates to a catch-all behavior.
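The routing above can be checked offline with a small evaluator for the operators this post describes. It is an added example, not AWS code or the SNS implementation; the attribute names (order_type, value, location), the "lab" value and the policy documents are assumptions, since the post's diagram with the actual policies is not reproduced here.

```python
import operator

# Comparison operators accepted inside a {"numeric": [...]} condition.
OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
       ">": operator.gt, ">=": operator.ge}

def value_matches(cond, actual):
    """Test one policy entry (a literal or an operator object) against an attribute value."""
    if not isinstance(cond, dict):
        return cond == actual                      # exact matching
    if "prefix" in cond:
        return isinstance(actual, str) and actual.startswith(cond["prefix"])
    if "anything-but" in cond:
        return actual != cond["anything-but"]
    if "numeric" in cond:
        if isinstance(actual, bool) or not isinstance(actual, (int, float)):
            return False
        spec = cond["numeric"]                     # e.g. [">", 0, "<=", 150]
        return all(OPS[spec[i]](actual, spec[i + 1]) for i in range(0, len(spec), 2))
    return False                                   # unknown operator: no match

def policy_matches(policy, attrs):
    """AND across keys, OR across a key's entries; an empty policy matches everything."""
    return all(key in attrs and any(value_matches(c, attrs[key]) for c in conds)
               for key, conds in policy.items())

def route(subscriptions, attrs):
    """Evaluate the message against every subscription; several may match."""
    return [name for name, policy in subscriptions.items() if policy_matches(policy, attrs)]

# Constructed policies for the five subscriptions; attribute names and "lab" are assumed.
LAB = ["lab"]
SUBSCRIPTIONS = {
    "Lab-Orders-AP": {"order_type": LAB, "value": [{"numeric": [">", 1000]}],
                      "location": [{"prefix": "Asia-Pacific-"}]},
    "Lab-Orders-EU": {"order_type": LAB, "value": [{"numeric": [">", 1000]}],
                      "location": [{"prefix": "Europe-"}]},
    "Lab-Preapproved": {"order_type": LAB, "value": [{"numeric": [">", 0, "<=", 1000]}]},
    "Lab-Cancelled": {"order_type": LAB, "value": [{"numeric": ["=", 0]}]},
    "Common-Orders": {"order_type": [{"anything-but": "lab"}]},
}

if __name__ == "__main__":
    order = {"order_type": "lab", "value": 15700, "location": "Asia-Pacific-Southeast"}
    print(route(SUBSCRIPTIONS, order))
```

Running candidate policies through a check like this before attaching them helps confirm that no subscription is left with an empty policy, which would receive every order published to the topic.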

Summary

Amazon SNS allows for both string and numeric filtering operators. As explained in this post, string operators allow for exact, prefix, and “anything-but” comparisons, while numeric operators allow for exact and range comparisons. These advanced filtering operators bring even more power and flexibility to your pub/sub messaging functionality and also allow you to simplify your architecture further by removing even more logic from your subscribers.

Message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). SNS filtering operators for numeric matching, prefix matching, and blacklisting are available now in all AWS Regions, for no extra charge.

To experiment with these new filtering operators yourself, and continue learning, try the 10-minute Tutorial Filter Messages Published to Topics. For more information, see Filtering Messages with Amazon SNS in the SNS documentation.