Tag Archives: Amazon SQS

ICYMI: Serverless pre:Invent 2019

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-serverless-preinvent-2019/

With Contributions from Chris Munns – Sr Manager – Developer Advocacy – AWS Serverless

The last two weeks have been a frenzy of AWS service and feature launches, building up to AWS re:Invent 2019. Because so much has been announced, we thought we’d ship an ICYMI post summarizing the serverless-specific features. We’ve also included announcements from services that are commonly used in serverless application architectures or development.

AWS re:Invent 2019

We also want you to know that we’ll be talking about many of these features (as well as those coming) in sessions at re:Invent.

Here’s what’s new!

AWS Lambda

On September 3, AWS Lambda started rolling out a major improvement to how AWS Lambda functions work with your Amazon VPC networks. This change brings both scale and performance improvements, and addresses several of the limitations of the previous networking model with VPCs.

On November 25, Lambda announced that the rollout of this new capability has completed in 6 additional regions, including US East (N. Virginia) and US West (Oregon).

New VPC to VPC NAT for Lambda functions

On November 18, Lambda announced three new runtime updates. Lambda now supports Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new language features and benefits so be sure to check out the linked release posts. These new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda has released a number of controls for both stream and async based invocations:

  • For Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams, it’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities will help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.
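As a rough illustration of the stream controls in the first bullet, the sketch below builds keyword arguments for Lambda's CreateEventSourceMapping API. The function name, ARNs, and the specific limits are placeholders chosen for this example, not recommendations.

```python
def stream_error_handling_config(function_name, stream_arn, failure_destination_arn):
    """Build keyword arguments for Lambda's CreateEventSourceMapping API.

    All values below are illustrative; tune them for your own workload.
    """
    return {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,
        "StartingPosition": "LATEST",
        "MaximumRetryAttempts": 2,            # limit the retry count
        "MaximumRecordAgeInSeconds": 3600,    # stop retrying records older than one hour
        "BisectBatchOnFunctionError": True,   # split a failing batch to isolate a bad record
        "DestinationConfig": {                # where to send details of discarded batches
            "OnFailure": {"Destination": failure_destination_arn}
        },
    }


config = stream_error_handling_config(
    "my-function",                                               # placeholder name
    "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",   # placeholder ARN
    "arn:aws:sqs:us-east-1:123456789012:my-failure-queue",       # placeholder ARN
)
```

With boto3 installed and credentials configured, this dictionary could be passed as `boto3.client("lambda").create_event_source_mapping(**config)`.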

In addition to the above controls, Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set one destination for a success, and another for a failure. This unlocks useful patterns for distributed event-based applications and can reduce the code you would otherwise write to send function results to a destination manually.
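To make the asynchronous controls and destinations more concrete, here is a minimal sketch that builds the arguments for Lambda's PutFunctionEventInvokeConfig API. The ARNs and the chosen limits are assumptions for illustration only.

```python
def async_invoke_config(function_name, on_success_arn, on_failure_arn):
    """Build keyword arguments for Lambda's PutFunctionEventInvokeConfig API."""
    return {
        "FunctionName": function_name,
        "MaximumRetryAttempts": 1,          # illustrative; the API accepts 0 to 2
        "MaximumEventAgeInSeconds": 600,    # illustrative; the API accepts 60 to 21600
        "DestinationConfig": {
            "OnSuccess": {"Destination": on_success_arn},
            "OnFailure": {"Destination": on_failure_arn},
        },
    }


config = async_invoke_config(
    "my-function",                                            # placeholder name
    "arn:aws:sqs:us-east-1:123456789012:results-queue",       # placeholder ARN
    "arn:aws:sns:us-east-1:123456789012:failures-topic",      # placeholder ARN
)
```

With boto3, the dictionary could be applied using `boto3.client("lambda").put_function_event_invoke_config(**config)`.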

Lambda Destinations

Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Amazon Kinesis Data Streams and Amazon DynamoDB Streams. This allows for faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Lambda Parallelization Factor diagram

Lambda now supports Amazon SQS FIFO queues as an event source. FIFO queues guarantee the order of record processing, unlike standard queues, which are unordered. FIFO queues support message grouping via the MessageGroupId attribute, which allows parallel Lambda consumers of a single FIFO queue. This allows for high throughput of record processing by Lambda.
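The sketch below shows how a producer might set the message group when sending to a FIFO queue. It only builds the arguments for the SQS SendMessage API; the queue URL, group names, and payload are hypothetical.

```python
import json


def fifo_message(queue_url, group_id, payload, deduplication_id):
    """Build keyword arguments for the SQS SendMessage API on a FIFO queue.

    Messages that share a MessageGroupId are delivered in order; different
    groups can be processed in parallel.
    """
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(payload),
        "MessageGroupId": group_id,
        "MessageDeduplicationId": deduplication_id,
    }


# Hypothetical queue URL and order data, for illustration only.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo"
message = fifo_message(QUEUE_URL, "customer-42", {"order": 1001}, "order-1001")
```

With boto3, the message could be sent using `boto3.client("sqs").send_message(**message)`.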

Lambda now supports Environment Variables in AWS China (Beijing) Region, operated by Sinnet and the AWS China (Ningxia) Region, operated by NWCD.

Lastly, you can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics tell you the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, spot outliers, and find hard-to-spot situations that create a poor customer experience for a subset of your users.
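To see why percentiles are more informative than averages for skewed metrics, consider this small, self-contained sketch. It uses the nearest-rank method on made-up duration values; CloudWatch's own percentile calculation may differ in detail.

```python
import math


def percentile(values, p):
    """Nearest-rank percentile: the smallest value with at least p% of the data at or below it."""
    if not values or not 0 < p <= 100:
        raise ValueError("need a non-empty list and 0 < p <= 100")
    ordered = sorted(values)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[rank - 1]


# Made-up Lambda durations in milliseconds, with one slow outlier.
durations_ms = [12, 15, 11, 14, 13, 16, 12, 15, 14, 950]

average = sum(durations_ms) / len(durations_ms)
p50 = percentile(durations_ms, 50)
p99 = percentile(durations_ms, 99)
print(f"average={average} p50={p50} p99={p99}")
```

Here the average is pulled far above the typical request by a single outlier, while p50 stays near the common case and p99 exposes the slow invocation.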

AWS SAM CLI

AWS SAM CLI deploy command

The SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One of the powerful features of Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. Step Functions has expanded its integration with Amazon SageMaker to simplify machine learning workflows, and added a new integration with Amazon EMR, making it faster to build and easier to monitor EMR big data processing workflows.

Step Functions step with EMR

Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor and react to usage and spending trends on your AWS accounts.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

AWS Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates.

In US East (N. Virginia), US West (Oregon), and EU (Ireland), throughput has increased from 1,000 state transitions per second to 1,500 state transitions per second with bucket capacity of 5,000 state transitions. The default start rate for state machine executions has also increased from 200 per second to 300 per second, with bucket capacity of up to 1,300 starts in these regions.

In all other regions, throughput has increased from 400 state transitions per second to 500 state transitions per second with bucket capacity of 800 state transitions. The default start rate for AWS Step Functions state machine executions has also increased from 25 per second to 150 per second, with bucket capacity of up to 800 state machine executions.
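The rate and bucket capacity figures above can be read through a token bucket model. The sketch below is a simplified simulation under that assumption; it is not a description of how Step Functions implements throttling internally.

```python
class TokenBucket:
    """Simplified token bucket: refills at a steady rate, up to a fixed capacity."""

    def __init__(self, rate_per_second, capacity):
        self.rate = rate_per_second
        self.capacity = capacity
        self.tokens = float(capacity)  # start with a full bucket

    def advance(self, seconds):
        """Simulate the passage of time by adding refill tokens."""
        self.tokens = min(self.capacity, self.tokens + self.rate * seconds)

    def try_consume(self, count=1):
        """Take tokens if available; return False when the request would be throttled."""
        if count <= self.tokens:
            self.tokens -= count
            return True
        return False


# State transition limits quoted for US East (N. Virginia), US West (Oregon), and EU (Ireland).
bucket = TokenBucket(rate_per_second=1500, capacity=5000)
burst_allowed = bucket.try_consume(5000)   # a burst up to the bucket capacity succeeds
next_allowed = bucket.try_consume(1)       # the bucket is now empty
bucket.advance(1)                          # one second later, 1,500 tokens are back
```

In this model, short bursts can exceed the steady-state rate as long as the bucket has capacity, which is why both numbers matter when sizing a workflow.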

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed and either re-submit them or analyze them to locate processing issues.

Amazon CloudWatch

CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

CloudWatch ServiceLens

CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

On November 18, CloudWatch launched Embedded Metric Format to help you ingest complex high-cardinality application data in the form of logs and easily generate actionable metrics from them. You can publish these metrics from your Lambda function by using the PutLogEvents API or for Node.js or Python based applications using an open source library.
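As a minimal sketch of what an Embedded Metric Format record looks like, the function below builds one log line by hand. The namespace, dimension, and metric names are hypothetical; in a Lambda function, printing the JSON to standard output is enough for it to reach CloudWatch Logs.

```python
import json
import time


def emf_record(namespace, function_name, latency_ms, timestamp_ms=None):
    """Build a log record in CloudWatch Embedded Metric Format."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # epoch milliseconds
    return {
        "_aws": {
            "Timestamp": timestamp_ms,
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["FunctionName"]],
                    "Metrics": [{"Name": "Latency", "Unit": "Milliseconds"}],
                }
            ],
        },
        # Dimension and metric values live at the top level of the record.
        "FunctionName": function_name,
        "Latency": latency_ms,
    }


# Hypothetical namespace and function name.
print(json.dumps(emf_record("MyApp", "checkout", 42)))
```

Extra top-level keys that are not listed under "Metrics" or "Dimensions" remain searchable log fields, which is how high-cardinality context can ride along with the metric.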

Lastly, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

X-Ray announced trace maps, which enable you to map the end-to-end path of a single request. Identifiers show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. X-Ray supports tracing canary scripts throughout the application providing metrics on performance or application issues.

X-Ray Service map with CloudWatch Synthetics

Amazon DynamoDB

DynamoDB announced support for customer managed customer master keys (CMKs) to encrypt data in DynamoDB. This lets you bring your own key (BYOK), giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Currently in preview is another new DynamoDB capability to identify frequently accessed keys and database traffic trends. With this, you can more easily identify “hot keys” and understand usage of your DynamoDB tables.

CloudWatch Contributor Insights for DynamoDB

Last but far from least for DynamoDB is adaptive capacity, a feature that helps you handle imbalanced workloads by automatically isolating frequently accessed items and shifting data across partitions to rebalance them. This helps reduce cost by enabling you to provision throughput for a more balanced workload, instead of over-provisioning for uneven data access patterns.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge will appear next to your name in the SAR and will deep-link to your GitHub profile.

SAR Verified developer badges

AWS Code Services

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has passed through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

CodeBuild test trends

AWS Amplify and AWS AppSync

Instead of trying to summarize all the awesome things that our peers over in the Amplify and AppSync teams have done recently we’ll instead link you to their own recent summary: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

AWS AppSync

Still looking for more?

We only covered a small bit of all the awesome new things that were recently announced. Keep your eyes peeled for more exciting announcements next week during re:Invent and for a future ICYMI Serverless Q4 roundup. We’ll also be kicking off a fresh series of Tech Talks in 2020 with new content helping to dive deeper on everything new coming out of AWS for serverless application developers.

Decoupled Serverless Scheduler To Run HPC Applications At Scale on EC2

Post Syndicated from Emma White original https://aws.amazon.com/blogs/compute/decoupled-serverless-scheduler-to-run-hpc-applications-at-scale-on-ec2/

This post is written by Ludvig Nordstrom and Mark Duffield | on November 27, 2019

In this blog post, we dive into a cloud native approach for running HPC applications at scale on EC2 Spot Instances, using a decoupled serverless scheduler. This architecture is ideal for many workloads in the HPC and EDA industries, and can be used for any batch job workload.

At the end of this blog post, you will have two takeaways.

  1. A highly scalable environment that can run on hundreds of thousands of cores across EC2 Spot Instances.
  2. A fully serverless architecture for job orchestration.

We discuss deploying and running a pre-built serverless job scheduler that can run both Windows and Linux applications using any executable file format for your application. This environment provides high performance, scalability, cost efficiency, and fault tolerance. We introduce best practices and benefits to creating this environment, and cover the architecture, running jobs, and integration into existing environments.

A quick note about the term cloud native: we use the term loosely in this blog. Here, cloud native means we use AWS services (including serverless and microservices) to build out our compute environment, instead of a traditional lift-and-shift method.

Let’s get started!

 

Solution overview

This blog goes over the deployment process, which leverages AWS CloudFormation. This allows you to use infrastructure as code to automatically build out your environment. There are two parts to the solution: the Serverless Scheduler and Resource Automation. Below are quick summaries of each part of the solution.

Part 1 – The serverless scheduler

This first part of the blog builds out a serverless workflow to get jobs from SQS and run them across EC2 instances. The CloudFormation template being used for Part 1 is serverless-scheduler-app.template, and here is the Reference Architecture:

 


    Figure 1: Serverless Scheduler Reference Architecture (grayed-out area is covered in Part 2).

Read the GitHub Repo if you want to look at the Step Functions workflow shown in the preceding image. The walkthrough explains how the serverless application retrieves and runs jobs on its worker, updates the DynamoDB job monitoring table, and manages the worker for its lifetime.

 

Part 2 – Resource automation with serverless scheduler


This part of the solution relies on the serverless scheduler built in Part 1 to run jobs on EC2. Part 2 simplifies submitting and monitoring jobs, and retrieving results for users. Jobs are spread across our cost-optimized Spot Instances. AWS Auto Scaling automatically scales up the compute resources when jobs are submitted, then terminates them when jobs are finished. Both of these save you money.

The CloudFormation template used in Part 2 is resource-automation.template. Building on Figure 1, the additional resources launched with Part 2 are noted in the following image: an S3 bucket, an Auto Scaling group, and two Lambda functions.


 

Figure 2: Resource Automation using Serverless Scheduler

                               

Introduction to decoupled serverless scheduling

HPC schedulers traditionally run in a classic master and worker node configuration. A scheduler on the master node orchestrates jobs on worker nodes. This design has been successful for decades; however, many powerful schedulers are evolving to meet the demands of HPC workloads. The design arose from the need to run orchestration logic on one machine, but there are now options to decouple this logic.

What are the possible benefits that decoupling this logic could bring? First, we avoid a number of shortfalls in the environment such as the need for all worker nodes to communicate with a single master node. This single source of communication limits scalability and creates a single point of failure. When we split the scheduler into decoupled components both these issues disappear.

Second, in an effort to work around these pain points, traditional schedulers had to create extremely complex logic to manage all workers concurrently in a single application. This stifled the ability to customize and improve the code – restricting changes to be made by the software provider’s engineering teams.

Serverless services, such as AWS Step Functions and AWS Lambda, fix these major issues. They allow you to decouple the scheduling logic so that it has a one-to-one mapping with each worker, while all workers share an Amazon Simple Queue Service (SQS) job queue. We define our scheduling workflow in AWS Step Functions. Then the workflow scales out to potentially thousands of “state machines.” These state machines act as wrappers around each worker node and manage each worker node individually. Our code is less complex because we only consider one worker and its job.

We illustrate the differences between a traditional shared scheduler and decoupled serverless scheduler in Figures 3 and 4.

 


Figure 3: Traditional Scheduler Model

 


Figure 4: Decoupled Serverless Scheduler on each instance

 

Each decoupled serverless scheduler will:

  • Retrieve and pass jobs to its worker
  • Monitor its worker’s health and take action if needed
  • Confirm job success by checking output logs and retry jobs if needed
  • Terminate the worker when the job queue is empty, and then terminate itself

With this new scheduler model, there are many benefits. Decoupling schedulers into smaller schedulers increases fault tolerance because any issue only affects one worker. Additionally, each scheduler consists of independent AWS Lambda functions, which maintains the state on separate hardware and builds retry logic into the service.  Scalability also increases, because jobs are not dependent on a master node, which enables the geographic distribution of jobs. This geographic distribution allows you to optimize use of low-cost Spot Instances. Also, when decoupling the scheduler, workflow complexity decreases and you can customize scheduler logic. You can leverage lower latency job monitoring and customize automated responses to job events as they happen.

 

Benefits

  • Fully managed –  With Part 2, Resource Automation deployed, resources for a job are managed. When a job is submitted, resources launch and run the job. When the job is done, worker nodes automatically shut down. This prevents you from incurring continuous costs.

 

  • Performance – Your application runs on EC2, which means you can choose any of the high performance instance types. Input files are automatically copied from Amazon S3 into local Amazon EC2 Instance Store for high performance storage during execution. Result files are automatically moved to S3 after each job finishes.

 

  • Scalability – A worker node combined with a scheduler state machine becomes a stateless entity. You can spin up as many of these entities as you want, and point them to an SQS queue. You can even distribute worker and state machine pairs across multiple AWS regions. These two components paired with fully managed services optimize your architecture for scalability to meet your desired number of workers.

 

  • Fault Tolerance – The solution is completely decoupled, which means each worker has its own state machine that handles scheduling for that worker. Likewise, each state machine is decoupled into Lambda functions that make up your state machine. Additionally, the scheduler workflow includes a Lambda function that confirms each successful job or resubmits jobs.

 

  • Cost Efficiency – This fault tolerant environment is perfect for EC2 Spot Instances. This means you can save up to 90% on your workloads compared to On-Demand Instance pricing. The scheduler workflow ensures little to no idle time of workers by closely monitoring and sending new jobs as jobs finish. Because the scheduler is serverless, you only incur costs for the resources required to launch and run jobs. Once the jobs are complete, all resources are terminated automatically.

 

  • Agility – You can use AWS fully managed Developer Tools to quickly release changes and customize workflows. The reduced complexity of a decoupled scheduling workflow means that you don’t have to spend time managing a scheduling environment, and can instead focus on your applications.

 

 

Part 1 – serverless scheduler as a standalone solution

 

If you use the serverless scheduler as a standalone solution, you can build clusters and leverage shared storage such as FSx for Lustre, EFS, or S3. Additionally, you can use AWS CloudFormation to deploy more complex compute architectures that suit your application. So, the EC2 Instances that run the serverless scheduler can be launched in any number of ways. The scheduler only requires the instance ID and the SQS job queue name.

 

Submitting Jobs Directly to serverless scheduler

The serverless scheduler app is a fully built AWS Step Functions workflow that pulls jobs from an SQS queue and runs them on an EC2 Instance. The jobs submitted to SQS consist of an AWS Systems Manager Run Command, and work with any SSM Document and command that you choose for your jobs. Examples of SSM Run Commands are ShellScript and PowerShell. Feel free to read more about Running Commands Using Systems Manager Run Command.

The following code shows the format of a job submitted to SQS in JSON.

  {
    "job_id": "jobId_0",
    "retry": "3",
    "job_success_string": " ",
    "ssm_document": "AWS-RunPowerShellScript",
    "commands": [
      "cd C:\\ProgramData\\Amazon\\SSM; mkdir Result",
      "Copy-S3object -Bucket my-bucket -KeyPrefix jobs/date/jobId_0 -LocalFolder .\\",
      "C:\\ProgramData\\Amazon\\SSM\\jobId_0.bat",
      "Write-S3object -Bucket my-bucket -KeyPrefix jobs/date/jobId_0 -Folder .\\Result\\"
    ]
  }
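If you generate job definitions programmatically, a small helper can keep the fields consistent with the format above. This is a sketch only: the helper name and its defaults are our own, and the field names are taken from the example job.

```python
import json


def build_job(job_id, commands, ssm_document="AWS-RunShellScript",
              retry=3, job_success_string=" "):
    """Build a job definition using the field names from the example job above."""
    if not commands:
        raise ValueError("a job needs at least one command")
    return {
        "job_id": job_id,
        "retry": str(retry),  # the example job passes the retry count as a string
        "job_success_string": job_success_string,
        "ssm_document": ssm_document,
        "commands": list(commands),
    }


job = build_job("jobId_0", ["echo Hello World"], job_success_string="Hello")
message_body = json.dumps(job)  # serialized body of the SQS message
```

With boto3, the serialized job could be submitted using `boto3.client("sqs").send_message(QueueUrl=queue_url, MessageBody=message_body)`, where `queue_url` is the URL of your job queue.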

 

Any EC2 Instance associated with a serverless scheduler receives jobs picked up from a designated SQS queue until the queue is empty. Then, the EC2 resource automatically terminates. If a job fails, it is retried up to the number of times specified in the job definition. You can include a specific string value that the scheduler searches for in job execution outputs to confirm the successful completion of jobs.

 

Tagging EC2 workers to get a serverless scheduler state machine

In Part 1 of the deployment, you must manage your EC2 Instance launch and termination. When launching an EC2 Instance, tag it with a specific tag key that triggers a state machine to manage that instance. The tag value is the name of the SQS queue that you want your state machine to poll jobs from.

In the following example, “my-scheduler-cloudformation-stack-name” is the tag key that the serverless scheduler app looks for on any new EC2 instance that starts. Next, “my-sqs-job-queue-name” is the default job queue created with the scheduler, but you can change this to any queue name you want the instance to retrieve jobs from when it is launched.

{"my-scheduler-cloudformation-stack-name":"my-sqs-job-queue-name"}
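One way to apply this tag programmatically is sketched below. The helper functions are hypothetical; they only build the tag structures that the EC2 API expects, using the placeholder names from the example above.

```python
def scheduler_tags(stack_name, queue_name):
    """Tag list in the shape the EC2 API expects: key = stack name, value = queue name."""
    return [{"Key": stack_name, "Value": queue_name}]


def launch_tag_specifications(stack_name, queue_name):
    """TagSpecifications value for tagging an instance at launch time."""
    return [{"ResourceType": "instance", "Tags": scheduler_tags(stack_name, queue_name)}]


tags = scheduler_tags("my-scheduler-cloudformation-stack-name", "my-sqs-job-queue-name")
```

With boto3, an existing instance could be tagged using `boto3.client("ec2").create_tags(Resources=[instance_id], Tags=tags)`, or the tag could be supplied at launch through the `TagSpecifications` parameter of `run_instances`.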

 

Monitor jobs in DynamoDB

You can monitor job status in the DynamoDB job monitoring table. In the table you can find job_id, commands sent to Amazon EC2, job status, job output logs from Amazon EC2, and retries, among other things.

Alternatively, you can query DynamoDB for a given job_id via the AWS Command Line Interface:

aws dynamodb get-item --table-name job-monitoring \
                      --key '{"job_id": {"S": "/my-jobs/my-job-id.bat"}}'

 

Using the “job_success_string” parameter

For the prior DynamoDB table, we submitted two identical jobs using an example script that you can also use. The command sent to the instance is “echo Hello World.” The output from this job should be “Hello World.” We also specified three allowed job retries.  In the following image, there are two jobs in SQS queue before they ran.  Look closely at the different “job_success_strings” for each and the identical command sent to both:

DynamoDB CLI info This shows an example DynamoDB CLI output with job information.

From the image we see that Job2 was successful and Job1 retried three times before being permanently labeled as failed. We forced this outcome to demonstrate how the job success string works by submitting Job1 with “job_success_string” as “Hello EVERYONE”, which does not appear in the job output “Hello World.” In Job2 we set “job_success_string” as “Hello” because we knew this string would be in the output log.

Job outputs commonly have text that only appears if the job succeeded. You can also add this text yourself in your executable file. With “job_success_string,” you can confirm a job’s successful output, and use it to identify a certain value that you are looking for across jobs.
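The Job1 and Job2 behavior described above can be sketched as a simple substring check with a retry budget. This is our own illustration of the idea, not the scheduler's actual code; the status labels and the assumption of one initial attempt plus up to `max_retries` retries are ours.

```python
def run_job_with_retries(run_command, job_success_string, max_retries):
    """Run a job until its output contains the success string or retries run out.

    run_command is any callable that returns the job's output log as a string.
    Returns a (status, retries_used) tuple.
    """
    retries_used = 0
    while True:
        output = run_command()
        if job_success_string in output:
            return "success", retries_used
        if retries_used == max_retries:
            return "failed", retries_used
        retries_used += 1


def echo_hello_world():
    """Stand-in for the command sent to the instance."""
    return "Hello World"


job1 = run_job_with_retries(echo_hello_world, "Hello EVERYONE", max_retries=3)
job2 = run_job_with_retries(echo_hello_world, "Hello", max_retries=3)
print(job1, job2)
```

As in the example jobs, the identical command fails for Job1 after three retries and succeeds immediately for Job2, purely because of the success string each one was given.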

 

Part 2 – Resource Automation with the serverless scheduler

The additional services we deploy in Part 2 integrate with existing architectures to launch resources for your serverless scheduler. These services allow you to submit jobs simply by uploading input files and executable files to an S3 bucket.

Likewise, these additional resources can use any executable file format you want, including proprietary application level scripts. The solution automates everything else. This includes creating and submitting jobs to the SQS job queue, spinning up compute resources when new jobs come in, and taking them back down when there are no jobs to run. When jobs are done, result files are copied to S3 for the user to retrieve. Similar to Part 1, you can still view the DynamoDB table for job status.

This architecture makes it easy to scale out to different teams and departments, and you can submit potentially hundreds of thousands of jobs while you remain in control of resources and cost.

 

Deeper Look at the S3 Architecture

The following diagram shows how you can submit jobs, monitor progress, and retrieve results. To submit jobs, upload all the needed input files and an executable script to S3. The suffix of the executable file (uploaded last) triggers an S3 event to start the process, and this suffix is configurable.

The S3 key of the executable file acts as the job id, and is kept as a reference to that job in DynamoDB. The Lambda (#2 in diagram below) uses the S3 key of the executable to create three SSM Run Commands.

  1. Synchronize all files in the same S3 folder to a working directory on the EC2 Instance.
  2. Run the executable file on EC2 Instances within a specified working directory.
  3. Synchronize the EC2 Instance’s working directory back to the S3 bucket so that newly generated result files are included.

This Lambda (#2) then places the job on the SQS queue using the scheduler’s JSON-formatted job definition seen above.
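The three commands listed above could be derived from the S3 key roughly as follows. This sketch assumes a Linux worker with the AWS CLI installed and a hypothetical working directory; the actual Lambda function in the solution may build its commands differently.

```python
import posixpath


def build_commands(bucket, executable_key, workdir="/tmp/job"):
    """Derive the three job commands from the S3 key of the executable file."""
    job_folder = posixpath.dirname(executable_key)    # all files for one job share a folder
    executable = posixpath.basename(executable_key)
    s3_folder = f"s3://{bucket}/{job_folder}"
    return [
        f"aws s3 sync {s3_folder} {workdir}",                        # 1. copy job files to the instance
        f"cd {workdir} && chmod +x {executable} && ./{executable}",  # 2. run the executable
        f"aws s3 sync {workdir} {s3_folder}",                        # 3. copy results back to S3
    ]


# Hypothetical bucket and key.
commands = build_commands("my-bucket", "2019/11/27/job-001/run.sh")
job_id = "2019/11/27/job-001/run.sh"  # the S3 key doubles as the job id
```

Because the first command synchronizes the whole folder, this sketch also shows why each job needs its own folder: any unrelated files in the same folder would be copied to the instance as well.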

IMPORTANT: Each set of job files should be given a unique job folder in S3 or more files than needed might be moved to the EC2 Instance.

 


Figure 5: Resource Automation using Serverless Scheduler – A deeper look

 

The Lambda function (#3 in the prior diagram) and the Auto Scaling group scale out the EC2 workers, each paired with its Step Functions state machine, based on the number of jobs in the queue, up to the maximum number of workers defined in the Auto Scaling group. When the job queue is empty, the number of running instances scales down to 0 as they finish their remaining jobs.
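The scaling rule described here can be expressed in a few lines. This is a deliberately simplified sketch of the idea, one worker per queued job up to the group maximum; the real Lambda function may use a different policy.

```python
def desired_workers(jobs_in_queue, max_workers):
    """One worker per queued job, capped at the Auto Scaling group's maximum size."""
    if jobs_in_queue < 0 or max_workers < 0:
        raise ValueError("counts must be non-negative")
    return min(jobs_in_queue, max_workers)


print(desired_workers(3, 100))     # small backlog: one worker per job
print(desired_workers(5000, 100))  # large backlog: capped at the group maximum
print(desired_workers(0, 100))     # empty queue: scale to zero
```

With boto3, the result could be applied using `boto3.client("autoscaling").set_desired_capacity(AutoScalingGroupName=group_name, DesiredCapacity=count)`, where `group_name` is a placeholder for your Auto Scaling group.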

 

Process Submitting Jobs and Retrieving Results

  1. Seen in step 1, upload input file(s) and an executable file into a unique job folder in S3 (such as /year/month/day/jobid/~job-files). Upload the executable file last because it automatically starts the job. You can also use a script to upload multiple files at a time, but each job needs a unique directory. There are many ways to make S3 buckets available to users, including AWS Storage Gateway, AWS Transfer for SFTP, AWS DataSync, the AWS Console, or any one of the AWS SDKs leveraging S3 API calls.
  2. You can monitor job status by accessing the DynamoDB table directly via the AWS Management Console or use the AWS CLI to call DynamoDB via an API call.
  3. Seen in step 5, you can retrieve result files for jobs from the same S3 directory where you left the input files. The DynamoDB table confirms when jobs are done. The SQS output queue can be used by applications that must automatically poll and retrieve results.

You no longer need to create or access compute nodes yourself. Compute resources automatically scale up from zero when jobs come in, and then back down to zero when jobs are finished.

 

Deployment

Read the GitHub Repo for deployment instructions. Below are CloudFormation templates to help:

AWS Regions with Launch Stack templates:
eu-north-1
ap-south-1
eu-west-3
eu-west-2
eu-west-1
ap-northeast-3
ap-northeast-2
ap-northeast-1
sa-east-1
ca-central-1
ap-southeast-1
ap-southeast-2
eu-central-1
us-east-1
us-east-2
us-west-1
us-west-2

 

 

Additional Points on Usage Patterns

 

  • While the two solutions in this blog are aimed at HPC applications, they can be used to run any batch jobs. Many customers that run large data processing batch jobs in their data lakes could use the serverless scheduler.

 

  • You can build pipelines of different applications when the output of one job triggers another to do something else – an example being pre-processing, meshing, simulation, post-processing. You simply deploy the Resource Automation template several times, and tailor it so that the output bucket for one step is the input bucket for the next step.

 

  • You might use the “job_success_string” parameter for iteration or verification in cases where a shot-gun approach is needed to run thousands of jobs, and only one has a chance of producing the right result. In this case, the “job_success_string” would identify the successful job from potentially hundreds of thousands pushed to the SQS job queue.

 

Scale-out across teams and departments

Because all services used are serverless, you can deploy as many run environments as needed without increasing overall costs. Serverless workloads only accumulate cost when the services are used. So, you could deploy ten job environments and run one job in each, and your costs would be the same as if you had one job environment running ten jobs.

 

All you need is an S3 bucket to upload jobs to and an associated AMI that has the right applications and license configuration. Because a job configuration is passed to the scheduler at each job start, you can add new teams by creating an S3 bucket and pointing S3 events to a default Lambda function that pulls configurations for each job start.

 

Set up a CI/CD pipeline to start continuous improvement of the scheduler

If you are advanced, we encourage you to clone the git repo and customize this solution. The serverless scheduler is less complex than other schedulers, because you only think about one worker and the process of one job’s run.

Ways you could tailor this solution:

  • Add intelligent job scheduling using Amazon SageMaker – It is hard to find data as ready for ML as log data because every job you run has different run times and resource consumption. So, you could tailor this solution to predict the best instance to use with ML when workloads are submitted.
  • Add Custom Licensing Checkout Logic – Simply add one Lambda function to your Step Functions workflow to make an API call to a license server before continuing with one or more jobs. You can start a new worker when you have a license checked out; if a license is not available, the instance can terminate to avoid incurring costs while waiting for licenses.
  • Add Custom Metrics to DynamoDB – You can easily add metrics to DynamoDB because the solution already has baseline logging and monitoring capabilities.
  • Run on other AWS Services – There is a Lambda function in the Step Functions workflow called “Start_Job”. You can tailor this Lambda to run your jobs on Amazon SageMaker, Amazon EMR, Amazon EKS, or Amazon ECS instead of EC2.

 

Conclusion

Although HPC workloads and EDA flows may still be dependent on current scheduling technologies, we illustrated the possibilities of decoupling your workloads from your existing shared scheduling environments. This post went deep into decoupled serverless scheduling, and we understand that it is difficult to unwind decades of dependencies. However, leveraging numerous AWS Services encourages you to think completely differently about running workloads.

But more importantly, it encourages you to Think Big. With this solution you can get up and running quickly, fail fast, and iterate. You can do this while scaling to your required number of resources, when you want them, and only pay for what you use.

Serverless computing catalyzes change across all industries, but that change is not obvious in the HPC and EDA industries. This solution is an opportunity for customers to take advantage of the nearly limitless capacity that AWS offers.

Please reach out with questions about HPC and EDA on AWS. You now have the architecture and the instructions to build your Serverless Decoupled Scheduling environment. Go build!


About the Authors and Contributors

Authors

Ludvig Nordstrom is a Senior Solutions Architect at AWS.

Mark Duffield is a Tech Lead in Semiconductors at AWS.

Contributors

Steve Engledow is a Senior Solutions Builder at AWS.

Arun Thomas is a Senior Solutions Builder at AWS.

Running Cost-effective queue workers with Amazon SQS and Amazon EC2 Spot Instances

Post Syndicated from peven original https://aws.amazon.com/blogs/compute/running-cost-effective-queue-workers-with-amazon-sqs-and-amazon-ec2-spot-instances/

This post is contributed by Ran Sheinberg | Sr. Solutions Architect, EC2 Spot & Chad Schmutzer | Principal Developer Advocate, EC2 Spot | Twitter: @schmutze

Introduction

Amazon Simple Queue Service (SQS) is used by customers to run decoupled workloads in the AWS Cloud as a best practice, in order to increase their applications’ resilience. You can use a worker tier to do background processing of images, audio, documents and so on, as well as offload long-running processes from the web tier. This blog post covers the benefits of pairing Amazon SQS and Spot Instances to maximize cost savings in the worker tier, and a customer success story.

Solution Overview

Amazon SQS is a fully managed message queuing service that enables customers to decouple and scale microservices, distributed systems, and serverless applications. It is a common best practice to use Amazon SQS with decoupled applications. Amazon SQS increases application resilience by decoupling the direct communication between the frontend application and the worker tier that does data processing. If a worker node fails, the jobs that were running on that node return to the Amazon SQS queue for a different node to pick up.

Both the frontend and worker tier can run on Spot Instances, which offer spare compute capacity at steep discounts compared to On-Demand Instances. Spot Instances optimize your costs on the AWS Cloud and scale your application’s throughput up to 10 times for the same budget. Spot Instances can be interrupted with two minutes of notification when EC2 needs the capacity back. You can use Spot Instances for various fault-tolerant and flexible applications. These can include analytics, containerized workloads, high performance computing (HPC), stateless web servers, rendering, CI/CD, and queue worker nodes—which is the focus of this post.

Worker tiers of a decoupled application are typically fault-tolerant, so they are prime candidates for running on interruptible capacity. Pairing Amazon SQS with Spot Instances allows for more robust, cost-optimized applications.

By using EC2 Auto Scaling groups with multiple instance types that you configured as suitable for your application (for example, m4.xlarge, m5.xlarge, c5.xlarge, and c4.xlarge, in multiple Availability Zones), you can spread the worker tier’s compute capacity across many Spot capacity pools (a combination of instance type and Availability Zone). This increases the chance of achieving the scale that’s required for the worker tier to ingest messages from the queue, and of keeping that scale when Spot Instance interruptions occur, while selecting the lowest-priced Spot Instances in each Availability Zone.

You can also choose the capacity-optimized allocation strategy for the Spot Instances in your Auto Scaling group. This strategy automatically selects instances that have a lower chance of interruption, which decreases the chances of restarting jobs due to Spot interruptions. When Spot Instances are interrupted, your Auto Scaling group automatically replenishes the capacity from a different Spot capacity pool in order to achieve your desired capacity. Read the blog post “Introducing the capacity-optimized allocation strategy for Amazon EC2 Spot Instances” for more details on how to choose the suitable allocation strategy.
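As a rough illustration of the diversification described above, the following sketch builds request parameters for the EC2 Auto Scaling CreateAutoScalingGroup API with a mixed instances policy. The helper name `buildWorkerGroupParams`, the group name, the launch template name, and the subnet IDs are all hypothetical placeholders, and running the group entirely on Spot is an assumption made for this example.

```javascript
// Sketch only: builds CreateAutoScalingGroup parameters for a Spot worker tier.
// Every name and ID passed in below is a placeholder, not a value from the post.
function buildWorkerGroupParams({ groupName, launchTemplateName, instanceTypes, subnetIds, maxSize }) {
  return {
    AutoScalingGroupName: groupName,
    MinSize: 0,
    MaxSize: maxSize,
    // Subnets in several Availability Zones multiply the number of Spot capacity pools.
    VPCZoneIdentifier: subnetIds.join(','),
    MixedInstancesPolicy: {
      LaunchTemplate: {
        LaunchTemplateSpecification: { LaunchTemplateName: launchTemplateName, Version: '$Latest' },
        // One override per instance type that suits the worker application.
        Overrides: instanceTypes.map((type) => ({ InstanceType: type })),
      },
      InstancesDistribution: {
        OnDemandBaseCapacity: 0,
        OnDemandPercentageAboveBaseCapacity: 0, // assumption: the whole tier runs on Spot
        SpotAllocationStrategy: 'capacity-optimized',
      },
    },
  };
}

const workerGroupParams = buildWorkerGroupParams({
  groupName: 'example-queue-workers',
  launchTemplateName: 'example-worker-template',
  instanceTypes: ['m4.xlarge', 'm5.xlarge', 'c5.xlarge', 'c4.xlarge'],
  subnetIds: ['subnet-aaaa1111', 'subnet-bbbb2222'],
  maxSize: 20,
});
console.log(JSON.stringify(workerGroupParams, null, 2));
```

With the AWS SDK for JavaScript (v2), an object like this could be passed to `new AWS.AutoScaling().createAutoScalingGroup(workerGroupParams).promise()`. Four instance types across two subnets in different Availability Zones gives the group eight Spot capacity pools to draw from.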

We focus on three main points in this blog:

  1. Best practices for using Spot Instances with Amazon SQS
  2. A customer example that uses these components
  3. An example solution that can help you get started quickly

Application of Amazon SQS with Spot Instances

Amazon SQS eliminates the complexity of managing and operating message-oriented middleware. Using Amazon SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. Amazon SQS is a fully managed service which allows you to set up a queue in seconds. It also allows you to use your preferred SDK to start writing and reading to and from the queue within minutes.

In the following example, we describe an AWS architecture that brings together the Amazon SQS queue and an EC2 Auto Scaling group running Spot Instances. The architecture is used for decoupling the worker tier from the web tier by using Amazon SQS. The example uses the Protect feature (which we will explain later in this post) to ensure that an instance currently processing a job does not get terminated by the Auto Scaling group when it detects that a scale-in activity is required due to a Dynamic Scaling Policy.

AWS reference architecture used for decoupling the worker tier from the web tier by using Amazon SQS

Customer Example: How Trax Retail uses Auto Scaling groups with Spot Instances in their Amazon SQS application

Trax decided to run its queue worker tier exclusively on Spot Instances due to the fault-tolerant nature of its architecture and for cost-optimization purposes. The company digitizes the physical world of retail using Computer Vision. Their ‘Trax Factory’ transforms individual shelf images into data and insights about retail store conditions.

Built using asynchronous event-driven architecture, Trax Factory is a cluster of microservices in which the completion of one service triggers the activation of another service. The worker tier uses Auto Scaling groups with dynamic scaling policies to increase and decrease the number of worker nodes in the worker tier.

You can create a Dynamic Scaling Policy by doing the following:

  1. Observe an Amazon CloudWatch metric. Watch the metric for the current number of messages in the Amazon SQS queue (ApproximateNumberOfMessagesVisible).
  2. Create a CloudWatch alarm. This alarm should be based on the metric you observed in the prior step.
  3. Use your CloudWatch alarm in a Dynamic Scaling Policy. Use this policy to increase and decrease the number of EC2 Instances in the Auto Scaling group.

In Trax’s case, the number of messages in the queue is highly variable, so they enhanced this approach to minimize the time it takes to scale. They built a service that calls the SQS API to find the current number of messages in the queue more frequently, instead of waiting for the 5-minute metric refresh interval in CloudWatch.
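The post does not show how such a polling service is built, so the following is only a minimal sketch of the idea. It assumes `sqs` and `autoscaling` are AWS SDK for JavaScript (v2) clients passed in by the caller. The function names, the `messagesPerWorker` ratio, and the choice to set the desired capacity directly are assumptions made for illustration, not Trax’s implementation.

```javascript
// Decide how many workers to run for a given backlog, clamped to the group limits.
function desiredWorkers(visibleMessages, messagesPerWorker, minWorkers, maxWorkers) {
  const needed = Math.ceil(visibleMessages / messagesPerWorker);
  return Math.min(maxWorkers, Math.max(minWorkers, needed));
}

// One polling pass: read the queue depth straight from SQS, then resize the group.
// `sqs` and `autoscaling` are assumed to be AWS SDK for JavaScript (v2) clients.
async function scaleOnce({ sqs, autoscaling, queueUrl, groupName, messagesPerWorker, minWorkers, maxWorkers }) {
  const { Attributes } = await sqs.getQueueAttributes({
    QueueUrl: queueUrl,
    AttributeNames: ['ApproximateNumberOfMessages'],
  }).promise();

  // SQS returns attribute values as strings.
  const backlog = Number(Attributes.ApproximateNumberOfMessages);
  const target = desiredWorkers(backlog, messagesPerWorker, minWorkers, maxWorkers);

  await autoscaling.setDesiredCapacity({
    AutoScalingGroupName: groupName,
    DesiredCapacity: target,
  }).promise();
  return target;
}

console.log(desiredWorkers(95, 10, 1, 20));
```

A scheduler could call `scaleOnce` every few seconds, which is what lets the worker tier react faster than a CloudWatch alarm on a 5-minute metric.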

Trax ensures that its applications are always scaled to meet the demand by leveraging the inherent elasticity of Amazon EC2 instances. This elasticity ensures that end users are never affected and/or service-level agreements (SLA) are never violated.

With a Dynamic Scaling Policy, the Auto Scaling group can detect when the number of messages in the queue has decreased, so that it can initiate a scale-in activity. The Auto Scaling group uses its configured termination policy for selecting the instances to be terminated. However, this policy poses the risk that the Auto Scaling group might select an instance for termination while that instance is currently processing an image. That instance’s work would be lost (although the image would eventually be processed by reappearing in the queue and getting picked up by another worker node).

To decrease this risk, you can use Auto Scaling group instance protection. Every time an instance fetches a job from the queue, it also sends an API call to EC2 Auto Scaling to protect itself from scale-in. The Auto Scaling group does not select the protected, working instance for termination until the instance finishes processing the job and calls the API to remove the protection.
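The protect, process, unprotect cycle described above can be sketched as follows. This is an illustrative outline, not the code from the sample solution. `processOneJob` and `handleJob` are hypothetical names, and `sqs` and `autoscaling` are assumed to be AWS SDK for JavaScript (v2) clients supplied by the caller.

```javascript
// Fetch at most one job, protect this instance while it works, then remove protection.
// Returns true if a job was processed and false if the queue was empty.
async function processOneJob({ sqs, autoscaling, queueUrl, groupName, instanceId, handleJob }) {
  const { Messages = [] } = await sqs.receiveMessage({
    QueueUrl: queueUrl,
    MaxNumberOfMessages: 1,
    WaitTimeSeconds: 20, // long polling
  }).promise();
  if (Messages.length === 0) return false;

  const message = Messages[0];
  const setProtection = (enabled) => autoscaling.setInstanceProtection({
    AutoScalingGroupName: groupName,
    InstanceIds: [instanceId],
    ProtectedFromScaleIn: enabled,
  }).promise();

  await setProtection(true);
  try {
    await handleJob(message.Body);
    // Delete only after the work succeeded, so a failed job reappears in the queue.
    await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }).promise();
  } finally {
    // Always remove protection, even if the job threw, so the group can scale in.
    await setProtection(false);
  }
  return true;
}
```

Removing protection in a `finally` block is a design choice for this sketch: an instance whose job failed should not stay protected indefinitely.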

Handling Spot Instance interruptions

This instance-protection solution ensures that no work is lost during scale-in activities. However, protecting from scale-in does not work when an instance is marked for termination due to Spot Instance interruptions. These interruptions occur when there’s increased demand for On-Demand Instances in the same capacity pool (a combination of an instance type in an Availability Zone).

Applications can minimize the impact of a Spot Instance interruption. To do so, an application catches the two-minute interruption notification (available in the instance’s metadata), and instructs itself to stop fetching jobs from the queue. If an image is still being processed when the two minutes expire and the instance is terminated, the application never reaches the step that deletes the message from the queue. Instead, the message becomes visible again for another instance to pick up and process after the Amazon SQS visibility timeout expires.

Alternatively, you can release any ongoing job back to the queue upon receiving a Spot Instance interruption notification by setting the visibility timeout of the specific message to 0. Releasing the message immediately potentially decreases the total time it takes to process the message, because another worker does not have to wait for the original visibility timeout to expire.
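Both options can be sketched in a few lines. The example assumes the worker polls the instance metadata path `/latest/meta-data/spot/instance-action`, which returns HTTP 404 until an interruption is scheduled and then a small JSON document. The function names are hypothetical, and `sqs` is assumed to be an AWS SDK for JavaScript (v2) client supplied by the caller.

```javascript
// Interpret a response from the spot/instance-action metadata path.
// Returns null when no interruption is scheduled.
function parseInterruptionNotice(statusCode, body) {
  if (statusCode !== 200) return null; // 404 means no interruption is pending
  const notice = JSON.parse(body);
  return { action: notice.action, time: new Date(notice.time) };
}

// Hand an unfinished job back to the queue right away by setting the
// message's visibility timeout to 0.
async function releaseJob({ sqs, queueUrl, receiptHandle }) {
  await sqs.changeMessageVisibility({
    QueueUrl: queueUrl,
    ReceiptHandle: receiptHandle,
    VisibilityTimeout: 0,
  }).promise();
}

const exampleNotice = parseInterruptionNotice(200, '{"action": "terminate", "time": "2019-11-24T21:52:47Z"}');
console.log(exampleNotice.action, exampleNotice.time.toISOString());
```

A worker loop would check for a notice before each fetch, stop polling once one arrives, and optionally call `releaseJob` for the message it is still holding.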

Testing the solution

If you’re not currently using Spot Instances in your queue worker tier, we suggest testing the approach described in this post.

For that purpose, we built a simple solution to demonstrate the capabilities mentioned in this post, using an AWS CloudFormation template. The stack includes an Amazon Simple Storage Service (S3) bucket with a CloudWatch trigger to push notifications to an SQS queue after an image is uploaded to the Amazon S3 bucket. Once the message is in the queue, it is picked up by the application running on the EC2 instances in the Auto Scaling group. Then, the image is converted to PDF, and the instance is protected from scale-in for as long as it has an active processing job.

To see the solution in action, deploy the CloudFormation template. Then upload an image to the Amazon S3 bucket. In the Auto Scaling Groups console, check the instance protection status on the Instances tab. The protection status is shown in the following screenshot.

instance protection status in console

You can also see the application logs using CloudWatch Logs:

/usr/local/bin/convert-worker.sh: Found 1 messages in https://sqs.us-east-1.amazonaws.com/123456789012/qtest-sqsQueue-1CL0NYLMX64OB

/usr/local/bin/convert-worker.sh: Found work to convert. Details: INPUT=Capture1.PNG, FNAME=capture1, FEXT=png

/usr/local/bin/convert-worker.sh: Running: aws autoscaling set-instance-protection --instance-ids i-0a184c5ae289b2990 --auto-scaling-group-name qtest-autoScalingGroup-QTGZX5N70POL --protected-from-scale-in

/usr/local/bin/convert-worker.sh: Convert done. Copying to S3 and cleaning up

/usr/local/bin/convert-worker.sh: Running: aws s3 cp /tmp/capture1.pdf s3://qtest-s3bucket-18fdpm2j17wxx

/usr/local/bin/convert-worker.sh: Running: aws sqs --output=json delete-message --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/qtest-sqsQueue-1CL0NYLMX64OB --receipt-handle

/usr/local/bin/convert-worker.sh: Running: aws autoscaling set-instance-protection --instance-ids i-0a184c5ae289b2990 --auto-scaling-group-name qtest-autoScalingGroup-QTGZX5N70POL --no-protected-from-scale-in

Conclusion

This post helps you architect fault tolerant worker tiers in a cost optimized way. If your queue worker tiers are fault tolerant and use the built-in Amazon SQS features, you can increase your application’s resilience and take advantage of Spot Instances to save up to 90% on compute costs.

In this post, we emphasized several best practices to help get you started saving money using Amazon SQS and Spot Instances. The main best practices are:

  • Diversifying your Spot Instances using Auto Scaling groups, and selecting the right Spot allocation strategy
  • Protecting instances from scale-in activities while they process jobs
  • Using the Spot interruption notification so that the application stops polling the queue for new jobs before the instance is terminated

We hope you found this post useful. If you’re not using Spot Instances in your queue worker tier, we suggest testing the approach described here. Finally, we would like to thank the Trax team for sharing its architecture and best practices. If you want to learn more, watch the “This is my architecture” video featuring Trax and their solution.

We’d love your feedback. Please comment and let us know what you think.


About the authors

Ran Sheinberg is a specialist solutions architect for EC2 Spot Instances with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

As a Principal Developer Advocate for EC2 Spot at AWS, Chad’s job is to make sure our customers are saving at scale by using EC2 Spot Instances to take advantage of the most cost-effective way to purchase compute capacity. Follow him on Twitter: @schmutze

Introducing AWS Lambda Destinations

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/

Today we’re announcing AWS Lambda Destinations for asynchronous invocations. This is a feature that provides visibility into Lambda function invocations and routes the execution results to AWS services, simplifying event-driven applications and reducing code complexity.

Asynchronous invocations

When a function is invoked asynchronously, Lambda sends the event to an internal queue. A separate process reads events from the queue and executes your Lambda function. When the event is added to the queue, Lambda previously only returned a 2xx status code to confirm that the queue has received this event. There was no additional information to confirm whether the event had been processed successfully.

A common event-driven microservices architectural pattern is to use a queue or message bus for communication. This helps with resilience and scalability. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), or Amazon EventBridge for further processing. Previously, you needed to write the SQS/SNS/EventBridge handling code within your Lambda function and manage retries and failures yourself.

With Destinations, you can route asynchronous function results as an execution record to a destination resource without writing additional code. An execution record contains details about the request and response in JSON format including version, timestamp, request context, request payload, response context, and response payload. For each execution status such as Success or Failure you can choose one of four destinations: another Lambda function, SNS, SQS, or EventBridge. Lambda can also be configured to route different execution results to different destinations.

Asynchronous Function Execution Result

Success

When a function is invoked successfully, Lambda routes the record to the destination resource for every successful invocation. You can use this to monitor the health of your serverless applications via execution status or build workflows based on the invocation result.

You no longer need to chain long-running Lambda functions together synchronously. Previously you needed to complete the entire workflow within the Lambda 15-minute function timeout, pay for idle time, and wait for a response. Destinations allows you to return a Success response to the calling function and then handle the remaining chaining functions asynchronously.

Failure

Alongside today’s announcement of Maximum Event Age and Maximum Retry Attempt for asynchronous invocations, Destinations gives you the ability to handle the Failure of function invocations along with their Success. When a function invocation fails, such as when retries are exhausted or the event age has been exceeded (hitting its TTL), Destinations routes the record to the destination resource for every failed invocation for further investigation or processing.

Dead Letter Queues (DLQ) have been available since 2016 and are a great way to handle asynchronous failure situations. Destinations provide more useful capabilities by passing additional function execution information, including code exception stack traces, to more destination services.

Destinations and DLQs can be used together, although Destinations should be considered the preferred solution. If you already have DLQs set up, existing functionality does not change and Destinations does not replace existing DLQ configurations. If both Destinations and DLQ are used for Failure notifications, function invoke errors are sent to both DLQ and Destinations targets.

How to configure Destinations

Adding Destinations is a straightforward process. This walkthrough uses the AWS Management Console but you can also use the AWS CLI, AWS SAM, AWS CloudFormation, or language-specific SDKs for Lambda.

  1. Open the Lambda console Functions page. Choose an existing Lambda function, or create a new one. In this example, I create a new Lambda function. Choose Create Function.
  2. Enter a Function name, select Node.js 12.x for Runtime, and Choose or create an execution role. Ensure that your Lambda function execution role includes access to the destination resource.
    Basic information
  3. Choose Create function.
  4. Within the Function code pane, paste the following Lambda function code. The code generates a function execution result of either Success or Failure depending on a JSON input ("Success": true or "Success": false).
    // Lambda Destinations tester: Success returns a null result, Failure returns an error
    
    exports.handler = function(event, context, callback) {
        const event_received_at = new Date().toISOString();
        console.log('Event received at: ' + event_received_at);
        console.log('Received event:', JSON.stringify(event, null, 2));
    
        // Return as soon as the callback runs instead of waiting for the event loop to drain
        context.callbackWaitsForEmptyEventLoop = false;
    
        if (event.Success) {
            console.log("Success");
            callback(null);
        } else {
            console.log("Failure");
            // Passing an Error as the first argument marks the invocation as failed
            callback(new Error("Failure from event, Success = false, I am failing!"), 'Destination Function Error Thrown');
        }
    };
    
  5. Choose Save.
  6. To configure Destinations, within the Designer pane, choose Add destination.
    Designer pane
  7. Select the Source as Asynchronous invocation. Select the Condition as On failure or On success, depending on your use case. In this example, I select On Success.
  8. Enter the Amazon Resource Name (ARN) for the Destination SQS queue, SNS topic, Lambda function, or EventBridge event bus. In this example, I use the ARN of an SNS topic I have already configured.
    Add destination
  9. Choose Save. The Destination is added to SNS for On Success.
    Designer
  10. Add another Destination for Failure to Lambda. Within the Designer pane, choose Add destination.
    Add destination
  11. Select the Source as Asynchronous invocation, the Condition as On failure and Enter a Destination Lambda function ARN, then choose Save.
  12. The Destination is added to Lambda for On Failure.

Success testing

To test invoking the asynchronous Lambda function to generate a Success result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": true }' response.json

The Lambda function is invoked successfully with a response "StatusCode": 202.

And an SNS notification email is received, showing the invocation details with "condition":"Success" and the requestPayload.

{
	"version": "1.0",
	"timestamp": "2019-11-24T23:08:25.651Z",
	"requestContext": {
		"requestId": "c2a6f2ae-7dbb-4d22-8782-d0485c9877e2",
		"functionArn": "arn:aws:lambda:sa-east-1:123456789123:function:event-destinations:$LATEST",
		"condition": "Success",
		"approximateInvokeCount": 1
	},
	"requestPayload": {
		"Success": true
	},
	"responseContext": {
		"statusCode": 200,
		"executedVersion": "$LATEST"
	},
	"responsePayload": null
}

Failure testing

The Lambda function can be set to Failure by throwing an exception within the code. To test invoking the asynchronous Lambda function to generate a Failure result, use the AWS CLI:

aws lambda invoke --function-name event-destinations --invocation-type Event --payload '{ "Success": false }' response.json

The Lambda function is executed and reports a successful invoke on the Lambda processing queue. If Lambda is not able to add the event to the queue, the error message appears in the command output.

However, due to the exception error within the code, the function invocation will fail. Destinations then routes the invoke failure to the configured destination Lambda function. You can see the failed function invocation information in the Amazon CloudWatch Logs for the Destination function including "condition": "RetriesExhausted", along with the requestPayload, errorMessage, and stackTrace.

2019-11-24T21:52:47.855Z	d123456-c0dd-4871-a123-a356cb1b3ba6	EVENT
{
    "version": "1.0",
    "timestamp": "2019-11-24T21:52:47.333Z",
    "requestContext": {
        "requestId": "8ea123e4-1db7-4aca-ad10-d9ca1234c1fd",
        "functionArn": "arn:aws:lambda:sa-east-1:123456678912:function:event-destinations:$LATEST",
        "condition": "RetriesExhausted",
        "approximateInvokeCount": 3
    },
    "requestPayload": {
        "Success": false
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Handled"
    },
    "responsePayload": {
        "errorMessage": "Failure from event, Success = false, I am failing!",
        "errorType": "Error",
        "stackTrace": [ "exports.handler (/var/task/index.js:18:18)" ]
    }
}

Destination-specific JSON format

  • For SNS/SQS, the JSON object is passed as the Message to the destination.
  • For Lambda, the JSON is passed as the payload to the function. The destination function cannot be the same as the source function. For example, if LambdaA has a Destination configuration attached for Success, LambdaA is not a valid destination ARN. This prevents recursive functions.
  • For EventBridge, the JSON is passed as the Detail in the PutEvents call. The source is lambda, and the detail type is either Lambda Function Invocation Result - Success or Lambda Function Invocation Result - Failure. The resource fields contain the function and destination ARNs.
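Because each destination type wraps the execution record differently, a consumer has to unwrap it before reading fields such as `requestContext.condition`. The following sketch shows one way a Lambda consumer might do that for the four cases listed above. The function name `extractExecutionRecords` is hypothetical, and the event shapes assumed here are the standard Lambda event formats for SNS, SQS, and EventBridge.

```javascript
// Return the Destinations execution records contained in an incoming Lambda event.
function extractExecutionRecords(event) {
  // EventBridge: the record is the event detail.
  const detailType = event['detail-type'];
  if (typeof detailType === 'string' && detailType.startsWith('Lambda Function Invocation Result') && event.detail) {
    return [event.detail];
  }
  // SNS or SQS: the record arrives as a JSON string inside each message.
  if (Array.isArray(event.Records)) {
    return event.Records.map((record) => {
      if (record.Sns) return JSON.parse(record.Sns.Message);
      if (record.eventSource === 'aws:sqs') return JSON.parse(record.body);
      throw new Error('Unrecognized record type');
    });
  }
  // Lambda destination: the record is the payload itself.
  if (event.requestContext && event.requestContext.condition) return [event];
  throw new Error('Event does not look like a Destinations execution record');
}

const sampleRecord = {
  version: '1.0',
  requestContext: { condition: 'RetriesExhausted', approximateInvokeCount: 3 },
  requestPayload: { Success: false },
  responsePayload: { errorMessage: 'Failure from event, Success = false, I am failing!' },
};
const viaSqs = { Records: [{ eventSource: 'aws:sqs', body: JSON.stringify(sampleRecord) }] };
console.log(extractExecutionRecords(viaSqs)[0].requestContext.condition);
```

With the records unwrapped, the same handling code can run regardless of which destination type delivered them.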

AWS CloudFormation configuration

Destinations CloudFormation configuration is created via the following YAML.

Resources:
  EventInvokeConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
        FunctionName: "YourLambdaFunctionWithEventInvokeConfig"
        Qualifier: "$LATEST"
        MaximumEventAgeInSeconds: 600
        MaximumRetryAttempts: 0
        DestinationConfig:
            OnSuccess:
                Destination: "arn:aws:sns:us-east-1:123456789012:YourSNSTopicOnSuccess"
            OnFailure:
                Destination: "arn:aws:lambda:us-east-1:123456789012:function:YourLambdaFunctionOnFailure"
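For readers who prefer the SDK, the same settings can be expressed as parameters for the Lambda PutFunctionEventInvokeConfig API. The sketch below only builds and validates the parameter object. The helper name is hypothetical, and the limits it checks (0 to 2 retries, 60 to 21,600 seconds of event age) are the documented ranges as best I recall them, so verify them against the current API reference.

```javascript
// Build parameters for Lambda's PutFunctionEventInvokeConfig call.
function buildEventInvokeConfig({ functionName, qualifier, maxAgeSeconds, maxRetries, onSuccessArn, onFailureArn }) {
  // Assumed limits: check the Lambda API reference before relying on them.
  if (!Number.isInteger(maxRetries) || maxRetries < 0 || maxRetries > 2) {
    throw new RangeError('MaximumRetryAttempts must be 0, 1, or 2');
  }
  if (!Number.isInteger(maxAgeSeconds) || maxAgeSeconds < 60 || maxAgeSeconds > 21600) {
    throw new RangeError('MaximumEventAgeInSeconds must be between 60 and 21600');
  }
  return {
    FunctionName: functionName,
    Qualifier: qualifier,
    MaximumEventAgeInSeconds: maxAgeSeconds,
    MaximumRetryAttempts: maxRetries,
    DestinationConfig: {
      OnSuccess: { Destination: onSuccessArn },
      OnFailure: { Destination: onFailureArn },
    },
  };
}

// Mirrors the CloudFormation example, using the same placeholder names and ARNs.
const invokeConfig = buildEventInvokeConfig({
  functionName: 'YourLambdaFunctionWithEventInvokeConfig',
  qualifier: '$LATEST',
  maxAgeSeconds: 600,
  maxRetries: 0,
  onSuccessArn: 'arn:aws:sns:us-east-1:123456789012:YourSNSTopicOnSuccess',
  onFailureArn: 'arn:aws:lambda:us-east-1:123456789012:function:YourLambdaFunctionOnFailure',
});
console.log(JSON.stringify(invokeConfig, null, 2));
```

With the AWS SDK for JavaScript (v2), this object could be passed to `new AWS.Lambda().putFunctionEventInvokeConfig(invokeConfig).promise()`.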

Conclusion

AWS Lambda Destinations gives you more visibility and control of function execution results. This helps you build better event-driven applications, reduce code, and use Lambda’s native failure handling controls.

There are no additional costs for enabling Lambda Destinations. However, calls made to destination target services may be charged.

To learn more, see Lambda Destinations in the AWS Lambda Developer Guide.

New for AWS Lambda – SQS FIFO as an event source

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/new-for-aws-lambda-sqs-fifo-as-an-event-source/

AWS Lambda first announced support for Amazon SQS standard queues as an event source in April 2018. This allows builders to develop serverless applications using queues to directly invoke Lambda functions. Today, we have expanded this feature to include SQS FIFO queues. This makes it easier to create serverless applications using queues where the order of messages is important.

Ordered events and operations are critical for some types of event-driven application. For example, in stock market trading applications, the order of events for a trade is essential to determine the time, price, and parties in a transaction. Similarly, in applications managing pricing or inventory, the order of events is key for maintaining the data integrity of the system.

Using SQS FIFO with Lambda is straightforward, with only minor differences in configuration compared with standard SQS queues. This blog post explains how serverless developers can get started using SQS FIFO queues in their Lambda functions.

How it works

SQS FIFO queues are similar to standard queues but use two additional attributes in the SendMessage API. These attributes are also returned in the event payload to the consuming Lambda function:

  • MessageGroupId: this required field enables multiple message groups within a single queue. If you do not need this functionality, provide the same MessageGroupId value for all messages. Messages within a single group are processed in a FIFO fashion.
  • MessageDeduplicationId: this token is used to deduplicate messages within a 5-minute interval. If two messages with the same MessageDeduplicationId are sent, both messages are accepted successfully but only one is delivered. Learn more about the best practices for using this attribute.

You can submit a message into an SQS FIFO queue using the AWS Management Console, AWS CLI, AWS SAM, or AWS SDK. This SDK example uses Node.js to submit a single message:

// Requires the AWS SDK for JavaScript (v2): npm install aws-sdk
const AWS = require('aws-sdk')
AWS.config.update({region: 'us-west-2'})
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})

// Send one message to the FIFO queue, setting both FIFO attributes explicitly
const send = async (groupId, messageId) => {
  return sqs.sendMessage({
    MessageGroupId: `group-${groupId}`,
    MessageDeduplicationId: `m-${groupId}-${messageId}`,
    MessageBody: `${messageId}`,
    QueueUrl: "https://sqs.us-west-2.amazonaws.com/123456789012/myTestQueue.fifo"
  }).promise()
}

const main = async () => {
  await send('A', '1')
}

// Log any error instead of leaving the promise rejection unhandled
main().catch(console.error)

When the Lambda function is triggered by the SQS FIFO queue, these two attributes are delivered as part of the event payload:

SQS FIFO payload

When configured this way, the SQS FIFO queue sends as many messages as possible to the consuming Lambda function, subject to the Batch size configured in the trigger. The Batch size determines the number of items to read from the queue, up to a maximum of 10 items, though a single batch is smaller if the queue has fewer items.

For example, if you submit six messages to a queue with a Batch size of 10 using two separate MessageGroupId values, the consuming Lambda function would receive all six items:

 

SQS FIFO example #1

As all the messages are processed in a single batch, only one Lambda function is invoked. A more complex example shows how the Lambda service uses concurrency to process a busy SQS FIFO queue more efficiently.

SQS FIFO example #2

In this example, there are three message groups, and a Lambda function with an SQS FIFO trigger. The Batch size for the Lambda trigger is 5, and there are 53 messages in total:

  • Message Group A consists of 18 messages, A1-A18
  • Message Group B consists of 12 messages, B1-B12
  • Message Group C consists of 23 messages, C1-C23

There are three rules governing the order of messages leaving a FIFO queue, to help understand the processing behavior:

  1. Return the oldest message where no other message with the same MessageGroupId is in flight.
  2. Return as many messages with the same MessageGroupId as possible.
  3. If a message batch is still not full, go back to the first rule. As a result, it’s possible for a single batch to contain messages from multiple MessageGroupIds.
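The three rules above can be sketched as a small simulation. This is a simplified model for building intuition, not the actual SQS or Lambda implementation. The function `nextBatch`, the `{ id, groupId }` message shape, and the assumption that group C’s messages arrived first are all invented for the example.

```javascript
// Pick the next batch for a consumer without modifying `queue`.
// `queue` is ordered oldest first; each message is { id, groupId }.
function nextBatch(queue, inFlightGroups, batchSize) {
  const batch = [];
  const groupsInBatch = new Set();
  while (batch.length < batchSize) {
    // Rule 1: oldest message whose group has nothing in flight
    // (and has not already been drained into this batch).
    const oldest = queue.find(
      (m) => !inFlightGroups.has(m.groupId) && !groupsInBatch.has(m.groupId)
    );
    if (!oldest) break;
    groupsInBatch.add(oldest.groupId);
    // Rule 2: take as many messages from that group as still fit, in order.
    const sameGroup = queue.filter((m) => m.groupId === oldest.groupId);
    batch.push(...sameGroup.slice(0, batchSize - batch.length));
    // Rule 3: if the batch is not full, the loop goes back to rule 1.
  }
  return batch;
}

const makeGroup = (groupId, count) =>
  Array.from({ length: count }, (_, i) => ({ id: `${groupId}${i + 1}`, groupId }));

// Assumed arrival order: all of C, then B, then A.
const fifoQueue = [...makeGroup('C', 23), ...makeGroup('B', 12), ...makeGroup('A', 18)];
const firstBatch = nextBatch(fifoQueue, new Set(), 5);
const remainingQueue = fifoQueue.filter((m) => !firstBatch.includes(m));
// Group C is now in flight, so the next batch must come from another group.
const secondBatch = nextBatch(remainingQueue, new Set(['C']), 5);
console.log(firstBatch.map((m) => m.id).join(','), '|', secondBatch.map((m) => m.id).join(','));
```

Under these assumptions, the first batch is drawn entirely from group C, and the second batch comes from group B because group C still has messages in flight.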

In this case, the SQS FIFO queue receives these messages, and orders the items within each message group. In processing, the following events occur:

  1. Consumer #1 receives 5 messages in batch #1 (C1-C5).
  2. Consumer #1 receives 5 messages in batch #2 (B1-B5).
  3. As there are messages in flight for group B, and there are items in the queue from other groups ready for processing, Lambda scales up. Consumer #2 now receives 5 messages in batch #2 (C6-C10).
  4. Consumer #3 receives batch #3 (A1-A5).
  5. Each Lambda instance will continue receiving batches of 5 messages until the queue is empty.

Lambda automatically scales out horizontally to consume the messages in the queue. It will try to consume the queue as quickly and efficiently as possible by maximizing concurrency within the bounds of each service. As queue traffic fluctuates, the Lambda service scales the polling operations based on the number of inflight messages.

In SQS FIFO queues, using more than one MessageGroupId enables Lambda to scale up and process more items in the queue using a greater concurrency limit. Total concurrency is equal to or less than the number of unique MessageGroupIds in the SQS FIFO queue. Learn more about AWS Lambda Function Scaling and how it applies to your event source.

Amazon SQS FIFO queues ensure that the order of processing follows the message order within a message group. However, they do not guarantee exactly-once delivery when used as a Lambda trigger. If exactly-once processing is important in your serverless application, it’s recommended to make your function idempotent. You could achieve this by tracking a unique attribute of the message using a scalable, low-latency control database like Amazon DynamoDB.
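One way to approach idempotency is to record a unique key for each message before processing it, and to skip any message whose key is already recorded. The sketch below uses an in-memory store as a stand-in for a DynamoDB table, where the equivalent of `putIfAbsent` would be a conditional write. All function names are hypothetical, and using MessageDeduplicationId as the key is an assumption for this example.

```javascript
// In-memory stand-in for a control table such as DynamoDB (assumption for this sketch).
function createMemoryStore() {
  const seen = new Set();
  return {
    // Resolves to true only the first time a key is recorded.
    async putIfAbsent(key) {
      if (seen.has(key)) return false;
      seen.add(key);
      return true;
    },
    async remove(key) {
      seen.delete(key);
    },
  };
}

// Wrap a record processor so that repeated deliveries of the same message are skipped.
function createIdempotentHandler(store, processRecord) {
  return async function handleRecord(record) {
    const key = record.attributes.MessageDeduplicationId;
    if (!(await store.putIfAbsent(key))) return 'skipped';
    try {
      await processRecord(record);
    } catch (err) {
      // Forget the key so that a retry of the failed message is processed again.
      await store.remove(key);
      throw err;
    }
    return 'processed';
  };
}
```

A Lambda handler would create the wrapped function once, outside the handler body, and call it for each entry in `event.Records`.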

Triggering Lambda from SQS

1. Navigate to the SQS console and choose Create New Queue.

2. Names for FIFO queues must end in .fifo. For Queue Name, enter myTestQueue.fifo and select FIFO Queue. Choose Quick-Create Queue.

Create New Queue

3. From the Lambda console, choose Create function.

4. Enter mySQStest for Function name, and leave the Node.js 12.x runtime selected. Open the Choose or create an execution role panel and select Create a new role from AWS policy templates.

5. Enter mySQStest for Role name and select Amazon SQS poller permissions from the Policy templates drop-down. Choose Create function.

Create Lambda function

6. Once the function is created, you can configure the trigger. Choose Add trigger.

Add trigger

7. In the Select a trigger drop-down, choose SQS. In the SQS queue drop-down, select the FIFO queue created earlier. Leave Batch size as the default value, then choose Add.

Add trigger dialog

8. After a few seconds, the SQS trigger is ready.

SQS FIFO trigger ready

The Lambda function is now invoked when new messages are available in the SQS FIFO queue.

Conclusion

Now, you can process messages from Amazon SQS FIFO queues with Lambda, helping bring scalable serverless processing to applications needing first-in, first-out message processing.

You can get started with SQS FIFO queue as a Lambda event source via AWS Management Console, AWS CLI, AWS SAM, AWS CloudFormation, or AWS SDK for Lambda. It is available in all AWS Regions where AWS Lambda is available. You pay only for the SQS API operations performed by the Lambda service on your behalf, and the Lambda requests and duration used to process your messages.

For more information on where AWS Lambda is available, see the AWS Region Table. To learn more, see Using AWS Lambda with Amazon SQS FIFO in the AWS Lambda Developer Guide.


Designing durable serverless apps with DLQs for Amazon SNS, Amazon SQS, AWS Lambda

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/designing-durable-serverless-apps-with-dlqs-for-amazon-sns-amazon-sqs-aws-lambda/

This post is courtesy of Otavio Ferreira, Sr Manager, SNS.

In a postal system, a dead-letter office is a facility for processing undeliverable mail. In pub/sub messaging, a dead-letter queue (DLQ) is a queue to which messages published to a topic can be sent, in case those messages cannot be delivered to a subscribed endpoint.

Amazon SNS supports DLQs, making your applications more resilient and durable when message deliveries fail.

Understanding message delivery failures and retries

The delivery of a message fails when it’s not possible for Amazon SNS to access the subscribed endpoint. There are two reasons why this might happen:

  • Client errors, where the client is SNS (the message sender).
  • Server errors, where the server is the system that hosts the subscription endpoint (the message receiver), such as Amazon SQS or AWS Lambda.

Client errors

Client errors happen when SNS has stale subscription metadata. One common cause of client errors is when you (the endpoint owner) delete the endpoint. For example, you might delete the SQS queue that is subscribed to your SNS topic, without also deleting the SNS subscription corresponding to the queue. Another common cause is when you change the resource policy attached to your endpoint in a way that prevents SNS from delivering messages to that endpoint.

These errors are considered client errors because the client has attempted the delivery of a message to a destination that, from the client’s perspective, is no longer accessible. SNS does not retry the delivery of messages that failed as the result of client errors.

Server errors

Server errors happen when the system that powers the subscribed endpoint is unavailable, or when it returns an exception response indicating that it failed to process a valid request from SNS.

When server errors occur, SNS retries the failed deliveries according to a backoff function, which can be either linear or exponential. When a server error occurs for an AWS managed endpoint, backed by either SQS or Lambda, SNS retries the delivery up to 100,015 times, over 23 days.

Server errors can also happen with customer managed endpoints, namely HTTP, SMS, email, and mobile push endpoints. SNS also retries the delivery for these types of endpoints. HTTP endpoints support customer-defined retry policies, while SNS sets an internal delivery retry policy for SMS, email, and mobile push endpoints to 50 times, over 6 hours.
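To make the difference between linear and exponential backoff concrete, here is a generic sketch. It is not the function SNS uses internally: the formulas, the class name, and the `baseSeconds` and `maxSeconds` parameters are assumptions chosen only for illustration.

```java
final class BackoffSchedule {

    // Linear backoff: the delay grows by a fixed step with each attempt (1-based).
    static long linearDelaySeconds(int attempt, long baseSeconds, long maxSeconds) {
        requirePositive(attempt);
        return Math.min(maxSeconds, baseSeconds * attempt);
    }

    // Exponential backoff: the delay doubles with each attempt (1-based).
    static long exponentialDelaySeconds(int attempt, long baseSeconds, long maxSeconds) {
        requirePositive(attempt);
        int shift = Math.min(attempt - 1, 30); // cap the shift to avoid overflow
        return Math.min(maxSeconds, baseSeconds * (1L << shift));
    }

    private static void requirePositive(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
    }
}
```

With a base of 2 seconds, the third linear retry waits 6 seconds while the fourth exponential retry waits 16 seconds. Both are capped at `maxSeconds`.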

Delivery retries

SNS may receive a client error, or continue to receive a server error for a message beyond the number of retries defined by the corresponding retry policy. In that event, SNS discards the message. Setting a DLQ to your SNS subscription enables you to keep this message, regardless of the type of error, either client or server. DLQs give you more control over messages that cannot be delivered.
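The rules above reduce to a small decision table, sketched below. The enum and method names are hypothetical and the code only mirrors the behavior described in this post. It is not an SNS API.

```java
final class DeliveryOutcome {

    enum ErrorType { CLIENT, SERVER }

    enum Action { RETRY, SEND_TO_DLQ, DISCARD }

    // Client errors are never retried. Server errors are retried until the
    // retry policy is exhausted. After that, the message is kept only if the
    // subscription has a DLQ attached.
    static Action decide(ErrorType error, int attemptsMade, int maxAttempts, boolean dlqConfigured) {
        if (error == ErrorType.SERVER && attemptsMade < maxAttempts) {
            return Action.RETRY;
        }
        return dlqConfigured ? Action.SEND_TO_DLQ : Action.DISCARD;
    }
}
```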

For more information on the delivery retry policy for each delivery protocol supported by SNS, see Amazon SNS Message Delivery Retry.

Using DLQs for AWS services

SNS, SQS, and Lambda support DLQs, addressing different failure modes. All DLQs are regular queues powered by SQS.

In SNS, DLQs store the messages that failed to be delivered to subscribed endpoints. For more information, see Amazon SNS Dead-Letter Queues.

In SQS, DLQs store the messages that failed to be processed by your consumer application. This failure mode can happen when producers and consumers fail to interpret aspects of the protocol that they use to communicate. In that case, the consumer receives the message from the queue, but fails to process it, as the message doesn’t have the structure or content that the consumer expects. The consumer can’t delete the message from the queue either. After exhausting the receive count in the redrive policy, SQS can sideline the message to the DLQ. For more information, see Amazon SQS Dead-Letter Queues.
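The receive-count mechanism can be illustrated with a small in-memory simulation. This is a sketch under simplifying assumptions: message IDs are unique, there is no visibility timeout, and the class and method names are hypothetical rather than part of any SQS SDK.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class RedriveSimulation {

    private final int maxReceiveCount;
    private final Deque<String> sourceQueue = new ArrayDeque<>();
    private final Deque<String> deadLetterQueue = new ArrayDeque<>();
    private final Map<String, Integer> receiveCounts = new HashMap<>();

    RedriveSimulation(int maxReceiveCount) {
        this.maxReceiveCount = maxReceiveCount;
    }

    void send(String messageId) {
        sourceQueue.addLast(messageId);
    }

    // Returns the next message ID, or null if nothing is deliverable.
    // A message whose receive count would exceed maxReceiveCount is moved
    // to the dead-letter queue instead of being delivered again.
    String receive() {
        while (!sourceQueue.isEmpty()) {
            String messageId = sourceQueue.pollFirst();
            int count = receiveCounts.merge(messageId, 1, Integer::sum);
            if (count > maxReceiveCount) {
                deadLetterQueue.addLast(messageId);
                continue;
            }
            return messageId;
        }
        return null;
    }

    // The consumer failed and did not delete the message, so it becomes visible again.
    void returnToQueue(String messageId) {
        sourceQueue.addLast(messageId);
    }

    int deadLetterCount() {
        return deadLetterQueue.size();
    }
}
```

With `maxReceiveCount` set to 2, a message that fails processing twice is sidelined on the third receive attempt.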

In Lambda, DLQs store the messages that resulted in failed asynchronous executions of your Lambda function. An execution can result in an error for several reasons. Your code might raise an exception, time out, or run out of memory. The runtime executing your code might encounter an error and stop. Your function might hit its concurrency limit and be throttled. Regardless of the error type, when the error occurs, your code might have run completely, partially, or not at all. By default, Lambda retries an asynchronous execution twice. After exhausting the retries, Lambda can sideline the message to the DLQ. For more information, see AWS Lambda Dead-Letter Queues.

When you have a fan-out architecture, with SQS queues and Lambda functions subscribed to an SNS topic, we recommend that you set DLQs to your SNS subscriptions, and to your destination queues and functions as well. This approach gives your application resilience against message delivery failures, message processing failures, and function execution failures too.

Applying DLQs in a use case

Here’s how everything comes together. The following diagram shows a serverless backend architecture that supports a car rental application. This is a durable serverless architecture based on DLQs for SNS, SQS, and Lambda.

Dead Letter Queue - DLQ SNS use case with architecture diagram

When a customer places an order to rent a car, the application sends that request to an API, which is powered by Amazon API Gateway. The REST API is backed by an SNS topic named Rental-Orders, and deployed onto an Amazon VPC subnet. The topic then fans out that order to the following two subscribed endpoints, for parallel processing:

  • An SQS queue, named Rental-Fulfilment, which feeds the integration with an internal fulfilment system hosted on Amazon EC2.
  • A Lambda function, named Rental-Billing, which processes and loads the customer order into a third-party billing system, also hosted on Amazon EC2.

To increase the durability of this serverless backend API, the following DLQs have been set up:

  • Two SNS DLQs, namely Rental-Fulfilment-Fanout-DLQ and Rental-Billing-Fanout-DLQ, which store the order in case either the subscribed SQS queue or Lambda function ever becomes unreachable.
  • An SQS DLQ, named Rental-Fulfilment-DLQ, which stores the order when the fulfilment system fails to process the order.
  • A Lambda DLQ, named Rental-Billing-DLQ, which stores the order when the function fails to process and load the order into the billing system.

When the DLQ captures the message, you can inspect the message for troubleshooting purposes. After you address the error at hand, you can poll the DLQ to retry the processing of the message.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or AWS CloudFormation. You can use the SDK, CLI, and API for polling the DLQs as well.

Configuring DLQs for subscriptions

You can attach a DLQ to an SNS subscription by setting the subscription’s RedrivePolicy parameter. The policy is a JSON object that refers to the DLQ ARN. The ARN must point to an SQS queue in the same AWS account as that of the SNS subscription. Also, both the DLQ and the subscription must be in the same AWS Region.

Here’s how you can configure one of the SNS DLQs applied in the car rental application example, presented earlier.

The following JSON object is a CloudFormation template that subscribes the SQS queue Rental-Fulfilment to the SNS topic Rental-Orders. The template also sets a RedrivePolicy that targets Rental-Fulfilment-Fanout-DLQ as a DLQ.

Lastly, the template sets a FilterPolicy value. It makes SNS deliver a message to the subscribed queue only if the published message carries an attribute named order-status with value set to either confirmed or canceled. As Amazon SNS Message Filtering happens before message delivery, messages that are filtered out aren’t sent to that subscription’s DLQ.

Internally, the CloudFormation template uses the SNS Subscribe API action for deploying the subscription and setting both policies, all part of the same API request.

{  
   "Resources": {
      "mySubscription": {
         "Type" : "AWS::SNS::Subscription",
         "Properties" : {
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
            "RedrivePolicy": {
               "deadLetterTargetArn": 
                  "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"
            },
            "FilterPolicy": { 
               "order-status": [ "confirmed", "canceled" ]
            }
         }
      }
   }
}
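To show what the FilterPolicy in this template means for an individual message, the following sketch checks message attributes against a policy. It covers only exact string matching, which is all this template uses, and the class and method names are hypothetical. SNS performs this evaluation itself, so you would not write this code in practice.

```java
import java.util.List;
import java.util.Map;

final class FilterPolicyMatch {

    // True when every key in the policy is present in the message attributes
    // with one of the allowed values (exact string match only).
    static boolean matches(Map<String, List<String>> policy, Map<String, String> attributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            String actual = attributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true;
    }
}
```

A message with order-status set to confirmed matches and is delivered. A message with order-status set to pending, or with no order-status attribute, is filtered out and therefore never reaches the subscription's DLQ.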

Maybe the SNS topic and subscription are already deployed. In that case, you can use the SNS SetSubscriptionAttributes API action to set the RedrivePolicy, as shown by the following code examples, based on the AWS CLI and the AWS SDK for Java.

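$ aws sns set-subscription-attributes \
   --region us-east-1 \
   --subscription-arn arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2 \
   --attribute-name RedrivePolicy \
   --attribute-value '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ"}'

// AWS SDK for Java
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;

AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();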

String subscriptionArn = "arn:aws:sns:us-east-1:123456789012:Rental-Orders:44019880-ffa0-4067-9cb4-b974443bcck2";

String redrivePolicy = "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}";

SetSubscriptionAttributesRequest request = new SetSubscriptionAttributesRequest(
  subscriptionArn, 
  "RedrivePolicy", 
  redrivePolicy
);

sns.setSubscriptionAttributes(request);

Monitoring DLQs

You can use Amazon CloudWatch metrics and alarms to monitor the DLQs associated with your SNS subscriptions. In the car rental example, you can monitor the DLQs to be notified when the API fails to distribute any car rental order to the fulfilment or billing systems.

As regular SQS queues, the DLQs in SNS emit a number of metrics to CloudWatch, in 5-minute data points, such as NumberOfMessagesSent, NumberOfMessagesReceived and NumberOfMessagesDeleted. You can use these SQS metrics to be notified upon activity in your DLQs in SNS, so you may trigger a message recovery protocol.

You might have a case where you expect the DLQ to be always empty. In that case, create a CloudWatch alarm on NumberOfMessagesSent, set the alarm threshold to zero, and provide a separate SNS topic to be notified when the alarm goes off. The SNS topic, in turn, can deliver your alarm notification to any endpoint type that you choose, such as email address, phone number, or mobile pager app.

Additionally, SNS itself provides its own set of metrics that are relevant to DLQs. Specifically, SNS metrics include the following:

  • NumberOfNotificationsRedrivenToDlq – Used when sending the message to the DLQ succeeds.
  • NumberOfNotificationsFailedToRedriveToDlq – Used when sending the message to the DLQ fails. This can happen because the DLQ either doesn’t exist anymore or doesn’t have the required access permissions to allow SNS to send messages to it. For more information about setting up the required access policy, see Giving Permissions for Amazon SNS to Send Messages to Amazon SQS.

Debugging with DLQs

Use CloudWatch Logs to see the exceptions that caused your SNS deliveries to fail and your messages to be sidelined to DLQs. In the car rental example, you can inspect the rental orders in the DLQs, as well as the logs associated with these queues. Then you can understand why those orders failed to be fanned out to the fulfilment or billing systems.

SNS can log both successful and failed deliveries in CloudWatch. You can enable Amazon SNS Delivery Status Logging by setting three SNS topic attributes, which are delivery protocol-specific. As an example, for SNS deliveries to SQS queues, you must set the following topic attributes: SQSSuccessFeedbackRoleArn, SQSFailureFeedbackRoleArn, and SQSSuccessFeedbackSampleRate.

The following JSON object represents a successful SNS delivery in a CloudWatch Logs entry. The status code logged is 200 (SUCCESS). The attribute RedrivePolicy shows that the SNS subscription in question had its DLQ set.

{
  "notification": {
    "messageMD5Sum": "7bb3327ac55e49485bad42e159ca4d4b",
    "messageId": "e8c2bb09-235c-5f5d-b583-efd8df0f7d74",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:13:55.876"
  },
  "delivery": {
    "deliveryId": "6adf232e-fb12-5062-a564-27ff3741051f",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"sqsRequestId\":\"b2608a46-ccc4-51cc-003d-de972097debc\",\"sqsMessageId\":\"05fecd22-60a1-4d7d-bb79-026d49700b5a\"}",
    "dwellTimeMs": 58,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

The following JSON object represents a failed SNS delivery in CloudWatch Logs. In the following code example, the subscribed queue doesn’t exist. As a client error, the status code logged is 400 (FAILURE). Again, the RedrivePolicy attribute refers to a DLQ.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "9959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:51.116"
  },
  "delivery": {
    "deliveryId": "be743821-4c2c-5acc-a586-6cf0807f6fb1",
    "redrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ\"}",
    "destination": "arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment",
    "providerResponse": "{\"ErrorCode\":\"AWS.SimpleQueueService.NonExistentQueue\", \"ErrorMessage\":\"The specified queue does not exist or you do not have access to it.\",\"sqsRequestId\":\"Unrecoverable\"}",
    "dwellTimeMs": 53,
    "attempts": 1,
    "statusCode": 400
  },
  "status": "FAILURE"
}

When the message delivery fails and there is a DLQ attached to the subscription, the message is sent to the DLQ and an additional entry is logged in CloudWatch. This new entry is specific to the delivery to the DLQ and refers to the DLQ ARN as the destination, as shown in the following JSON object.

{
  "notification": {
    "messageMD5Sum": "81c395cbd350da6bedfe3b24db9517b0",
    "messageId": "8959db9d-25c8-57a6-9439-8e5be8f71a1f",
    "topicArn": "arn:aws:sns:us-east-1:123456789012:Rental-Orders",
    "timestamp": "2019-10-04 05:16:52.876"
  },
  "delivery": {
    "deliveryId": "a877c79f-a3ee-5105-9bbd-92596eae0232",
    "destination":"arn:aws:sqs:us-east-1:123456789012:Rental-Fulfilment-Fanout-DLQ",
    "providerResponse": "{\"sqsRequestId\":\"8cef1af5-e86a-519e-ad36-4f33252aa5ec\",\"sqsMessageId\":\"2b742c5c-0750-4ec5-a717-b95897adda8e\"}",
    "dwellTimeMs": 51,
    "attempts": 1,
    "statusCode": 200
  },
  "status": "SUCCESS"
}

By analyzing Amazon CloudWatch Logs entries, you can understand why an SNS message was moved to a DLQ, and then take the required set of steps to recover the message. When you enable delivery status logging in SNS, you can configure the sample rate at which deliveries are logged, from 0% to 100%.

Encrypting DLQs

When your SNS subscription targets an SQS encrypted queue, you probably want your DLQ to be an SQS encrypted queue as well. This configuration keeps the way your messages are encrypted at rest consistent.

To follow this security recommendation, give the CMK you used to encrypt your DLQ a key policy that grants the SNS service principal access to AWS KMS API actions. For example, see the following sample key policy:

{
    "Sid": "GrantSnsAccessToKms",
    "Effect": "Allow",
    "Principal": { "Service": "sns.amazonaws.com" },
    "Action": [ "kms:Decrypt", "kms:GenerateDataKey*" ],
    "Resource": "*"
}

If you have an SNS encrypted topic, but a subscription in this topic points to a DLQ that isn’t an SQS encrypted queue, then messages sidelined to the DLQ aren’t encrypted at rest.

For more information, see Enabling Server-Side Encryption (SSE) for an Amazon SNS Topic with an Amazon SQS Encrypted Queue Subscribed.

Summary

DLQs for SNS, SQS, and Lambda increase the resiliency and durability of your applications. These DLQs address different failure modes, and can be used together.

  • SNS DLQs store messages that failed to be delivered to subscribed endpoints.
  • SQS DLQs store messages that the consumer system failed to process.
  • Lambda DLQs store the messages that resulted in failed asynchronous executions of your functions.

Setting up DLQs for subscriptions, queues, and functions can be done using the AWS Management Console, SDK, CLI, API, or CloudFormation. DLQs are available in all AWS Regions. Start today by running the tutorials:

Simple Two-way Messaging using the Amazon SQS Temporary Queue Client

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/simple-two-way-messaging-using-the-amazon-sqs-temporary-queue-client/

This post is contributed by Robin Salkeld, Sr. Software Development Engineer

Amazon SQS is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Asynchronous workflows have always been the primary use case for SQS. Using queues ensures one component can keep running smoothly without losing data when another component is unavailable or slow.

We were surprised, then, to discover that many customers use SQS in synchronous workflows. For example, many applications use queues to communicate between frontends and backends when processing a login request from a user.

Why would anyone use SQS for this? The service stores messages for up to 14 days with high durability, but messages in a synchronous workflow often must be processed within a few minutes, or even seconds. Why not just set up an HTTPS endpoint?

The more we talked to customers, the more we understood. Here’s what we learned:

  • Creating a queue is often easier and faster than creating an HTTPS endpoint and the infrastructure necessary to ensure the endpoint’s scalability.
  • Queues are safe by default because they are locked down to the AWS account that created them. In addition, any DDoS attempt on your service is absorbed by SQS instead of loading down your own servers.
  • There is generally no need to create firewall rules for the communication between microservices if they use queues.
  • Although SQS provides durable storage (which isn’t necessary for short-lived messages), it is still a cost-effective solution for this use case. This is especially true when you consider all the messaging broker maintenance that is done for you.

However, setting up efficient two-way communication through one-way queues requires some non-trivial client-side code. In our previous two-part post series on implementing enterprise integration patterns with AWS messaging services, Point-to-point channels and Publish-subscribe channels, we discussed the Request-Response Messaging Pattern. In this pattern, each requester creates a temporary destination to receive each response message.

The simplest approach is to create a new queue for each response, but this is like building a road just so a single car can drive on it before tearing it down. Technically, this can work (and SQS can create and delete queues quickly), but we can definitely make it faster and cheaper.

To better support short-lived, lightweight messaging destinations, we are pleased to present the Amazon SQS Temporary Queue Client. This client makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill.

Virtual queues

The key concept behind the client is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS and no costs associated with creating a virtual queue.

The Temporary Queue Client includes the AmazonSQSVirtualQueuesClient class for creating and managing virtual queues. This class implements the AmazonSQS interface and adds support for attributes related to virtual queues. You can create a virtual queue using this client by calling the CreateQueue API action and including the HostQueueURL queue attribute. This attribute specifies the existing SQS queue on which to host the virtual queue. The queue URL for a virtual queue is in the form <host queue URL>#<virtual queue name>. For example:

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue#MyVirtualQueueName

When you call the SendMessage or SendMessageBatch API actions on AmazonSQSVirtualQueuesClient with a virtual queue URL, the client first extracts the virtual queue name. It then attaches this name as an additional message attribute to each message, and sends the messages to the host queue. When you call the ReceiveMessage API action on a virtual queue, the calling thread waits for messages to appear in the in-memory buffer for the virtual queue. Meanwhile, a background thread polls the host queue and dispatches messages to these buffers according to the additional message attribute.

This mechanism is similar to how the AmazonSQSBufferedAsyncClient prefetches messages, and the benefits are similar. A single call to SQS can provide messages for up to 10 virtual queues, reducing the API calls that you pay for by up to a factor of ten. Deleting a virtual queue simply removes the client-side resources used to implement it, again without making API calls to SQS.
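The multiplexing idea can be sketched with plain in-memory buffers. This is a single-threaded illustration under stated assumptions, not the Temporary Queue Client's implementation: the class and method names are hypothetical, and the real client uses a background polling thread and SQS message attributes.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class VirtualQueueDemux {

    // One local buffer per virtual queue, keyed by virtual queue name.
    private final Map<String, Queue<String>> buffers = new HashMap<>();

    // Extracts the part after '#' from "<host queue URL>#<virtual queue name>".
    static String virtualQueueName(String virtualQueueUrl) {
        int hash = virtualQueueUrl.indexOf('#');
        if (hash < 0 || hash == virtualQueueUrl.length() - 1) {
            throw new IllegalArgumentException("Not a virtual queue URL: " + virtualQueueUrl);
        }
        return virtualQueueUrl.substring(hash + 1);
    }

    // Creating a virtual queue only allocates a local buffer.
    void createVirtualQueue(String name) {
        buffers.putIfAbsent(name, new ArrayDeque<>());
    }

    // Routes a message polled from the host queue to the matching buffer.
    // Returns false when the named virtual queue doesn't exist locally.
    boolean dispatch(String virtualQueueNameAttribute, String body) {
        Queue<String> buffer = buffers.get(virtualQueueNameAttribute);
        if (buffer == null) {
            return false;
        }
        buffer.add(body);
        return true;
    }

    // Returns the next buffered message for a virtual queue, or null if none.
    String poll(String name) {
        Queue<String> buffer = buffers.get(name);
        return buffer == null ? null : buffer.poll();
    }

    // Deleting a virtual queue only releases the local buffer.
    void deleteVirtualQueue(String name) {
        buffers.remove(name);
    }
}
```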

The diagram below illustrates the end-to-end process for sending messages through virtual queues:

Sending messages through virtual queues

Virtual queues are similar to virtual machines. Just as a virtual machine provides the same experience as a physical machine, a virtual queue divides the resources of a single SQS queue into smaller logical queues. This is ideal for temporary queues, since they frequently only receive a handful of messages in their lifetime. Virtual queues are currently implemented entirely within the Temporary Queue Client, but additional support and optimizations might be added to SQS itself in the future.

In most cases, you don’t have to manage virtual queues yourself. The library also includes the AmazonSQSTemporaryQueuesClient class. This class automatically creates virtual queues when the CreateQueue API action is called and creates host queues on demand for all queues with the same queue attributes. To optimize existing application code that creates and deletes queues, you can use this class as a drop-in replacement implementation of the AmazonSQS interface.

The client also includes the AmazonSQSRequester and AmazonSQSResponder interfaces, which enable two-way communication through SQS queues. The following is an example of an RPC implementation for a web application’s login process.

/**
 * This class handles a user's login request on the client side.
 */
public class LoginClient {

    // The SQS queue to send the requests to.
    private final String requestQueueUrl;

    // The AmazonSQSRequester creates a temporary queue for each response.
    private final AmazonSQSRequester sqsRequester = AmazonSQSRequesterClientBuilder.defaultClient();

    public LoginClient(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Send a login request to the server.
     */
    public String login(String body) throws TimeoutException {
        SendMessageRequest request = new SendMessageRequest()
                .withMessageBody(body)
                .withQueueUrl(requestQueueUrl);

        // This:
        //  - creates a temporary queue
        //  - attaches its URL as an attribute on the message
        //  - sends the message
        //  - receives the response from the temporary queue
        //  - deletes the temporary queue
        //  - returns the response
        //
        // If something goes wrong and the server's response never shows up, this method throws a
        // TimeoutException.
        Message response = sqsRequester.sendMessageAndGetResponse(request, 20, TimeUnit.SECONDS);
        
        return response.getBody();
    }
}

/**
 * This class processes users' login requests on the server side.
 */
public class LoginServer {

    // The SQS queue to poll for login requests.
    // Assume that on construction a thread is created to poll this queue and call
    // handleLoginRequest() below for each message.
    private final String requestQueueUrl;

    // The AmazonSQSResponder sends responses to the correct response destination.
    private final AmazonSQSResponder sqsResponder = AmazonSQSResponderClientBuilder.defaultClient();

    public LoginServer(String requestQueueUrl) {
        this.requestQueueUrl = requestQueueUrl;
    }

    /**
     * Handle a login request sent from the client above.
     */
    public void handleLoginRequest(Message message) {
        // Assume doLogin does the actual work, and returns a serialized result
        String response = doLogin(message.getBody());

        // This extracts the URL of the temporary queue from the message attribute and sends the
        // response to that queue.
        sqsResponder.sendResponseMessage(MessageContent.fromMessage(message), new MessageContent(response));  
    }
}

Cleaning up

As with any other AWS SDK client, your code should call the shutdown() method when the temporary queue client is no longer needed. The AmazonSQSRequester interface also provides a shutdown() method, which shuts down its internal temporary queue client. This ensures that the in-memory resources needed for any virtual queues are reclaimed, and that the host queue that the client automatically created is also deleted automatically.

However, in the world of distributed systems things are a little more complex. Processes can run out of memory and crash, and hosts can reboot suddenly and unexpectedly. There are even cases where you don’t have the opportunity to run custom code on shutdown.

The Temporary Queue Client addresses this issue as well. For each host queue with recent API calls, the client periodically uses the TagQueue API action to attach a fresh tag value that indicates the queue is still being used. The tagging process serves as a heartbeat to keep the queue alive. According to a configurable time period (by default, 5 minutes), a background thread uses the ListQueues API action to obtain the URLs of all queues with the configured prefix. Then, it deletes each queue that has not been tagged recently. The mechanism is similar to how the Amazon DynamoDB Lock Client expires stale lock leases.

If you use the AmazonSQSTemporaryQueuesClient directly, you can customize how long queues must be idle before they are deleted by configuring the IdleQueueRetentionPeriodSeconds queue attribute. The client supports setting this attribute on both host queues and virtual queues. For virtual queues, setting this attribute ensures that the in-memory resources do not become a memory leak in the presence of application bugs.

Any API call to a queue marks it as non-idle, including ReceiveMessage calls that don’t return any messages. The only reason to increase the retention period attribute is to give the client more time when it can’t send heartbeats—for example, due to garbage collection pauses or networking issues.
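The idleness check behind the sweep can be sketched as a comparison between each queue's last heartbeat and the retention period. The class, method, and parameter names below are hypothetical, and timestamps are passed in as plain epoch seconds. The actual client reads the heartbeat from queue tags.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class IdleQueueSweep {

    // Returns the URLs of queues whose last heartbeat is older than the
    // retention period, sorted for a stable result.
    static List<String> idleQueues(Map<String, Long> lastHeartbeatEpochSeconds,
                                   long nowEpochSeconds,
                                   long retentionSeconds) {
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatEpochSeconds.entrySet()) {
            if (nowEpochSeconds - entry.getValue() > retentionSeconds) {
                idle.add(entry.getKey());
            }
        }
        Collections.sort(idle);
        return idle;
    }
}
```

A longer retention period makes the check more tolerant of missed heartbeats, at the cost of keeping abandoned queues around for longer.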

But what if you want to use this client in a fleet of a thousand EC2 instances? Won’t every client spend a lot of time checking every queue for idleness? Doesn’t that imply duplicate work that increases as the fleet is scaled up?

We thought of this too. The Temporary Queue Client creates a shared queue for all clients using the same queue prefix, and uses this queue as a work queue for the distributed task. Instead of every client calling the ListQueues API action every five minutes, a new seed message (which triggers the sweeping process) is sent to this queue every five minutes.

When one of the clients receives this message, it calls the ListQueues API action and sends each queue URL in the result as another kind of message to the same shared work queue. The work of actually checking each queue for idleness is distributed roughly evenly to the active clients, ensuring scalability. There is even a mechanism that works around the fact that the ListQueues API action currently returns no more than 1,000 queue URLs at a time.

Summary

We are excited about how the new Amazon SQS Temporary Queue Client makes more messaging patterns easier and cheaper for you. Download the code from GitHub, have a look at Temporary Queues in the Amazon SQS Developer Guide, try out the client, and let us know what you think!

ICYMI: Serverless Q4 2018

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2018/

This post is courtesy of Eric Johnson, Senior Developer Advocate – AWS Serverless

Welcome to the fourth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

This edition of ICYMI includes all announcements from AWS re:Invent 2018!

If you didn’t see them, check our Q1 ICYMI, Q2 ICYMI, and Q3 ICYMI posts for what happened then.

So, what might you have missed this past quarter? Here’s the recap.

New features

AWS Lambda introduced the Lambda runtime API and Lambda layers, which enable developers to bring their own runtime and share common code across Lambda functions. With the release of the runtime API, we can now support runtimes from AWS partners such as the PHP runtime from Stackery and the Erlang and Elixir runtimes from Alert Logic. Using layers, partners such as Datadog and Twistlock have also simplified the process of using their Lambda code libraries.

To meet the demand of larger Lambda payloads, Lambda doubled the payload size of asynchronous calls to 256 KB.

In early October, Lambda also lengthened the runtime limit by enabling Lambda functions that can run up to 15 minutes.

Lambda also rolled out native support for Ruby 2.5 and Python 3.7.

You can now process Amazon Kinesis streams up to 68% faster with AWS Lambda support for Kinesis Data Streams enhanced fan-out and HTTP/2 for faster streaming.

Lambda also released a new Application view in the console. It’s a high-level view of all of the resources in your application. It also gives you a quick view of deployment status with the ability to view service metrics and custom dashboards.

Application Load Balancers added support for targeting Lambda functions. ALBs can provide a simple HTTP/S front end to Lambda functions. ALB features such as host- and path-based routing are supported to allow flexibility in triggering Lambda functions.

Amazon API Gateway added support for AWS WAF. You can use AWS WAF for your Amazon API Gateway APIs to protect from attacks such as SQL injection and cross-site scripting (XSS).

API Gateway has also improved parameter support by adding support for multi-value parameters. You can now pass multiple values for the same key in the header and query string when calling the API. Returning multiple headers with the same name in the API response is also supported. For example, you can send multiple Set-Cookie headers.
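To make the idea of multiple values per key concrete, here is a minimal sketch in plain Java that groups repeated query string keys into a key-to-list map. This is the same shape that a Lambda proxy integration event exposes as multiValueQueryStringParameters. The class and method names are hypothetical and are not part of any AWS library, and the parsing is deliberately simplified.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not an API Gateway or AWS SDK class.
class MultiValueParams {

    // Groups repeated keys of a raw query string into key -> list of values,
    // preserving the order in which keys and values first appear.
    public static Map<String, List<String>> parse(String rawQuery) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.isEmpty()) {
            return result;
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int idx = pair.indexOf('=');
            String key = decode(idx >= 0 ? pair.substring(0, idx) : pair);
            String value = idx >= 0 ? decode(pair.substring(idx + 1)) : "";
            result.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return result;
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Two values for "pet", one value for "color"
        System.out.println(parse("pet=cat&pet=dog&color=black"));
    }
}
```

A single-value map would keep only one of the two pet values, which is the limitation that multi-value parameter support removes.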

In October, API Gateway relaunched the Serverless Developer Portal. It provides a catalog of published APIs and associated documentation that enable self-service discovery and onboarding. You can customize it for branding through either custom domain names or logo/styling updates. In November, we made it easier to launch the developer portal from the Serverless Application Repository.

In the continuous effort to decrease customer costs, API Gateway introduced tiered pricing. The tiered pricing model lowers the cost of API Gateway at scale, with an API Requests price as low as $1.51 per million requests at the highest tier.

Last but definitely not least, API Gateway released support for WebSocket APIs in mid-December as a final holiday gift. With this new feature, developers can build bidirectional communication applications without having to provision and manage any servers. This has been a long-awaited and highly anticipated announcement for the serverless community.

AWS Step Functions added eight new service integrations. With this release, the steps of your workflow can exist on Amazon ECS, AWS Fargate, Amazon DynamoDB, Amazon SNS, Amazon SQS, AWS Batch, AWS Glue, and Amazon SageMaker. This is in addition to the services that Step Functions already supports: AWS Lambda and Amazon EC2.

Step Functions expanded by announcing availability in the EU (Paris) and South America (São Paulo) Regions.

The AWS Serverless Application Repository increased its functionality by supporting more resources in the repository. The Serverless Application Repository now supports Application Auto Scaling, Amazon Athena, AWS AppSync, AWS Certificate Manager, Amazon CloudFront, AWS CodeBuild, AWS CodePipeline, AWS Glue, AWS Identity and Access Management, Amazon SNS, Amazon SQS, AWS Systems Manager, and AWS Step Functions.

The Serverless Application Repository also released support for nested applications. Nested applications enable you to build highly sophisticated serverless architectures by reusing services that are independently authored and maintained but easily composed using AWS SAM and the Serverless Application Repository.

AWS SAM made authorization simpler by introducing SAM support for authorizers. Enabling authorization for your APIs is as simple as defining an Amazon Cognito user pool or an API Gateway Lambda authorizer as a property of your API in your SAM template.

AWS SAM CLI introduced two new commands. First, you can now build locally with the sam build command. This functionality allows you to compile deployment artifacts for Lambda functions written in Python. Second, the sam publish command allows you to publish your SAM application to the Serverless Application Repository.

Our SAM tooling team also released the AWS Toolkit for PyCharm, which provides an integrated experience for developing serverless applications in Python.

The AWS Toolkits for Visual Studio Code (Developer Preview) and IntelliJ (Developer Preview) are still in active development and will include similar features when they become generally available.

AWS SAM and the AWS SAM CLI implemented support for Lambda layers. Using a SAM template, you can manage your layers, and using the AWS SAM CLI, you can develop and debug Lambda functions that are dependent on layers.

Amazon DynamoDB added support for transactions, allowing developers to enforce all-or-nothing operations. Amazon DynamoDB Accelerator also added support for DynamoDB transactions.

Amazon DynamoDB also announced Amazon DynamoDB on-demand, a flexible new billing option for DynamoDB capable of serving thousands of requests per second without capacity planning. DynamoDB on-demand offers simple pay-per-request pricing for read and write requests so that you only pay for what you use, making it easy to balance costs and performance.

AWS Amplify released the Amplify Console, which is a continuous deployment and hosting service for modern web applications with serverless backends. Modern web applications include single-page app frameworks such as React, Angular, and Vue and static-site generators such as Jekyll, Hugo, and Gatsby.

Amazon SQS announced support for Amazon VPC Endpoints using PrivateLink.

Serverless blogs

October

November

December

Tech talks

We hold several Serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. Here are the three tech talks that we delivered in Q4:

Twitch

We’ve been so busy livestreaming on Twitch that you’re most certainly missing out if you aren’t following along!

For information about upcoming broadcasts and recent livestreams, keep an eye on AWS on Twitch for more Serverless videos and on the Join us on Twitch AWS page.

New Home for SAM Docs

This quarter, we moved all SAM docs to https://docs.aws.amazon.com/serverless-application-model. Everything you need to know about SAM is there. If you don’t find what you’re looking for, let us know!

In other news

The schedule is out for 2019 AWS Global Summits in cities around the world. AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Summits are held in major cities around the world. They attract technologists from all industries and skill levels who want to discover how AWS can help them innovate quickly and deliver flexible, reliable solutions at scale. Get notified when to register and learn more at the AWS Global Summits website.

Still looking for more?

The Serverless landing page has lots of information. The resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. Check it out!

Implementing enterprise integration patterns with AWS messaging services: point-to-point channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-point-to-point-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

At AWS, we see our customers increasingly moving toward managed services to reduce the time and money that they spend managing infrastructure. This also applies to the messaging domain, where AWS provides a collection of managed services.

Asynchronous messaging is a fundamental approach for integrating independent systems or building up a set of loosely coupled systems that can scale and evolve independently and flexibly. The well-known collection of enterprise integration patterns (EIPs) provides a “technology-independent vocabulary” to “design and document integration solutions.” This blog is the first of two that describe how you can implement the core EIPs using AWS messaging services. Let’s first look at the relevant AWS messaging services.

When organizations migrate their traditional messaging and existing applications to the cloud gradually, they usually want to do it without rewriting their code. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud. It supports industry-standard APIs and protocols such as JMS, AMQP, and MQTT, so you can switch from any standards-based message broker to Amazon MQ without rewriting the messaging code in your applications. Amazon MQ is recommended if you’re using messaging with existing applications and want to move your messaging to the cloud without rewriting existing code.

However, if you build new applications for the cloud, we recommend that you consider using cloud-native messaging services such as Amazon SQS and Amazon SNS. These serverless, fully managed message queue and topic services scale to meet your demands and provide simple, easy-to-use APIs. You can use Amazon SQS and Amazon SNS to decouple and scale microservices, distributed systems, and serverless applications and improve overall reliability.

This blog looks at the first part of some fundamental integration patterns. We describe the patterns and apply them to these AWS messaging services. This will help you apply the right pattern to your use case and architect for scale in a secure and cost-efficient manner. For all variants, we employ both traditional and cloud-native messaging services: Amazon MQ for the former and Amazon SQS and Amazon SNS for the latter.

Integration Patterns

Let’s start with some fundamental integration patterns.

Message exchange patterns

First, we inspect the two major message exchange patterns: one-way and request-response.

One-way messaging

Applying one-way messaging, a message producer (sender) sends out a message to a messaging channel and doesn’t expect or want a response from whatever process (receiver) consumed the message. Examples of one-way messaging include a data transfer and a notification about an event that happened.

Request-response messaging

With request-response messaging, a message producer (requester) sends out a message: for example, a command to instruct the responder to execute something. The requester expects a response from each message consumer (responder) who received that message, likely to know what the result of all executions was. To know where to send the response message to, the request message contains a return address that the responder uses. To make sure that the requester can assign an incoming response to a request, the requester adds a correlation identifier to the request, which the responders echo in their responses.
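The return address and correlation identifier can be sketched without any broker at all. The following minimal example, assuming in-memory java.util.concurrent queues as stand-ins for real messaging channels, shows a requester that attaches both to a request and a responder that echoes the correlation identifier. All class and field names here are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory illustration only; the queues stand in for real messaging channels.
class RequestResponseSketch {

    // A message carries its body plus the two pieces of metadata described above.
    static final class Msg {
        final String body;
        final String correlationId;
        final BlockingQueue<Msg> replyTo; // return address; null on responses

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Sends one request and waits for the response that echoes its correlation ID.
    public static String call(String body) {
        BlockingQueue<Msg> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> responses = new LinkedBlockingQueue<>();

        // Responder: consumes one request and replies to the return address it contains.
        Thread responder = new Thread(() -> {
            try {
                Msg request = requests.take();
                request.replyTo.put(new Msg("Result of " + request.body, request.correlationId, null));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();

        try {
            String correlationId = UUID.randomUUID().toString();
            requests.put(new Msg(body, correlationId, responses));

            // Requester: accept only a response that carries the same correlation ID.
            Msg response = responses.poll(5, TimeUnit.SECONDS);
            if (response == null || !correlationId.equals(response.correlationId)) {
                throw new IllegalStateException("no matching response within timeout");
            }
            return response.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("Message 1")); // prints "Result of Message 1"
    }
}
```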

Messaging channels: point-to-point

Next, we look at the point-to-point messaging channel, one of the most important patterns for messaging channels. We will continue our consideration with publish-subscribe in our second post.

A point-to-point channel is usually implemented by message queues. Message queues operate so that any given message is only consumed by one receiver, although multiple receivers can be connected to the queue. The queue ensures once-only consumption. Messages are usually buffered in queues so that they’re available for consumption for a certain amount of time, even if no receiver is currently connected.
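As a rough illustration of this behavior, the sketch below connects several competing receivers to one in-memory queue and records which receiver consumed each message. It assumes a java.util.concurrent.BlockingQueue as a stand-in for a message queue; with a real broker, delivery guarantees depend on its acknowledgment semantics. The class name and the stop marker are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration only; not a client for Amazon MQ or Amazon SQS.
class CompetingConsumersSketch {

    private static final String STOP = "__stop__"; // hypothetical end-of-stream marker

    // Sends messageCount messages through one queue to receiverCount receivers
    // and returns a map of message -> name of the receiver that consumed it.
    public static Map<String, String> run(int messageCount, int receiverCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, String> consumedBy = new ConcurrentHashMap<>();
        Thread[] receivers = new Thread[receiverCount];

        for (int i = 0; i < receiverCount; i++) {
            String name = "receiver-" + i;
            receivers[i] = new Thread(() -> {
                try {
                    while (true) {
                        // take() hands each queued element to exactly one caller
                        String message = queue.take();
                        if (message.equals(STOP)) {
                            return;
                        }
                        if (consumedBy.putIfAbsent(message, name) != null) {
                            throw new IllegalStateException("consumed twice: " + message);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            receivers[i].start();
        }

        try {
            for (int i = 1; i <= messageCount; i++) {
                queue.put("Message " + i);
            }
            for (int i = 0; i < receiverCount; i++) {
                queue.put(STOP); // one marker per receiver
            }
            for (Thread receiver : receivers) {
                receiver.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumedBy;
    }

    public static void main(String[] args) {
        Map<String, String> result = run(10, 3);
        System.out.println(result.size() + " messages, each consumed by one receiver");
    }
}
```

Which receiver gets which message varies from run to run; only the total and the one-receiver-per-message property are stable.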

Point-to-point channels are often used for loosely coupled message transmission, though there are two other common uses. First, they can support horizontal scaling of message processing on the receiver side. Depending on the message load in the channel, the number of receiver processes can be elastically adjusted to cope with the load as needed. The queue acts as a buffering load balancer. Second, they can flatten peak loads of messages and prevent your receivers from being flooded when you can’t scale out fast enough or you don’t want additional scaling.
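One simple way to picture elastic adjustment is a rule that derives the number of receivers from the current backlog and caps it at a maximum. The following sketch is a hypothetical sizing rule, not an AWS scaling policy; the parameter names and the numbers in the example are assumptions chosen for illustration.

```java
// Hypothetical sizing rule for illustration; not an AWS Auto Scaling API.
class ReceiverScalingSketch {

    // Returns how many receivers to run for a given backlog, assuming each receiver
    // can work off messagesPerReceiver messages in the scaling interval.
    public static int desiredReceivers(long backlog, long messagesPerReceiver, int min, int max) {
        if (messagesPerReceiver <= 0 || min < 0 || max < min) {
            throw new IllegalArgumentException("invalid scaling parameters");
        }
        long safeBacklog = Math.max(0, backlog);
        // Ceiling division: a partially filled batch still needs a receiver
        long needed = (safeBacklog + messagesPerReceiver - 1) / messagesPerReceiver;
        return (int) Math.max(min, Math.min(max, needed));
    }

    public static void main(String[] args) {
        System.out.println(desiredReceivers(0, 100, 1, 10));    // 1: never below the minimum
        System.out.println(desiredReceivers(250, 100, 1, 10));  // 3: scales with the backlog
        System.out.println(desiredReceivers(5000, 100, 1, 10)); // 10: capped at the maximum
    }
}
```

In the capped case, the messages that the ten receivers cannot process immediately stay buffered in the queue, which is the peak-flattening effect described above.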

Integration scenarios

In this section, we apply these fundamental patterns to AWS messaging services. The code examples are written in Java, but only by author preference. You can implement the same integration scenarios in C++, .NET, Node.js, Python, Ruby, Go, and other programming languages for which AWS provides an SDK and an Apache ActiveMQ client library is available.

Point-to-point channels: one-way messaging

The diagrams in the following subsections show the principle of one-way messaging for point-to-point channels, using Amazon MQ queues and Amazon SQS queues. The sender produces a message and sends it into a queue, and the receiver consumes the message from the queue for processing. For traditional messaging (that is, Amazon MQ), the senders and consumers can use protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SQS API.

Traditional messaging

To follow this example, open the Amazon MQ console and create a broker. In the following diagram we see the above explained components for the traditional messaging scenario: A sender sends messages into an Amazon MQ queue, a receiver consumes messages from that queue.

Point to point traditional messaging

In the following code example, sender and receiver use the Apache ActiveMQ client library and the standard Java Message Service (JMS) API to send and receive messages to and from an Amazon MQ queue. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer. For simplicity, the code launches sender and receiver in the same Java virtual machine (JVM).

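import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointOneWayTraditional {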

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointOneWayTraditional");
        conn.start();

        new Thread(new Receiver(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
        new Thread(new Sender(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.OneWay.Traditional")).start();
    }

    public static class Sender implements Runnable {

        private Session session;
        private String destination;

        public Sender(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Receiver implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Receiver(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow this example, open the Amazon SQS console and create a standard SQS queue, using the queue name P2POneWayCloudNative.  In the following diagram we see the above explained components for the cloud-native messaging scenario: A sender sends messages into an Amazon SQS queue, a receiver consumes messages from that queue.

Point to point cloud-native messaging

In the sample code below, the example sender uses the AWS SDK for Java to send messages to an Amazon SQS queue in an endless loop. You can run the code on any Amazon compute service, in your on-premises data center, or on your personal computer.

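import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointOneWayCloudNative {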

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Sender(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2POneWayCloudNative")).start();
    }

    public static class Sender implements Runnable {

        private AmazonSQS sqs;
        private String destination;

        public Sender(AmazonSQS sqs, String destination) {
            this.sqs = sqs;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sqs.sendMessage(
                    new SendMessageRequest()
                        .withQueueUrl(destination)
                        .withMessageBody("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

We implement the receiver below in a serverless manner as an AWS Lambda function, using Amazon SQS as the event source. The name of the SQS queue is configured outside the function’s code, which is why it doesn’t appear in this code example.

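import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class Receiver implements RequestHandler<SQSEvent, Void> {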

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageAttributes().get("MessageID").getStringValue()));
        }

        return null;
    }
}

If this approach is new to you, you can find more details in AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources. Using Lambda comes with a number of benefits. For example, you don’t have to manage the compute environment for the receiver, and you can use an event (or push) model instead of having to poll for new messages.

Point-to-point channels: request-response messaging

In addition to the one-way scenario, we now add a return channel. We therefore call the involved processes requester and responder rather than sender and receiver. The requester sends a message into the request queue, and the responder sends the response into the response queue. Remember that the requester enriches the message with a return address (the name of the response queue) so that the responder knows where to send the response to. The requester also sends a correlation ID that the responder copies into the response message so that the requester can match the incoming response with a request.

Traditional messaging

In this example, we reuse the Amazon MQ broker that we set up earlier. In the following diagram we see the above explained components for the traditional messaging scenario, using an Amazon MQ queue each for the request messages and for the response messages.

Point to point request response traditional messaging

Using Amazon MQ, we don’t have to create queues explicitly because they’re implicitly created as needed when we start sending messages to them. This example is similar to the point-to-point one-way traditional example.

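import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class PointToPointRequestResponseTraditional {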

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PointToPointRequestResponseTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Queue.PointToPoint.RequestResponse.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        // receive() returns null if no response arrives within the timeout
                        Message receivedMessage = consumer.receive(5000);
                        if (receivedMessage != null) {
                            System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) receivedMessage).getText(), receivedMessage.getJMSMessageID()));
                            receivedMessage.acknowledge();
                        }
                    } finally {
                        consumer.close();
                        // Remove the per-request temporary queue so that queues don't pile up on the broker
                        replyTo.delete();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Responder(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " with CorrelationID " + correlationId);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

Open the Amazon SQS console and create two standard SQS queues using the queue names P2PReqRespCloudNative and P2PReqRespCloudNative-Resp. In the following diagram we see the above explained components for the cloud-native scenario, using an Amazon SQS queue each for the request messages and for the response messages.

Point to point request response cloud native messaging

The following example requester is almost identical to the point-to-point one-way cloud-native example sender. It also provides a reply-to address and a correlation ID.

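import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PointToPointRequestResponseCloudNative {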

    public static void main(String... args) throws Exception {
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sqs, "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/P2PReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, SendMessageRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSQS sqs, String destination, String replyDestination) {
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                SendMessageRequest request = new SendMessageRequest()
                    .withQueueUrl(destination)
                    .withMessageBody("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sqs.sendMessage(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
                    SendMessageRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    // Standard queues deliver at least once, so a duplicate response may no longer have a matching request
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessageBody()));
                    }

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}
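The requester above matches responses to requests through a map keyed by correlation ID. Because standard SQS queues deliver messages at least once, a response can occasionally arrive twice, so the lookup should tolerate unknown or already-completed IDs. The following broker-free sketch isolates that bookkeeping; the class and method names are hypothetical and not part of the AWS SDK.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration; it only tracks correlation IDs in memory.
class InflightRequests {

    private final Map<String, String> inflight = new ConcurrentHashMap<>();

    // Remembers an outgoing request body and returns the correlation ID to attach to it.
    public String register(String requestBody) {
        String correlationId = UUID.randomUUID().toString();
        inflight.put(correlationId, requestBody);
        return correlationId;
    }

    // Resolves a response to its original request. Returns empty for an unknown,
    // missing, or already-completed correlation ID instead of failing.
    public Optional<String> complete(String correlationId) {
        if (correlationId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(inflight.remove(correlationId));
    }

    public int pending() {
        return inflight.size();
    }

    public static void main(String[] args) {
        InflightRequests requests = new InflightRequests();
        String id = requests.register("Message 1");

        System.out.println(requests.complete(id)); // Optional[Message 1]
        System.out.println(requests.complete(id)); // Optional.empty (duplicate response)
        System.out.println(requests.pending());    // 0
    }
}
```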

The following example responder is almost identical to the point-to-point one-way cloud-native example receiver. It also creates a message and sends it back to the reply-to address provided in the received message.

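import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class Responder implements RequestHandler<SQSEvent, Void> {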

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SQSEvent request, Context context) {
        for (SQSEvent.SQSMessage message: request.getRecords()) {
            System.out.println(String.format("received message '%s' with message id '%s'", message.getBody(), message.getMessageId()));
            String correlationId = message.getMessageAttributes().get("CorrelationID").getStringValue();
            String replyTo = message.getMessageAttributes().get("ReplyTo").getStringValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(message.getBody() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional resources

What’s next?

We have introduced the first fundamental EIPs and shown how you can apply them to the AWS messaging services. If you are keen to dive deeper, continue reading with the second part of this series, where we will cover publish-subscribe messaging.

Read Part 2: Publish-Subscribe Messaging

Implementing enterprise integration patterns with AWS messaging services: publish-subscribe channels

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/implementing-enterprise-integration-patterns-with-aws-messaging-services-publish-subscribe-channels/

This post is courtesy of Christian Mueller, Sr. Solutions Architect, AWS and Dirk Fröhner, Sr. Solutions Architect, AWS

In this blog, we look at the second part of some fundamental enterprise integration patterns and how you can implement them with AWS messaging services. If you missed the first part, we encourage you to start there.

Read Part 1: Point-to-Point Messaging

Integration patterns

Messaging channels: publish-subscribe

As mentioned in the first blog, we continue with the second major messaging channel pattern: publish-subscribe.

A publish-subscribe channel is usually implemented using message topics. In this model, any message published to a topic is immediately received by all of the subscribers of the topic (unless you have applied the message filter pattern). However, if there is no subscriber, messages are usually discarded. The durable subscriber pattern describes an exception where messages are kept for a while in case the subscriber is offline. Publish-subscribe is used when multiple parties are interested in certain messages. Sometimes, this pattern is also referred to as fan-out.
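The fan-out behavior can be sketched in a few lines of plain Java. The example below assumes an in-memory topic with callback subscribers, which is only a stand-in for Amazon MQ or Amazon SNS topics; the class and method names are hypothetical. It shows that every current subscriber receives a published message and that a message published before anyone subscribed is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory illustration only; not a client for Amazon MQ or Amazon SNS.
class TopicSketch {

    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the message to every current subscriber and returns the number of
    // deliveries. With no subscribers, the message is discarded.
    public int publish(String message) {
        int deliveries = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            deliveries++;
        }
        return deliveries;
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        topic.publish("Message 1"); // no subscribers yet: discarded

        topic.subscribe(first::add);
        topic.subscribe(second::add);
        topic.publish("Message 2"); // both subscribers receive a copy

        System.out.println(first + " " + second); // [Message 2] [Message 2]
    }
}
```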

Let’s apply this pattern to the different AWS messaging services and get our hands dirty. To follow our examples, sign in to your AWS account (or create an account as described in How do I create and activate a new Amazon Web Services account?).

Integration scenarios

Publish-subscribe channels: one-way messaging

Publish-subscribe one-way patterns are often involved in notification style use cases, where the publisher sends out an event and doesn’t care who is interested in this event. For example, Amazon CloudWatch Events publishes state changes in the environment, and you can subscribe and act accordingly.

The diagrams in the following subsections show the principles of one-way messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

For traditional messaging, senders and consumers can use API protocols such as JMS or AMQP. For cloud-native messaging, they can use the Amazon SNS API.

Traditional messaging

In this example, we reuse the Amazon MQ broker we set up in part one of this blog. As we can see in the following diagram, messages are published into an Amazon MQ topic and multiple subscribers can consume messages from it.

Publish Subscribe One Way Traditional Messaging

This example is similar to the point-to-point one-way traditional example using the Apache ActiveMQ client library, but we use topics instead of queues, as shown in the following code.

public class PublishSubscribeOneWayTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubOneWayTraditional");
        conn.start();

        new Thread(new Subscriber(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
        new Thread(new Publisher(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.OneWay.Traditional")).start();
    }

    public static class Publisher implements Runnable {

        private Session session;
        private String destination;

        public Publisher(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageProducer messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    messageProducer.send(message);
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Subscriber implements Runnable, MessageListener {

        private Session session;
        private String destination;

        public Subscriber(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), "subscriber-1");
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                System.out.println(String.format("received message '%s' with message id '%s'", ((TextMessage) message).getText(), message.getJMSMessageID()));
                message.acknowledge();
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To follow a similar example using Amazon SNS, open the Amazon SNS console and create an Amazon SNS topic named PubSubOneWayCloudNative. The following diagram shows a publisher sending messages to an Amazon SNS topic, from which the topic’s subscribers consume them.

Publish Subscribe One Way Cloud Native Messaging

We use the AWS SDK for Java to send messages to our Amazon SNS topic, running in an endless loop. You can run the following code on any AWS compute service, in your on-premises data center, or on your personal computer.

public class PublishSubscribeOneWayCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();

        new Thread(new Publisher(sns, "arn:aws:sns:<region>:<account-number>:PubSubOneWayCloudNative")).start();
    }

    public static class Publisher implements Runnable {

        private AmazonSNS sns;
        private String destination;

        public Publisher(AmazonSNS sns, String destination) {
            this.sns = sns;
            this.destination = destination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                sns.publish(
                    new PublishRequest()
                        .withTargetArn(destination)
                        .withSubject("PubSubOneWayCloudNative sample")
                        .withMessage("Message " + ++counter)
                        .addMessageAttributesEntry("MessageID", new MessageAttributeValue().withDataType("String").withStringValue(UUID.randomUUID().toString())));
            }
        }
    }
}

The subscriber is implemented as an AWS Lambda function, using Amazon SNS as the event source. For more information on how to set this up, see Using Amazon SNS for System-to-System Messaging with a Lambda Function as a Subscriber.

public class Subscriber implements RequestHandler<SNSEvent, Void> {

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            SNSEvent.SNS sns = record.getSNS();

            System.out.println(String.format("received message '%s' with message id '%s'", sns.getMessage(), sns.getMessageAttributes().get("MessageID").getValue()));
        }

        return null;
    }
}

Publish-subscribe channels: request-response messaging

Publish-subscribe request-response patterns are beneficial in use cases where it’s important to communicate with multiple services that do their work in parallel, but all their responses need to be aggregated afterward. One example is an order service, which needs to enrich the order message with data from multiple backend services.

The diagrams in the following subsections show the principles of request-response messaging for publish-subscribe channels, using both Amazon MQ and Amazon SNS topics. A publisher produces a message and sends it into a topic, and subscribers consume the message from the topic for processing.

Although we use a publish-subscribe channel for the request messages, we would usually use a point-to-point channel for the response messages. This assumes that the requester application or at least a dedicated application is the one entity that works on processing all the responses.
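Collecting the responses on one point-to-point channel means the receiving side has to group them by request. The following is a minimal sketch of that aggregation step, assuming the number of responders is known up front. ReplyAggregator and onReply are hypothetical names introduced for illustration; they are not part of JMS or the AWS SDK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: collects the replies that belong to one request.
final class ReplyAggregator {
    private final int expectedReplies;
    private final Map<String, List<String>> pending = new HashMap<>();

    ReplyAggregator(int expectedReplies) {
        this.expectedReplies = expectedReplies;
    }

    // Records one reply. Returns all replies for the request once the last
    // expected one has arrived, and an empty Optional until then.
    Optional<List<String>> onReply(String correlationId, String body) {
        List<String> replies = pending.computeIfAbsent(correlationId, id -> new ArrayList<>());
        replies.add(body);
        if (replies.size() < expectedReplies) {
            return Optional.empty();
        }
        pending.remove(correlationId);
        return Optional.of(replies);
    }

    public static void main(String[] args) {
        ReplyAggregator aggregator = new ReplyAggregator(2);
        System.out.println(aggregator.onReply("order-1", "price=42"));        // still waiting
        System.out.println(aggregator.onReply("order-2", "price=7"));         // a different request
        System.out.println(aggregator.onReply("order-1", "stock=available")); // order-1 is complete
    }
}
```

A production aggregator would also need a timeout for requests whose responders never answer; that is left out here to keep the sketch short.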

Traditional messaging

As we can see in the following diagram, an Amazon MQ topic is used to send out all the request messages, while all the response messages are sent into an Amazon MQ queue.

Publish Subscribe Request Response Traditional Messaging

In our code sample below, we use two responders.

public class PublishSubscribeRequestResponseTraditional {

    public static void main(String... args) throws Exception {
        ActiveMQSslConnectionFactory connFact = new ActiveMQSslConnectionFactory("failover:(ssl://<broker-1>.amazonaws.com:61617,ssl://<broker-2>.amazonaws.com:61617)");
        connFact.setConnectResponseTimeout(10000);
        Connection conn = connFact.createConnection("user", "password");
        conn.setClientID("PubSubReqRespTraditional");
        conn.start();

        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-1")).start();
        new Thread(new Responder(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional", "subscriber-2")).start();
        new Thread(new Requester(conn.createSession(false, Session.CLIENT_ACKNOWLEDGE), "Topic.PubSub.ReqResp.Traditional")).start();
    }

    public static class Requester implements Runnable {

        private Session session;
        private String destination;

        public Requester(Session session, String destination) {
            this.session = session;
            this.destination = destination;
        }

        public void run() {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createTopic(destination));
                long counter = 0;

                while (true) {
                    TemporaryQueue replyTo = session.createTemporaryQueue();
                    String correlationId = UUID.randomUUID().toString();
                    TextMessage message = session.createTextMessage("Message " + ++counter);
                    message.setJMSMessageID(UUID.randomUUID().toString());
                    message.setJMSCorrelationID(correlationId);
                    message.setJMSReplyTo(replyTo);
                    messageProducer.send(message);

                    MessageConsumer consumer = session.createConsumer(replyTo, "JMSCorrelationID='" + correlationId + "'");
                    try {
                        Message receivedMessage1 = consumer.receive(5000);
                        Message receivedMessage2 = consumer.receive(5000);
                        System.out.println(String.format("received 2 messages '%s' and '%s'", ((TextMessage) receivedMessage1).getText(), ((TextMessage) receivedMessage2).getText()));
                        receivedMessage2.acknowledge();
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class Responder implements Runnable, MessageListener {

        private Session session;
        private String destination;
        private String name;

        public Responder(Session session, String destination, String name) {
            this.session = session;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            try {
                MessageConsumer consumer = session.createDurableSubscriber(session.createTopic(destination), name);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            try {
                String correlationId = message.getJMSCorrelationID();
                Destination replyTo = message.getJMSReplyTo();

                TextMessage responseMessage = session.createTextMessage(((TextMessage) message).getText() + " from responder " + name);
                responseMessage.setJMSMessageID(UUID.randomUUID().toString());
                responseMessage.setJMSCorrelationID(correlationId);

                MessageProducer messageProducer = session.createProducer(replyTo);
                try {
                    messageProducer.send(responseMessage);

                    message.acknowledge();
                } finally {
                    if (messageProducer != null) {
                        messageProducer.close();
                    }
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Cloud-native messaging

To implement a similar pattern with Amazon SNS, open the Amazon SNS console and create a new SNS topic named PubSubReqRespCloudNative. Then open the Amazon SQS console and create a standard SQS queue named PubSubReqRespCloudNative-Resp. The following diagram illustrates that we now use an Amazon SNS topic for request messages and an Amazon SQS queue for response messages.

Publish Subscribe Request Response Cloud Native Messaging

This example requester is almost identical to the publisher in the publish-subscribe one-way cloud-native example. In addition, the requester specifies a reply-to address and a correlation ID as message attributes. This way, responders know where to send their responses, and the receiver of the responses can match each one to its original request.

public class PublishSubscribeReqRespCloudNative {

    public static void main(String... args) throws Exception {
        final AmazonSNS sns = AmazonSNSClientBuilder.standard().build();
        final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

        new Thread(new Requester(sns, sqs, "arn:aws:sns:<region>:<account-number>:PubSubReqRespCloudNative", "https://sqs.<region>.amazonaws.com/<account-number>/PubSubReqRespCloudNative-Resp")).start();
    }

    public static class Requester implements Runnable {

        private AmazonSNS sns;
        private AmazonSQS sqs;
        private String destination;
        private String replyDestination;
        private Map<String, PublishRequest> inflightMessages = new ConcurrentHashMap<>();

        public Requester(AmazonSNS sns, AmazonSQS sqs, String destination, String replyDestination) {
            this.sns = sns;
            this.sqs = sqs;
            this.destination = destination;
            this.replyDestination = replyDestination;
        }

        public void run() {
            long counter = 0;

            while (true) {
                String correlationId = UUID.randomUUID().toString();
                PublishRequest request = new PublishRequest()
                    .withTopicArn(destination)
                    .withMessage("Message " + ++counter)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId))
                    .addMessageAttributesEntry("ReplyTo", new MessageAttributeValue().withDataType("String").withStringValue(replyDestination));
                sns.publish(request);

                inflightMessages.put(correlationId, request);

                ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest()
                        .withQueueUrl(replyDestination)
                        .withMessageAttributeNames("CorrelationID")
                        .withMaxNumberOfMessages(5)
                        .withWaitTimeSeconds(2));

                for (Message receivedMessage : receiveMessageResult.getMessages()) {
                    System.out.println(String.format("received message '%s' with message id '%s'", receivedMessage.getBody(), receivedMessage.getMessageId()));

                    String receivedCorrelationId = receivedMessage.getMessageAttributes().get("CorrelationID").getStringValue();
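                    // With several subscribers on the topic, more than one reply can carry the same
                    // correlation ID; only the first one still finds the in-flight request.
                    PublishRequest originalRequest = inflightMessages.remove(receivedCorrelationId);
                    if (originalRequest != null) {
                        System.out.println(String.format("Corresponding request message '%s'", originalRequest.getMessage()));
                    }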

                    sqs.deleteMessage(
                        new DeleteMessageRequest()
                            .withQueueUrl(replyDestination)
                            .withReceiptHandle(receivedMessage.getReceiptHandle()));
                }
            }
        }
    }
}

This example responder is almost identical to the subscriber in the publish-subscribe one-way cloud-native example. In addition, it creates a response message, enriches it with the correlation ID, and sends it to the reply-to address provided in the received message.

public class Responder implements RequestHandler<SNSEvent, Void> {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();

    @Override
    public Void handleRequest(SNSEvent request, Context context) {
        for (SNSEvent.SNSRecord record: request.getRecords()) {
            System.out.println(String.format("received record '%s' with message id '%s'", record.getSNS().getMessage(), record.getSNS().getMessageId()));
            String correlationId = record.getSNS().getMessageAttributes().get("CorrelationID").getValue();
            String replyTo = record.getSNS().getMessageAttributes().get("ReplyTo").getValue();

            System.out.println(String.format("sending message with correlation id '%s' to '%s'", correlationId, replyTo));
            sqs.sendMessage(
                new SendMessageRequest()
                    .withQueueUrl(replyTo)
                    .withMessageBody(record.getSNS().getMessage() + " with CorrelationID " + correlationId)
                    .addMessageAttributesEntry("CorrelationID", new MessageAttributeValue().withDataType("String").withStringValue(correlationId)));
        }

        return null;
    }
}

Go Build!

We look forward to hearing about what you build and will continue innovating our services on your behalf.

Additional Resources

Monitoring your Amazon SNS message filtering activity with Amazon CloudWatch

Post Syndicated from Rachel Richardson original https://aws.amazon.com/blogs/compute/monitoring-your-amazon-sns-message-filtering-activity-with-amazon-cloudwatch/

This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.

Amazon SNS message filtering provides a set of string and numeric matching operators that allow each subscription to receive only the messages of interest. Hence, SNS message filtering can simplify your pub/sub messaging architecture by offloading the message filtering logic from your subscriber systems, as well as the message routing logic from your publisher systems.

After you set the subscription attribute that defines a filter policy, the subscribing endpoint receives only the messages that carry attributes matching this filter policy. Other messages published to the topic are filtered out for this subscription. The native integration between SNS and Amazon CloudWatch provides visibility into the number of messages delivered, as well as the number of messages filtered out.

CloudWatch metrics are captured automatically for you. To get started with SNS message filtering, see Filtering Messages with Amazon SNS.

Message Filtering Metrics

The following six CloudWatch metrics are relevant to understanding your SNS message filtering activity:

  • NumberOfMessagesPublished – Inbound traffic to SNS. This metric tracks all the messages that have been published to the topic.
  • NumberOfNotificationsDelivered – Outbound traffic from SNS. This metric tracks all the messages that have been successfully delivered to endpoints subscribed to the topic. A delivery takes place either when the incoming message attributes match a subscription filter policy, or when the subscription has no filter policy at all, which results in a catch-all behavior.
  • NumberOfNotificationsFilteredOut – This metric tracks all the messages that were filtered out because they carried attributes that didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-NoMessageAttributes – This metric tracks all the messages that were filtered out because they didn’t carry any attributes at all and, consequently, didn’t match the subscription filter policy.
  • NumberOfNotificationsFilteredOut-InvalidAttributes – This metric keeps track of messages that were filtered out because they carried invalid or malformed attributes and, thus, didn’t match the subscription filter policy.
  • NumberOfNotificationsFailed – This last metric tracks all the messages that failed to be delivered to subscribing endpoints, regardless of whether a filter policy had been set for the endpoint. This metric is emitted after the message delivery retry policy is exhausted, and SNS stops attempting to deliver the message. At that moment, the subscribing endpoint is likely no longer reachable. For example, the subscribing SQS queue or Lambda function has been deleted by its owner. You may want to closely monitor this metric to address message delivery issues quickly.
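To make the relationship between the delivery and filtering metrics easier to follow, here is a small sketch that classifies one message per subscription the way the list above describes. It is an illustration only: FilterMetricsSketch and record are hypothetical names, the model assumes exact string matching and successful delivery, and it leaves out the invalid-attributes and failed-delivery cases.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical counter that mirrors the metric definitions above; not a CloudWatch API.
final class FilterMetricsSketch {
    int delivered;
    int filteredOut;
    int filteredOutNoMessageAttributes;

    // policy == null models a subscription without a filter policy (catch-all).
    // Assumption: exact string matching only, and every delivery attempt succeeds.
    void record(Map<String, String> policy, Map<String, String> attributes) {
        if (policy == null) {
            delivered++;
        } else if (attributes.isEmpty()) {
            filteredOutNoMessageAttributes++;
        } else if (attributes.entrySet().containsAll(policy.entrySet())) {
            delivered++;
        } else {
            filteredOut++;
        }
    }

    public static void main(String[] args) {
        FilterMetricsSketch metrics = new FilterMetricsSketch();
        Map<String, String> policy = Collections.singletonMap("sport", "rugby");
        metrics.record(policy, Collections.singletonMap("sport", "rugby"));    // matches the policy
        metrics.record(policy, Collections.singletonMap("sport", "football")); // does not match
        metrics.record(policy, Collections.emptyMap());                        // no attributes at all
        metrics.record(null, Collections.emptyMap());                          // catch-all subscription
        System.out.println(metrics.delivered + " delivered, " + metrics.filteredOut
                + " filtered out, " + metrics.filteredOutNoMessageAttributes + " without attributes");
    }
}
```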

Message filtering graphs

Through the AWS Management Console, you can compose graphs to display your SNS message filtering activity. The graph shows the number of messages published, delivered, and filtered out within the timeframe you specify (1h, 3h, 12h, 1d, 3d, 1w, or custom).

SNS message filtering for CloudWatch Metrics

To compose an SNS message filtering graph with CloudWatch:

  1. Open the CloudWatch console.
  2. Choose Metrics, SNS, All Metrics, and Topic Metrics.
  3. Select all metrics to add to the graph, such as:
    • NumberOfMessagesPublished
    • NumberOfNotificationsDelivered
    • NumberOfNotificationsFilteredOut
  4. Choose Graphed metrics.
  5. In the Statistic column, switch from Average to Sum.
  6. Title your graph with a descriptive name, such as “SNS Message Filtering”.

After you have your graph set up, you may want to copy the graph link for bookmarking, emailing, or sharing with co-workers. You may also want to add your graph to a CloudWatch dashboard for easy access in the future. Both actions are available to you on the Actions menu, which is found above the graph.

Summary

SNS message filtering defines how SNS topics behave in terms of message delivery. By using CloudWatch metrics, you gain visibility into the number of messages published, delivered, and filtered out. This enables you to validate the operation of filter policies and more easily troubleshoot during development phases.

SNS message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). CloudWatch metrics for SNS message filtering are available now in all AWS Regions.

For information about pricing, see the CloudWatch pricing page.

For more information, see:

Solving Complex Ordering Challenges with Amazon SQS FIFO Queues

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/solving-complex-ordering-challenges-with-amazon-sqs-fifo-queues/

Contributed by Shea Lutton, AWS Cloud Infrastructure Architect

Amazon Simple Queue Service (Amazon SQS) is a fully managed queuing service that helps decouple applications, distributed systems, and microservices to increase fault tolerance. SQS queues come in two distinct types:

  • Standard SQS queues are able to scale to enormous throughput with at-least-once delivery.
  • FIFO queues are designed to guarantee that messages are processed exactly once in the exact order that they are received and have a default rate of 300 transactions per second.

As customers explore SQS FIFO queues, they often have questions about how the behavior works when messages arrive and are consumed. This post walks through some common situations to identify the exact behavior that you can expect. It also covers the behavior of message groups in depth and explains why message groups are key to understanding how FIFO queues work.

The simple case

Suppose that you run a major auction platform where people buy and sell a wide range of products. Your platform requires that transactions from buyers and sellers get processed in exactly the order received. Here’s how a FIFO queue helps you keep all your transactions in one straight flow.

A seller currently is holding an auction for a laptop, and three different bids are received for the same price. Ties are awarded to the first bidder at that price so it is important to track which arrived first. Your auction platform receives the three bids and sends them to a FIFO queue before they are processed.

Now observe how messages leave the queue. When your consumer asks for a batch of up to 10 messages, SQS starts filling the batch with the oldest message (bid A1). It keeps filling until either the batch is full or the queue is empty. In this case, the batch contains the three messages and the queue is now empty. After a batch has left the queue, SQS considers that batch of messages to be “in-flight” until either the consumer deletes the messages or their visibility timeout expires.

 

When you have a single consumer, this is easy to envision. The consumer gets a batch of messages (now in-flight), does its processing, and deletes the messages. That consumer is then ready to ask for the next batch of messages.

The critical thing to keep in mind is that SQS won’t release the next batch of messages until the first batch has been deleted. By adding more messages to the queue, you can see more interesting behaviors. Imagine that a burst of 11 bids is sent to your FIFO queue, with two bids for Auction A arriving last.

The FIFO queue now has at least two batches of messages in it. When your single consumer requests the first batch of 10 messages, it receives a batch starting with B1 and ending with A1. Later, after the first batch has been deleted, the consumer can get the second batch of messages containing the final A2 message from the queue.

Adding complexity with multiple message groups

A new challenge arises. Your auction platform is getting busier and your dev team added a number of new features. The combination of increased messages and extra processing time for the new features means that a single consumer is too slow. The solution is to scale to have more consumers and process messages in parallel.

To work in parallel, your team realized that only the messages related to a single auction must be kept in order. All transactions for Auction A need to be kept in order and so do all transactions for Auction B. But the two auctions are independent and it does not matter which auction’s transactions are processed first.

FIFO can handle that case with a feature called message groups. Each transaction related to Auction A is placed by your producer into message group A, and so on. In the diagram below, Auction A and Auction B each received three bid transactions, with bid B1 arriving first. The FIFO queue always keeps transactions within a message group in the order in which they arrived.

How is this any different from the earlier examples? The consumer now gets the messages ordered by message groups, all the B group messages followed by all the A group messages. Multiple message groups create the possibility of using multiple consumers, which I explain in a moment. If FIFO can’t fill up a batch of messages with a single message group, FIFO can place more than one message group in a batch of messages. But whenever possible, the queue gives you a full batch of messages from the same group.

The order of messages leaving a FIFO queue is governed by three rules:

  1. Return the oldest message where no other message in the same message group is currently in-flight.
  2. Return as many messages from the same message group as possible.
  3. If a message batch is still not full, go back to rule 1.

To see this behavior, add a second consumer and insert many more messages into the queue. For simplicity, the delete message action has been omitted in these diagrams but it is assumed that all messages in a batch are processed successfully by the consumer and the batch is properly deleted immediately after.

In this example, there are 11 Group A and 11 Group B transactions arriving in interleaved order and a second consumer has been added. Consumer 1 asks for a group of 10 messages and receives 10 Group A messages. Consumer 2 then asks for 10 messages but SQS knows that Group A is in flight, so it releases 10 Group B messages. The two consumers are now processing two batches of messages in parallel, speeding up throughput and then deleting their batches. When Consumer 1 requests the next batch of messages, it receives the remaining two messages, one from Group A and one from Group B.

Consider this nuanced detail from the example above. What would happen if Consumer 1 was on a faster server and processed its first batch of messages before Consumer 2 could mark its messages for deletion? See if you can predict the behavior before looking at the answer.

If Consumer 2 has not deleted its Group B messages yet when Consumer 1 asks for the next batch, then the FIFO queue considers Group B to still be in flight. It does not release any more Group B messages. Consumer 1 gets only the remaining Group A message. Later, after Consumer 2 has deleted its first batch, the remaining Group B message is released.
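The three ordering rules and the two-consumer scenario above can be replayed with a small in-memory model. This is a sketch under simplifying assumptions, not the Amazon SQS API: FifoQueueSketch, send, receive, and delete are hypothetical names, received messages are assumed to be processed successfully (so there is no visibility timeout or redelivery), and deleting a batch releases every message group it contained.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of a FIFO queue with message groups; not the Amazon SQS API.
final class FifoQueueSketch {
    static final class Message {
        final String group;
        final String body;
        Message(String group, String body) { this.group = group; this.body = body; }
        @Override public String toString() { return body; }
    }

    private final List<Message> queue = new ArrayList<>();        // arrival order
    private final Set<String> inFlightGroups = new HashSet<>();

    void send(String group, String body) {
        queue.add(new Message(group, body));
    }

    List<Message> receive(int maxMessages) {
        List<Message> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            // Rule 1: find the oldest message whose group has nothing in flight.
            String group = null;
            for (Message m : queue) {
                if (!inFlightGroups.contains(m.group)) { group = m.group; break; }
            }
            if (group == null) break;                               // nothing eligible right now
            inFlightGroups.add(group);
            // Rule 2: take as many messages from that group as still fit in the batch.
            Iterator<Message> it = queue.iterator();
            while (it.hasNext() && batch.size() < maxMessages) {
                Message m = it.next();
                if (m.group.equals(group)) { batch.add(m); it.remove(); }
            }
            // Rule 3: if the batch is not full, the loop goes back to rule 1.
        }
        return batch;
    }

    // Simplification: deleting a batch releases every group it contained.
    void delete(List<Message> batch) {
        for (Message m : batch) inFlightGroups.remove(m.group);
    }

    public static void main(String[] args) {
        FifoQueueSketch queue = new FifoQueueSketch();
        for (int i = 1; i <= 11; i++) { queue.send("A", "A" + i); queue.send("B", "B" + i); }
        List<Message> consumer1 = queue.receive(10);
        List<Message> consumer2 = queue.receive(10);
        System.out.println("Consumer 1: " + consumer1);
        System.out.println("Consumer 2: " + consumer2);
        queue.delete(consumer1);                                    // Consumer 1 finishes first
        System.out.println("Consumer 1 again: " + queue.receive(10));
    }
}
```

Because Consumer 2 has not deleted its batch when Consumer 1 comes back, Group B is still treated as in flight and the last call only hands out the remaining Group A message.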

Conclusion

I hope this post answered your questions about how Amazon SQS FIFO queues work and why message groups are helpful. If you’re interested in exploring SQS FIFO queues further, here are a few ideas to get you started:

Message Filtering Operators for Numeric Matching, Prefix Matching, and Blacklisting in Amazon SNS

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/message-filtering-operators-for-numeric-matching-prefix-matching-and-blacklisting-in-amazon-sns/

This blog was contributed by Otavio Ferreira, Software Development Manager for Amazon SNS

Message filtering simplifies the overall pub/sub messaging architecture by offloading message filtering logic from subscribers, as well as message routing logic from publishers. The initial launch of message filtering provided a basic operator that was based on exact string comparison. For more information, see Simplify Your Pub/Sub Messaging with Amazon SNS Message Filtering.

Today, AWS is announcing an additional set of filtering operators that bring even more power and flexibility to your pub/sub messaging use cases.

Message filtering operators

Amazon SNS now supports both numeric and string matching. Specifically, string matching operators allow for exact, prefix, and “anything-but” comparisons, while numeric matching operators allow for exact and range comparisons, as outlined below. Numeric matching operators work for values between -10^9 and +10^9 (negative one billion to positive one billion) inclusive, with five digits of accuracy right of the decimal point.

  • Exact matching on string values (Whitelisting): Subscription filter policy {"sport": ["rugby"]} matches message attribute {"sport": "rugby"} only.
  • Anything-but matching on string values (Blacklisting): Subscription filter policy {"sport": [{"anything-but": "rugby"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"} and {"sport": "football"} but not {"sport": "rugby"}
  • Prefix matching on string values: Subscription filter policy {"sport": [{"prefix": "bas"}]} matches message attributes such as {"sport": "baseball"} and {"sport": "basketball"}
  • Exact matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["=", 301.5]}]} matches message attributes {"balance": 301.500} and {"balance": 3.015e2}
  • Range matching on numeric values: Subscription filter policy {"balance": [{"numeric": ["<", 0]}]} matches negative numbers only, and {"balance": [{"numeric": [">", 0, "<=", 150]}]} matches any positive number up to 150.

As usual, you may apply the “AND” logic by appending multiple keys in the subscription filter policy, and the “OR” logic by appending multiple values for the same key, as follows:

  • AND logic: Subscription filter policy {"sport": ["rugby"], "language": ["English"]} matches only messages that carry both attributes {"sport": "rugby"} and {"language": "English"}
  • OR logic: Subscription filter policy {"sport": ["rugby", "football"]} matches messages that carry either the attribute {"sport": "rugby"} or {"sport": "football"}
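The operators and the AND/OR combination rules above can be expressed as a short evaluation routine. The sketch below is illustrative and is not how Amazon SNS implements filtering: FilterPolicySketch and its helper methods are hypothetical names, policies are built from Java objects instead of JSON, numeric comparison uses plain double equality rather than five-digit precision, and a missing attribute is assumed never to match.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical evaluator for the operators described above; not an Amazon SNS API.
final class FilterPolicySketch {
    static Predicate<Object> exact(String expected) {
        return value -> expected.equals(value);
    }

    static Predicate<Object> anythingBut(String excluded) {
        return value -> value instanceof String && !excluded.equals(value);
    }

    static Predicate<Object> prefix(String prefix) {
        return value -> value instanceof String && ((String) value).startsWith(prefix);
    }

    // spec alternates operator and bound, for example numeric(">", 0, "<=", 150).
    static Predicate<Object> numeric(Object... spec) {
        return value -> {
            if (!(value instanceof Number)) return false;
            double x = ((Number) value).doubleValue();
            for (int i = 0; i < spec.length; i += 2) {
                if (!compare((String) spec[i], x, ((Number) spec[i + 1]).doubleValue())) return false;
            }
            return true;
        };
    }

    private static boolean compare(String operator, double x, double bound) {
        switch (operator) {
            case "=":  return x == bound;
            case "<":  return x < bound;
            case "<=": return x <= bound;
            case ">":  return x > bound;
            case ">=": return x >= bound;
            default:   throw new IllegalArgumentException("unknown operator " + operator);
        }
    }

    // AND across keys, OR across the conditions listed for one key.
    static boolean matches(Map<String, List<Predicate<Object>>> policy, Map<String, Object> attributes) {
        for (Map.Entry<String, List<Predicate<Object>>> key : policy.entrySet()) {
            Object value = attributes.get(key.getKey());
            if (value == null || key.getValue().stream().noneMatch(c -> c.test(value))) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<Predicate<Object>>> policy = new HashMap<>();
        policy.put("sport", Arrays.asList(exact("rugby"), prefix("bas")));   // OR within a key
        policy.put("balance", Arrays.asList(numeric(">", 0, "<=", 150)));    // AND across keys

        Map<String, Object> message = new HashMap<>();
        message.put("sport", "baseball");
        message.put("balance", 120);
        System.out.println(matches(policy, message));
    }
}
```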

Message filtering operators in action

Here’s how this new set of filtering operators works. The following example is based on a pharmaceutical company that develops, produces, and markets a variety of prescription drugs, with research labs located in Asia Pacific and Europe. The company built an internal procurement system to manage the purchasing of lab supplies (for example, chemicals and utensils), office supplies (for example, paper, folders, and markers) and tech supplies (for example, laptops, monitors, and printers) from global suppliers.

This distributed system is composed of the four following subsystems:

  • A requisition system that presents the catalog of products from suppliers, and takes orders from buyers
  • An approval system for orders targeted to Asia Pacific labs
  • Another approval system for orders targeted to European labs
  • A fulfillment system that integrates with shipping partners

As shown in the following diagram, the company leverages AWS messaging services to integrate these distributed systems.

  • Firstly, an SNS topic named “Orders” was created to take all orders placed by buyers on the requisition system.
  • Secondly, two Amazon SQS queues, named “Lab-Orders-AP” and “Lab-Orders-EU” (for Asia Pacific and Europe respectively), were created to backlog orders that are up for review on the approval systems.
  • Lastly, an SQS queue named “Common-Orders” was created to backlog orders that aren’t related to lab supplies, which can already be picked up by shipping partners on the fulfillment system.

The company also uses AWS Lambda functions to automatically process lab supply orders that don’t require approval or which are invalid.

In this example, because different types of orders have been published to the SNS topic, the subscribing endpoints have had to set advanced filter policies on their SNS subscriptions, to have SNS automatically filter out orders they can’t deal with.

As depicted in the above diagram, the following five filter policies have been created:

  • The SNS subscription that points to the SQS queue “Lab-Orders-AP” sets a filter policy that matches lab supply orders, with a total value greater than $1,000, and that target Asia Pacific labs only. These more expensive transactions require an approver to review orders placed by buyers.
  • The SNS subscription that points to the SQS queue “Lab-Orders-EU” sets a filter policy that matches lab supply orders, also with a total value greater than $1,000, but that target European labs instead.
  • The SNS subscription that points to the Lambda function “Lab-Preapproved” sets a filter policy that only matches lab supply orders that aren’t as expensive, up to $1,000, regardless of their target lab location. These orders simply don’t require approval and can be automatically processed.
  • The SNS subscription that points to the Lambda function “Lab-Cancelled” sets a filter policy that only matches lab supply orders with total value of $0 (zero), regardless of their target lab location. These orders carry no actual items, obviously need neither approval nor fulfillment, and as such can be automatically canceled.
  • The SNS subscription that points to the SQS queue “Common-Orders” sets a filter policy that blacklists lab supply orders. Hence, this policy matches only office and tech supply orders, which have a more streamlined fulfillment process, and require no approval, regardless of price or target location.
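
As a rough sketch, the five filter policies described above might look like the following. The attribute names (`order_type`, `order_value`, `location`) and the value `"lab"` are assumptions made for illustration, since the post only shows them in a diagram. The location prefixes and the lower bound on pre-approved orders come from the example messages discussed in this post.

```python
import json

# Hypothetical attribute names and values; adjust to whatever your publishers set.
policies = {
    "Lab-Orders-AP": {
        "order_type": ["lab"],
        "order_value": [{"numeric": [">", 1000]}],
        "location": [{"prefix": "Asia-Pacific-"}],
    },
    "Lab-Orders-EU": {
        "order_type": ["lab"],
        "order_value": [{"numeric": [">", 1000]}],
        "location": [{"prefix": "Europe-"}],
    },
    "Lab-Preapproved": {
        "order_type": ["lab"],
        # Greater than zero, up to and including 1,000, any location.
        "order_value": [{"numeric": [">", 0, "<=", 1000]}],
    },
    "Lab-Cancelled": {
        "order_type": ["lab"],
        "order_value": [{"numeric": ["=", 0]}],
    },
    "Common-Orders": {
        # Everything except lab supply orders.
        "order_type": [{"anything-but": "lab"}],
    },
}

for endpoint, policy in policies.items():
    print(endpoint, json.dumps(policy))
```

Each JSON string printed here is the kind of value you would set as the `FilterPolicy` attribute of the corresponding subscription.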

After the company finished building this advanced pub/sub architecture, they were then able to launch their internal procurement system and allow buyers to begin placing orders. The diagram above shows six example orders published to the SNS topic. Each order contains message attributes that describe the order, and cause them to be filtered in a different manner, as follows:

  • Message #1 is a lab supply order, with a total value of $15,700 and targeting a research lab in Singapore. Because the value is greater than $1,000, and the location “Asia-Pacific-Southeast” matches the prefix “Asia-Pacific-”, this message matches the first SNS subscription and is delivered to SQS queue “Lab-Orders-AP”.
  • Message #2 is a lab supply order, with a total value of $1,833 and targeting a research lab in Ireland. Because the value is greater than $1,000, and the location “Europe-West” matches the prefix “Europe-”, this message matches the second SNS subscription and is delivered to SQS queue “Lab-Orders-EU”.
  • Message #3 is a lab supply order, with a total value of $415. Because the value is greater than $0 and no more than $1,000, this message matches the third SNS subscription and is delivered to Lambda function “Lab-Preapproved”.
  • Message #4 is a lab supply order, but with a total value of $0. Therefore, it only matches the fourth SNS subscription, and is delivered to Lambda function “Lab-Cancelled”.
  • Messages #5 and #6 aren’t actually lab supply orders; one is an office supply order, and the other is a tech supply order. Therefore, they only match the fifth SNS subscription, and are both delivered to SQS queue “Common-Orders”.

Although each message only matched a single subscription, each was tested against the filter policy of every subscription in the topic. Hence, depending on which attributes are set on the incoming message, the message might actually match multiple subscriptions, and multiple deliveries will take place. Also, it is important to bear in mind that subscriptions with no filter policies catch every single message published to the topic, as a blank filter policy equates to a catch-all behavior.

Summary

Amazon SNS allows for both string and numeric filtering operators. As explained in this post, string operators allow for exact, prefix, and “anything-but” comparisons, while numeric operators allow for exact and range comparisons. These advanced filtering operators bring even more power and flexibility to your pub/sub messaging functionality and also allow you to simplify your architecture further by removing even more logic from your subscribers.

Message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). SNS filtering operators for numeric matching, prefix matching, and blacklisting are available now in all AWS Regions, for no extra charge.

To experiment with these new filtering operators yourself, and continue learning, try the 10-minute Tutorial Filter Messages Published to Topics. For more information, see Filtering Messages with Amazon SNS in the SNS documentation.

Scale Your Web Application — One Step at a Time

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/architecture/scale-your-web-application-one-step-at-a-time/

I often encounter people experiencing frustration as they attempt to scale their e-commerce or WordPress site—particularly around the cost and complexity related to scaling. When I talk to customers about their scaling plans, they often mention phrases such as horizontal scaling and microservices, but usually people aren’t sure about how to dive in and effectively scale their sites.

Now let’s talk about different scaling options. For instance, if your current workload is in a traditional data center, you can leverage the cloud for your on-premises solution. This way you can scale to achieve greater efficiency at lower cost. It’s not necessary to set up a whole powerhouse to light a few bulbs. If your workload is already in the cloud, you can use one of the available out-of-the-box options.

Designing your API in microservices and adding horizontal scaling might seem like the best choice, unless your web application is already running in an on-premises environment and you’ll need to quickly scale it because of unexpected large spikes in web traffic.

So how do you handle this situation? Take things one step at a time when scaling, and you may find that horizontal scaling isn’t the right choice after all.

For example, assume you have a tech news website where you published an early-look review of an upcoming and highly anticipated smartphone launch, and the review went viral. The review, a blog post on your website, includes both video and pictures. Comments are enabled for the post and readers can also rate it. If your website is hosted on a traditional Linux server with a LAMP stack, you may find yourself with immediate scaling problems.

Let’s dig into the details of the current scenario:

  • Where are images and videos stored?
  • How many read/write requests are received per second? Per minute?
  • What is the level of security required?
  • Are these synchronous or asynchronous requests?

We’ll also want to consider the following if your website has a transactional load like e-commerce or banking:

  • How is the website handling sessions?
  • Do you have any compliance requirements, such as the Payment Card Industry Data Security Standard (PCI DSS), if your website is using its own payment gateway?
  • How are you recording customer behavior data and fulfilling your analytics needs?
  • What are your load balancing considerations (scaling, caching, session maintenance, etc.)?

So, if we take this one step at a time:

Step 1: Ease server load. We need to quickly handle spikes in traffic generated by activity on the blog post, so let’s reduce server load by moving images and videos to a third-party content delivery network (CDN). AWS provides Amazon CloudFront as a CDN solution, which is highly scalable, with built-in security to verify origin access identity and handle DDoS attacks. CloudFront can direct traffic to your on-premises or cloud-hosted server with its 113 Points of Presence (102 Edge Locations and 11 Regional Edge Caches) in 56 cities across 24 countries, which provides efficient caching.

Step 2: Reduce read load by adding more read replicas. MySQL provides mirror replication for databases. Oracle has its own plug-in for replication, and Amazon RDS provides up to five read replicas, which can span across the region. Amazon Aurora can have up to 15 read replicas, with Aurora Auto Scaling support. If a workload is highly variable, consider Amazon Aurora Serverless to achieve high efficiency and reduced cost. While most mirror technologies do asynchronous replication, Amazon RDS can also provide synchronous Multi-AZ replication, which is good for disaster recovery but not for scalability. Asynchronous replication to a mirror instance means replicated data can sometimes be stale if network bandwidth is low, so you need to plan and design your application accordingly.

I recommend that you always use a read replica for any reporting needs, and try to move non-critical GET services to a read replica to reduce the load on the master database. In this case, comments associated with a blog post can be fetched from a read replica, because that request can tolerate some delay if asynchronous replication lags.
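
The read/write split described here can be sketched in a few lines. This is only an illustration under simple assumptions: the `ReplicaRouter` class, the endpoint names, and the "statement starts with SELECT" heuristic are all hypothetical, and a real application would usually rely on its database driver or ORM for this.

```python
import itertools


class ReplicaRouter:
    """Pick a database endpoint by statement type (illustrative only)."""

    def __init__(self, primary, replicas):
        self.primary = primary
        # Round-robin over replicas; fall back to the primary if none exist.
        self._replicas = itertools.cycle(replicas or [primary])

    def endpoint_for(self, sql, needs_fresh_data=False):
        is_read = sql.lstrip().lower().startswith("select")
        # Reads that tolerate replication lag go to a replica;
        # writes and lag-sensitive reads go to the primary.
        if is_read and not needs_fresh_data:
            return next(self._replicas)
        return self.primary


router = ReplicaRouter(
    "primary.db.example.internal",
    ["replica-1.db.example.internal", "replica-2.db.example.internal"],
)
print(router.endpoint_for("SELECT body FROM comments WHERE post_id = 42"))
print(router.endpoint_for("INSERT INTO comments (post_id, body) VALUES (42, 'Nice!')"))
```

Loading comments is a good candidate for the replica path, while something like a checkout page would pass `needs_fresh_data=True`.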

Step 3: Reduce write requests. This can be achieved by introducing a queue to process messages asynchronously. Amazon Simple Queue Service (Amazon SQS) is a highly scalable queue, which can handle any kind of work-message load. You can process data, like ratings and reviews, or calculate a Deal Quality Score (DQS), using batch processing via an SQS queue. If your workload is in AWS, I recommend using a job-observer pattern by setting up Auto Scaling to automatically increase or decrease the number of batch servers, using the number of SQS messages, reported through Amazon CloudWatch, as the trigger. For on-premises workloads, you can use the SQS SDK to create an Amazon SQS queue that holds messages until they’re processed by your stack. Or you can use Amazon SNS to fan out your message processing in parallel for different purposes, like adding a watermark to an image, generating a thumbnail, etc.
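
To illustrate the job-observer idea, here is a small sketch of the scaling decision itself: given the queue depth, how many batch servers should be running? The function name and the default thresholds are assumptions for illustration. In practice the queue depth would come from a CloudWatch metric such as `ApproximateNumberOfMessagesVisible`, and Auto Scaling would apply the result.

```python
import math


def desired_workers(queue_depth, msgs_per_worker=100, min_workers=1, max_workers=10):
    """Return how many batch servers to run for the current backlog."""
    if msgs_per_worker <= 0:
        raise ValueError("msgs_per_worker must be positive")
    # One worker per msgs_per_worker queued messages, rounded up...
    needed = math.ceil(queue_depth / msgs_per_worker)
    # ...but never below the floor or above the ceiling.
    return max(min_workers, min(max_workers, needed))


for depth in (0, 250, 5000):
    print(depth, desired_workers(depth))
```

The floor keeps at least one server ready for new messages, and the ceiling protects you from runaway cost during a very large spike.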

Step 4: Introduce a more robust caching engine. You can use Amazon ElastiCache for Memcached or Redis to reduce write requests. Memcached and Redis have different use cases, so if you can afford to lose your cache and rebuild it from your database, use Memcached. If you are looking for more robust data persistence and complex data structures, use Redis. In AWS, these are managed services, which means AWS takes care of the workload for you; you can also deploy Memcached or Redis on your on-premises instances or use a hybrid approach.
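
A common way to put such a cache in front of a database is the cache-aside pattern. The sketch below is only an illustration: it uses an in-process dictionary as a stand-in for Memcached or Redis, and the `CacheAside` class and `load_post` function are hypothetical names.

```python
import time


class CacheAside:
    """Serve from the cache when possible; fall back to the database on a miss."""

    def __init__(self, load_from_db, ttl_seconds=60, clock=time.monotonic):
        self._load = load_from_db
        self._ttl = ttl_seconds
        self._clock = clock
        self._store = {}  # key -> (expires_at, value); stand-in for Memcached/Redis

    def get(self, key):
        entry = self._store.get(key)
        if entry is not None and entry[0] > self._clock():
            return entry[1]  # cache hit, database untouched
        value = self._load(key)  # cache miss or expired entry
        self._store[key] = (self._clock() + self._ttl, value)
        return value

    def invalidate(self, key):
        # Call this after a write so readers don't see stale data.
        self._store.pop(key, None)


db_calls = []


def load_post(post_id):
    db_calls.append(post_id)  # pretend this is an expensive database query
    return {"id": post_id, "title": "Early-look smartphone review"}


cache = CacheAside(load_post)
cache.get(1)
cache.get(1)
print("database calls:", len(db_calls))
```

Because the cache can always be rebuilt from the database here, losing it is harmless, which is the situation where Memcached is a good fit.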

Step 5: Scale your server. If there are still issues, it’s time to scale your server. For the greatest cost-effectiveness and unlimited scalability, I suggest always using horizontal scaling. However, for use cases like databases, vertical scaling may be a better choice until you are comfortable with sharding; or use Amazon Aurora Serverless for variable workloads. It is wise to use Auto Scaling to manage your workload effectively for horizontal scaling. To achieve that, you also need to persist the session. Amazon DynamoDB can handle session persistence across instances.

If your server is on premises, consider creating a multisite architecture, which will help you achieve quick scalability as required and provide a good disaster recovery solution. You can pick and choose individual services like Amazon Route 53, AWS CloudFormation, Amazon SQS, Amazon SNS, Amazon RDS, etc. depending on your needs.

Your multisite architecture will look like the following diagram:

In this architecture, you can run your regular workload on premises, and use your AWS workload as required for scalability and disaster recovery. Using Route 53, you can direct a precise percentage of users to an AWS workload.
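
With weighted routing in Route 53, each record receives a share of traffic equal to its weight divided by the sum of all weights. The sketch below just does that arithmetic; the record names and weights are made-up examples.

```python
def traffic_shares(weights):
    """Map each record name to its fraction of traffic under weighted routing."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("at least one weight must be positive")
    return {name: weight / total for name, weight in weights.items()}


# Hypothetical split: keep most users on premises, send a slice to AWS.
weights = {"on-premises": 90, "aws": 10}
for name, share in traffic_shares(weights).items():
    print(f"{name}: {share:.0%}")
```

Raising the AWS weight during a traffic spike shifts more users to the cloud without touching the on-premises stack.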

If you decide to move all of your workloads to AWS, the recommended multi-AZ architecture would look like the following:

In this architecture, you are using a multi-AZ distributed workload for high availability. You can have a multi-region setup and use Route 53 to distribute your workload between AWS Regions. CloudFront helps you scale and distribute static content via an S3 bucket, and DynamoDB maintains your application state so that Auto Scaling can apply horizontal scaling without loss of session data. At the database layer, RDS with a multi-AZ standby provides high availability, and read replicas help achieve scalability.

This is a high-level strategy to help you think through the scalability of your workload by using AWS even if your workload is on premises and not in the cloud…yet.

I highly recommend creating a hybrid, multisite model by placing a replica of your on-premises environment in a public cloud like the AWS Cloud, and using the Amazon Route 53 DNS service and Elastic Load Balancing to route traffic between on-premises and cloud environments. AWS now supports load balancing between AWS and on-premises environments to help you scale your cloud environment quickly, whenever required, and scale it back down by applying Auto Scaling and placing a threshold on your on-premises traffic using Route 53.

Glenn’s Take on re:Invent 2017 Part 1

Post Syndicated from Glenn Gore original https://aws.amazon.com/blogs/architecture/glenns-take-on-reinvent-2017-part-1/

GREETINGS FROM LAS VEGAS

Glenn Gore here, Chief Architect for AWS. I’m in Las Vegas this week — with 43K others — for re:Invent 2017. We have a lot of exciting announcements this week. I’m going to post to the AWS Architecture blog each day with my take on what’s interesting about some of the announcements from a cloud architectural perspective.

Why not start at the beginning? At the Midnight Madness launch on Sunday night, we announced Amazon Sumerian, our platform for VR, AR, and mixed reality. The hype around VR/AR has existed for many years, though for me, it is a perfect example of how a working end-to-end solution often requires innovation from multiple sources. For AR/VR to be successful, we need many components to come together in a coherent manner to provide a great experience.

First, we need lightweight, high-definition goggles with motion tracking that are comfortable to wear. Second, we need to track movement of our body and hands in a 3-D space so that we can interact with virtual objects in the virtual world. Third, we need to build the virtual world itself and populate it with assets and define how the interactions will work and connect with various other systems.

There has been rapid development of the physical devices for AR/VR, ranging from iOS devices to Oculus Rift and HTC Vive, which provide excellent capabilities for the first and second components defined above. With the launch of Amazon Sumerian we are solving for the third area, which will help developers easily build their own virtual worlds and start experimenting and innovating with how to apply AR/VR in new ways.

Already, within 48 hours of Amazon Sumerian being announced, I have had multiple discussions with customers and partners around some cool use cases where VR can help in training simulations, remote-operator controls, or with new ideas around interacting with complex visual data sets, which starts bringing concepts straight out of sci-fi movies into the real (virtual) world. I am really excited to see how Sumerian will unlock the creative potential of developers and where this will lead.

Amazon MQ
I am a huge fan of distributed architectures where asynchronous messaging is the backbone of connecting the discrete components together. Amazon Simple Queue Service (Amazon SQS) is one of my favorite services due to its simplicity, scalability, performance, and the incredible flexibility of how you can use Amazon SQS in so many different ways to solve complex queuing scenarios.

While Amazon SQS is easy to use when building cloud-native applications on AWS, many of our customers running existing applications on premises required support for different messaging protocols, such as Java Message Service (JMS), .NET Messaging Service (NMS), Advanced Message Queuing Protocol (AMQP), MQ Telemetry Transport (MQTT), Simple (or Streaming) Text Orientated Messaging Protocol (STOMP), and WebSockets. One of the most popular on-premises message brokers is Apache ActiveMQ. With the release of Amazon MQ, you can now run Apache ActiveMQ on AWS as a managed service, similar to what we did with Amazon ElastiCache back in 2012. For me, there are two compelling, major benefits that Amazon MQ provides:

  • Integrate existing applications with cloud-native applications without having to change a line of application code if using one of the supported messaging protocols. This removes one of the biggest blockers for integration between the old and the new.
  • Remove the complexity of configuring Multi-AZ resilient message broker services as Amazon MQ provides out-of-the-box redundancy by always storing messages redundantly across Availability Zones. Protection is provided against failure of a broker through to complete failure of an Availability Zone.

I believe that Amazon MQ is a major component in the tools required to help you migrate your existing applications to AWS. Having set up cross-data center Apache ActiveMQ clusters myself in the past, and then tested them to ensure they worked as expected during critical failure scenarios, I know that technical staff working on migrations to AWS will benefit from the ease of deploying a fully redundant, managed Apache ActiveMQ cluster within minutes.

Who would have thought I would have been so excited to revisit Apache ActiveMQ in 2017 after using SQS for many, many years? Choice is a wonderful thing.

Amazon GuardDuty
Maintaining application and information security in the modern world is increasingly complex and is constantly evolving and changing as new threats emerge. This is due to the scale, variety, and distribution of services required in a competitive online world.

At Amazon, security is our number one priority. Thus, we are always looking at how we can increase security detection and protection while simplifying the implementation of advanced security practices for our customers. As a result, we released Amazon GuardDuty, which provides intelligent threat detection by using a combination of multiple information sources, transactional telemetry, and the application of machine learning models developed by AWS. One of the biggest benefits of Amazon GuardDuty that I appreciate is that enabling this service requires zero software, agents, sensors, or network choke points, which can all impact performance or reliability of the service you are trying to protect. Amazon GuardDuty works by monitoring your VPC flow logs, AWS CloudTrail events, and DNS logs, as well as combining other sources of security threats that AWS is aggregating from our own internal and external sources.

The use of machine learning in Amazon GuardDuty allows it to identify changes in behavior, which could be suspicious and require additional investigation. Amazon GuardDuty works across all of your AWS accounts allowing for an aggregated analysis and ensuring centralized management of detected threats across accounts. This is important for our larger customers who can be running many hundreds of AWS accounts across their organization, as providing a single common threat detection of their organizational use of AWS is critical to ensuring they are protecting themselves.

Detection, though, is only the beginning of what Amazon GuardDuty enables. When a threat is identified in Amazon GuardDuty, you can configure remediation scripts or trigger Lambda functions where you have custom responses that enable you to start building automated responses to a variety of different common threats. Speed of response is required when a security incident may be taking place. For example, Amazon GuardDuty detects that an Amazon Elastic Compute Cloud (Amazon EC2) instance might be compromised due to traffic from a known set of malicious IP addresses. Upon detection of a compromised EC2 instance, we could apply an access control entry restricting outbound traffic for that instance, which stops loss of data until a security engineer can assess what has occurred.
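
As a rough sketch of what such an automated response might look like, the following Lambda-style handler inspects a finding and decides what to do. The event layout (`detail.type` and `detail.resource.instanceDetails.instanceId`) reflects my understanding of how findings are delivered as events and should be treated as an assumption, as should the function names and the returned action labels. The sketch only plans the action; it does not call any AWS API.

```python
def plan_remediation(event):
    """Decide on a response for a GuardDuty-style finding event."""
    detail = event.get("detail", {})
    finding_type = detail.get("type", "")
    instance_id = (
        detail.get("resource", {}).get("instanceDetails", {}).get("instanceId")
    )
    if instance_id and ":EC2/" in finding_type:
        # Possibly compromised instance: cut outbound traffic until reviewed.
        return {"action": "restrict_outbound_traffic", "instance_id": instance_id}
    # Anything else is left for a human to triage.
    return {"action": "notify_security_team", "instance_id": instance_id}


def handler(event, context):
    plan = plan_remediation(event)
    # A real function would apply the restriction here via the EC2 API.
    return plan


sample_event = {
    "detail": {
        "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
        "resource": {"instanceDetails": {"instanceId": "i-0123456789abcdef0"}},
    }
}
print(handler(sample_event, None))
```

Keeping the decision logic in a plain function like `plan_remediation` makes it easy to test before you let it change anything in a production account.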

Whether you are a customer running a single service in a single account, a global customer with hundreds of accounts and thousands of applications, or a startup with hundreds of microservices and an hourly release cycle in a DevOps world, I recommend enabling Amazon GuardDuty. We have a 30-day free trial available for all new customers of this service. As it is a monitor of events, there is no change required to your architecture within AWS.

Stay tuned for tomorrow’s post on AWS Media Services and Amazon Neptune.

 

Glenn during the Tour du Mont Blanc