Tag Archives: Amazon Simple Queue Service (SQS)

Building a Serverless Streaming Pipeline to Deliver Reliable Messaging

Post Syndicated from Chris McPeek original https://aws.amazon.com/blogs/compute/building-a-serverless-streaming-pipeline-to-deliver-reliable-messaging/

This post is written by Jeff Harman, Senior Prototyping Architect, Vaibhav Shah, Senior Solutions Architect and Erik Olsen, Senior Technical Account Manager.

Many industries are required to provide audit trails for decision and transactional systems. AI assisted decision making requires monitoring the full inputs to the decision system in near real time to prevent fraud, detect model drift, and discrimination. Modern systems often use a much wider array of inputs for decision making, including images, unstructured text, historical values, and other large data elements. These large data elements pose a challenge to traditional audit systems that deal with relatively small text messages in structured formats. This blog shows the use of serverless technology to create a reliable, performant, traceable, and durable streaming pipeline for audit processing.

Overview

Consider the following four requirements to develop an architecture for audit record ingestion:

  1. Audit record size: Store and manage large payloads (256k – 6 MB in size) that may be heterogeneous, including text, binary data, and references to other storage systems.
  2. Audit traceability: The data stored has full traceability of the payload and external processes to monitor the process via subscription-based events.
  3. High Performance: The time required for blocking writes to the system is limited to the time it takes to transmit the audit record over the network.
  4. High data durability: Once the system sends a payload receipt, the payload is at very low risk of loss because of system failures.

The following diagram shows an architecture that meets these requirements and models the flow of the audit record through the system.

The primary source of latency is the time it takes for an audit record to be transmitted across the network. Applications sending audit records make an API call to an Amazon API Gateway endpoint. An AWS Lambda function receives the message and an Amazon ElastiCache for Redis cluster provides a low latency initial storage mechanism for the audit record. Once the data is stored in ElastiCache, the AWS Step Functions workflow then orchestrates the communication and persistence functions.

Subscribers receive four Amazon Simple Notification Service (Amazon SNS) notifications pertaining to arrival and storage of the audit record payload, storage of the audit record metadata, and audit record archive completion. Users can subscribe an Amazon Simple Queue Service (SQS) queue to the SNS topic and use fan out mechanisms to achieve high reliability.

  1. The Ingest Message Lambda function sends an initial receipt notification
  2. The Message Archive Handler Lambda function notifies on storage of the audit record from ElastiCache to Amazon Simple Storage Service (Amazon S3)
  3. The Message Metadata Handler Lambda function notifies on storage of the message metadata into Amazon DynamoDB
  4. The Final State Aggregation Lambda function notifies that the audit record has been archived.

Any failure by the three fundamental processing steps: Ingestion, Data Archive, and Metadata Archive triggers a message in an SQS Dead Letter Queue (DLQ) which contains the original request and an explanation of the failure reason. Any failure in the Ingest Message function invokes the Ingest Message Failure function, which stores the original parameters to the S3 Failed Message Storage bucket for later analysis.

The Step Functions workflow provides orchestration and parallel path execution for the system. The detailed workflow below shows the execution flow and notification actions. The transformer steps convert the internal data structures into the format required for consumers.

Data structures

There are types three events and messages managed by this system:

  1. Incoming message: This is the message the producer sends to an API Gateway endpoint.
  2. Internal message: This event contains the message metadata allowing subsequent systems to understand the originating message producer context.
  3. Notification message: Messages that allow downstream subscribers to act based on the message.

Solution walkthrough

The message producer calls the API Gateway endpoint, which enforces the security requirements defined by the business. In this implementation, API Gateway uses an API key for providing more robust security. API Gateway also creates a security header for consumption by the Ingest Message Lambda function. API Gateway can be configured to enforce message format standards, see Use request validation in API Gateway for more information.

The Ingest Message Lambda function generates a message ID that tracks the message payload throughout its lifecycle. Then it stores the full message in the ElastiCache for Redis cache. The Ingest Message Lambda function generates an internal message with all the elements necessary as described above. Finally, the Lambda function handler code starts the Step Functions workflow with the internal message payload.

If the Ingest Message Lambda function fails for any reason, the Lambda function invokes the Ingestion Failure Handler Lambda function. This Lambda function writes any recoverable incoming message data to an S3 bucket and sends a notification on the Ingest Message dead letter queue.

The Step Functions workflow then runs three processes in parallel.

  • The Step Functions workflow triggers the Message Archive Data Handler Lambda function to persist message data from the ElastiCache cache to an S3 bucket. Once stored, the Lambda function returns the S3 bucket reference and state information. There are two options to remove the internal message from the cache. Remove the message from cache immediately before sending the internal message and updating the ElastiCache cache flag or wait for the ElastiCache lifecycle to remove a stale message from cache. This solution waits for the ElastiCache lifecycle to remove the message.
  • The workflow triggers the Message Metadata Handler Lambda function to write all message metadata and security information to DynamoDB. The Lambda function replies with the DynamoDB reference information.
  • Finally, the Step Functions workflow sends a message to the SNS topic to inform subscribers that the message has arrived and the data persistence processes have started.

After each of the Lambda functions’ processes complete, the Lambda function sends a notification to the SNS notification topic to alert subscribers that each action is complete. When both Message Metadata and Message Archive Lambda functions are done, the Final Aggregation function makes a final update to the metadata in DynamoDB to include S3 reference information and to remove the ElastiCache Redis reference.

Deploying the solution

Prerequisites:

  1. AWS Serverless Application Model (AWS SAM) is installed (see Getting started with AWS SAM)
  2. AWS User/Credentials with appropriate permissions to run AWS CloudFormation templates in the target AWS account
  3. Python 3.8 – 3.10
  4. The AWS SDK for Python (Boto3) is installed
  5. The requests python library is installed

The source code for this implementation can be found at  https://github.com/aws-samples/blog-serverless-reliable-messaging

Installing the Solution:

  1. Clone the git repository to a local directory
  2. git clone https://github.com/aws-samples/blog-serverless-reliable-messaging.git
  3. Change into the directory that was created by the clone operation, usually blog_serverless_reliable_messaging
  4. Execute the command: sam build
  5. Execute the command: sam deploy –-guided. You are asked to supply the following parameters:
    1. Stack Name: Name given to this deployment (example: serverless-streaming)
    2. AWS Region: Where to deploy (example: us-east-1)
    3. ElasticacheInstanceClass: EC2 cache instance type to use with (example: cache.t3.small)
    4. ElasticReplicaCount: How many replicas should be used with ElastiCache (recommended minimum: 2)
    5. ProjectName: Used for naming resources in account (example: serverless-streaming)
    6. MultiAZ: True/False if multiple Availability Zones should be used (recommend: True)
    7. The default parameters can be selected for the remainder of questions

Testing:

Once you have deployed the stack, you can test it through the API gateway endpoint with the API key that is referenced in the deployment output. There are two methods for retrieving the API key either via the AWS console (from the link provided in the output – ApiKeyConsole) or via the AWS CLI (from the AWS CLI reference in the output – APIKeyCLI).

You can test directly in the Lambda service console by invoking the ingest message function.

A test message is available at the root of the project test_message.json for direct Lambda function testing of the Ingest function.

  1. In the console navigate to the Lambda service
  2. From the list of available functions, select the “<project name> -IngestMessageFunction-xxxxx” function
  3. Under the “Function overview” select the “Test” tab
  4. Enter an event name of your choosing
  5. Copy and paste the contents of test_message.json into the “Event JSON” box
  6. Click “Save” then after it has saved, click the “Test”
  7. If successful, you should see something similar to the below in the details:
    {
    "isBase64Encoded": false,
    "statusCode": 200,
    "headers": {
    "Access-Control-Allow-Headers": "Content-Type",
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "OPTIONS,POST"
    },
    "body": "{\"messageID\": \"XXXXXXXXXXXXXX\"}"
    }
  8. In the S3 bucket “<project name>-s3messagearchive-xxxxxx“, find the payload of the original json with a key based on the date and time of the script execution, e.g.: YEAR/MONTH/DAY/HOUR/MINUTE with a file name of the messageID
  9. In a DynamoDB table named metaDataTable, you should find a record with a messageID equal to the messageID from above that contains all of the metadata related to the payload

A python script is included with the code in the test_client folder

  1. Replace the <Your API key key here> and the <Your API Gateway URL here (IngestMessageApi)> values with the correct ones for your environment in the test_client.py file
  2. Execute the test script with Python 3.8 or higher with the requests package installed
    Example execution (from main directory of git clone):
    python3 -m pip install -r ./test_client/requirements.txt
    python3 ./test_client/test_client.py
  3. Successful output shows the messageID and the header JSON payload:
    {
    "messageID": " XXXXXXXXXXXXXX"
    }
  4. In the S3 bucket “<project name>-s3messagearchive-xxxxxx“, you should be able to find the payload of the original json with a key based on the date and time of the script execution, e.g.: YEAR/MONTH/DAY/HOUR/MINUTE with a file name of the messageID
  5. In a DynamoDB table named metaDataTable, you should find a record with a messageID equal to the messageID from above that contains all of the meta data related to the payload

Conclusion

This blog describes architectural patterns, messaging patterns, and data structures that support a highly reliable messaging system for large messages. The use of serverless services including Lambda functions, Step Functions, ElastiCache, DynamoDB, and S3 meet the requirements of modern audit systems to be scalable and reliable. The architecture shared in this blog post is suitable for a highly regulated environment to store and track messages that are larger than typical logging systems, records sized between 256k and 6MB. The architecture serves as a blueprint that can be extended and adapted to fit further serverless use cases.

For serverless learning resources, visit Serverless Land.

Top Architecture Blog Posts of 2023

Post Syndicated from Andrea Courtright original https://aws.amazon.com/blogs/architecture/top-architecture-blog-posts-of-2023/

2023 was a rollercoaster year in tech, and we at the AWS Architecture Blog feel so fortunate to have shared in the excitement. As we move into 2024 and all of the new technologies we could see, we want to take a moment to highlight the brightest stars from 2023.

As always, thanks to our readers and to the many talented and hardworking Solutions Architects and other contributors to our blog.

I give you our 2023 cream of the crop!

#10: Build a serverless retail solution for endless aisle on AWS

In this post, Sandeep and Shashank help retailers and their customers alike in this guided approach to finding inventory that doesn’t live on shelves.

Building endless aisle architecture for order processing

Figure 1. Building endless aisle architecture for order processing

Check it out!

#9: Optimizing data with automated intelligent document processing solutions

Who else dreads wading through large amounts of data in multiple formats? Just me? I didn’t think so. Using Amazon AI/ML and content-reading services, Deependra, Anirudha, Bhajandeep, and Senaka have created a solution that is scalable and cost-effective to help you extract the data you need and store it in a format that works for you.

AI-based intelligent document processing engine

Figure 2: AI-based intelligent document processing engine

Check it out!

#8: Disaster Recovery Solutions with AWS managed services, Part 3: Multi-Site Active/Passive

Disaster recovery posts are always popular, and this post by Brent and Dhruv is no exception. Their creative approach in part 3 of this series is most helpful for customers who have business-critical workloads with higher availability requirements.

Warm standby with managed services

Figure 3. Warm standby with managed services

Check it out!

#7: Simulating Kubernetes-workload AZ failures with AWS Fault Injection Simulator

Continuing with the theme of “when bad things happen,” we have Siva, Elamaran, and Re’s post about preparing for workload failures. If resiliency is a concern (and it really should be), the secret is test, test, TEST.

Architecture flow for Microservices to simulate a realistic failure scenario

Figure 4. Architecture flow for Microservices to simulate a realistic failure scenario

Check it out!

#6: Let’s Architect! Designing event-driven architectures

Luca, Laura, Vittorio, and Zamira weren’t content with their four top-10 spots last year – they’re back with some things you definitely need to know about event-driven architectures.

Let's Architect

Figure 5. Let’s Architect artwork

Check it out!

#5: Use a reusable ETL framework in your AWS lake house architecture

As your lake house increases in size and complexity, you could find yourself facing maintenance challenges, and Ashutosh and Prantik have a solution: frameworks! The reusable ETL template with AWS Glue templates might just save you a headache or three.

Reusable ETL framework architecture

Figure 6. Reusable ETL framework architecture

Check it out!

#4: Invoking asynchronous external APIs with AWS Step Functions

It’s possible that AWS’ menagerie of services doesn’t have everything you need to run your organization. (Possible, but not likely; we have a lot of amazing services.) If you are using third-party APIs, then Jorge, Hossam, and Shirisha’s architecture can help you maintain a secure, reliable, and cost-effective relationship among all involved.

Invoking Asynchronous External APIs architecture

Figure 7. Invoking Asynchronous External APIs architecture

Check it out!

#3: Announcing updates to the AWS Well-Architected Framework

The Well-Architected Framework continues to help AWS customers evaluate their architectures against its six pillars. They are constantly striving for improvement, and Haleh’s diligence in keeping us up to date has not gone unnoticed. Thank you, Haleh!

Well-Architected logo

Figure 8. Well-Architected logo

Check it out!

#2: Let’s Architect! Designing architectures for multi-tenancy

The practically award-winning Let’s Architect! series strikes again! This time, Luca, Laura, Vittorio, and Zamira were joined by Federica to discuss multi-tenancy and why that concept is so crucial for SaaS providers.

Let's Architect

Figure 9. Let’s Architect

Check it out!

And finally…

#1: Understand resiliency patterns and trade-offs to architect efficiently in the cloud

Haresh, Lewis, and Bonnie revamped this 2022 post into a masterpiece that completely stole our readers’ hearts and is among the top posts we’ve ever made!

Resilience patterns and trade-offs

Figure 10. Resilience patterns and trade-offs

Check it out!

Bonus! Three older special mentions

These three posts were published before 2023, but we think they deserve another round of applause because you, our readers, keep coming back to them.

Thanks again to everyone for their contributions during a wild year. We hope you’re looking forward to the rest of 2024 as much as we are!

Serverless ICYMI Q4 2023

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/serverless-icymi-q4-2023/

Welcome to the 24th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

2023 Q4 Calendar

2023 Q4 Calendar

ServerlessVideo

ServerlessVideo at re:Invent 2024

ServerlessVideo at re:Invent 2024

ServerlessVideo is a demo application built by the AWS Serverless Developer Advocacy team to stream live videos and also perform advanced post-video processing. It uses several AWS services including AWS Step Functions, Amazon EventBridge, AWS Lambda, Amazon ECS, and Amazon Bedrock in a serverless architecture that makes it fast, flexible, and cost-effective. Key features include an event-driven core with loosely coupled microservices that respond to events routed by EventBridge. Step Functions orchestrates using both Lambda and ECS for video processing to balance speed, scale, and cost. There is a flexible plugin-based architecture using Step Functions and EventBridge to integrate and manage multiple video processing workflows, which include GenAI.

ServerlessVideo allows broadcasters to stream video to thousands of viewers using Amazon IVS. When a broadcast ends, a Step Functions workflow triggers a set of configured plugins to process the video, generating transcriptions, validating content, and more. The application incorporates various microservices to support live streaming, on-demand playback, transcoding, transcription, and events. Learn more about the project and watch videos from reinvent 2023 at video.serverlessland.com.

AWS Lambda

AWS Lambda enabled outbound IPv6 connections from VPC-connected Lambda functions, providing virtually unlimited scale by removing IPv4 address constraints.

The AWS Lambda and AWS SAM teams also added support for sharing test events across teams using AWS SAM CLI to improve collaboration when testing locally.

AWS Lambda introduced integration with AWS Application Composer, allowing users to view and export Lambda function configuration details for infrastructure as code (IaC) workflows.

AWS added advanced logging controls enabling adjustable JSON-formatted logs, custom log levels, and configurable CloudWatch log destinations for easier debugging. AWS enabled monitoring of errors and timeouts occurring during initialization and restore phases in CloudWatch Logs as well, making troubleshooting easier.

For Kafka event sources, AWS enabled failed event destinations to prevent functions stalling on failing batches by rerouting events to SQS, SNS, or S3. AWS also enhanced Lambda auto scaling for Kafka event sources in November to reach maximum throughput faster, reducing latency for workloads prone to large bursts of messages.

AWS launched support for Python 3.12 and Java 21 Lambda runtimes, providing updated libraries, smaller deployment sizes, and better AWS service integration. AWS also introduced a simplified console workflow to automate complex network configuration when connecting functions to Amazon RDS and RDS Proxy.

Additionally in December, AWS enabled faster individual Lambda function scaling allowing each function to rapidly absorb traffic spikes by scaling up to 1000 concurrent executions every 10 seconds.

Amazon ECS and AWS Fargate

In Q4 of 2023, AWS introduced several new capabilities across its serverless container services including Amazon ECS, AWS Fargate, AWS App Runner, and more. These features help improve application resilience, security, developer experience, and migration to modern containerized architectures.

In October, Amazon ECS enhanced its task scheduling to start healthy replacement tasks before terminating unhealthy ones during traffic spikes. This prevents going under capacity due to premature shutdowns. Additionally, App Runner launched support for IPv6 traffic via dual-stack endpoints to remove the need for address translation.

In November, AWS Fargate enabled ECS tasks to selectively use SOCI lazy loading for only large container images in a task instead of requiring it for all images. Amazon ECS also added idempotency support for task launches to prevent duplicate instances on retries. Amazon GuardDuty expanded threat detection to Amazon ECS and Fargate workloads which users can easily enable.

Also in November, the open source Finch container tool for macOS became generally available. Finch allows developers to build, run, and publish Linux containers locally. A new website provides tutorials and resources to help developers get started.

Finally in December, AWS Migration Hub Orchestrator added new capabilities for replatforming applications to Amazon ECS using guided workflows. App Runner also improved integration with Route 53 domains to automatically configure required records when associating custom domains.

AWS Step Functions

In Q4 2023, AWS Step Functions announced the redrive capability for Standard Workflows. This feature allows failed workflow executions to be redriven from the point of failure, skipping unnecessary steps and reducing costs. The redrive functionality provides an efficient way to handle errors that require longer investigation or external actions before resuming the workflow.

Step Functions also launched support for HTTPS endpoints in AWS Step Functions, enabling easier integration with external APIs and SaaS applications without needing custom code. Developers can now connect to third-party HTTP services directly within workflows. Additionally, AWS released a new test state capability that allows testing individual workflow states before full deployment. This feature helps accelerate development by making it faster and simpler to validate data mappings and permissions configurations.

AWS announced optimized integrations between AWS Step Functions and Amazon Bedrock for orchestrating generative AI workloads. Two new API actions were added specifically for invoking Bedrock models and training jobs from workflows. These integrations simplify building prompt chaining and other techniques to create complex AI applications with foundation models.

Finally, the Step Functions Workflow Studio is now integrated in the AWS Application Composer. This unified builder allows developers to design workflows and define application resources across the full project lifecycle within a single interface.

Amazon EventBridge

Amazon EventBridge announced support for new partner integrations with Adobe and Stripe. These integrations enable routing events from the Adobe and Stripe platforms to over 20 AWS services. This makes it easier to build event-driven architectures to handle common use cases.

Amazon SNS

In Q4, Amazon SNS added native in-place message archiving for FIFO topics to improve event stream durability by allowing retention policies and selective replay of messages without provisioning separate resources. Additional message filtering operators were also introduced including suffix matching, case-insensitive equality checks, and OR logic for matching across properties to simplify routing logic implementation for publishers and subscribers. Finally, delivery status logging was enabled through AWS CloudFormation.

Amazon SQS

Amazon SQS has introduced several major new capabilities and updates. These improve visibility, throughput, and message handling for users. Specifically, Amazon SQS enabled AWS CloudTrail logging of key SQS APIs. This gives customers greater visibility into SQS activity. Additionally, SQS increased the throughput quota for the high throughput mode of FIFO queues. This was significantly increased in certain Regions. It also boosted throughput in Asia Pacific Regions. Furthermore, Amazon SQS added dead letter queue redrive support. This allows you to redrive messages that failed and were sent to a dead letter queue (DLQ).

Serverless at AWS re:Invent

Serverless videos from re:Invent

Serverless videos from re:Invent

Visit the Serverless Land YouTube channel to find a list of serverless and serverless container sessions from reinvent 2023. Hear from experts like Chris Munns and Julian Wood in their popular session, Best practices for serverless developers, or Nathan Peck and Jessica Deen in Deploying multi-tenant SaaS applications on Amazon ECS and AWS Fargate.

EDA Day Nashville

EDA Day Nashville

EDA Day Nashville

The AWS Serverless Developer Advocacy team hosted an event-driven architecture (EDA) day conference on October 26, 2022 in Nashville, Tennessee. This inaugural GOTO EDA day convened over 200 attendees ranging from prominent EDA community members to AWS speakers and product managers. Attendees engaged in 13 sessions, two workshops, and panels covering EDA adoption best practices. The event built upon 2022 content by incorporating additional topics like messaging, containers, and machine learning. It also created opportunities for students and underrepresented groups in tech to participate. The full-day conference facilitated education, inspiration, and thoughtful discussion around event-driven architectural patterns and services on AWS.

Videos from EDA Day are now available on the Serverless Land YouTube channel.

Serverless blog posts

October

November

December

Serverless container blog posts

October

November

December

Serverless Office Hours

Serverless office hours: Q4 videos

October

November

December

Containers from the Couch

Containers from the Couch

October

November

December

FooBar

FooBar

October

November

December

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

And finally, visit the Serverless Land and Containers on AWS websites for all your serverless and serverless container needs.

Announcing throughput increase and dead letter queue redrive support for Amazon SQS FIFO queues

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/announcing-throughput-increase-and-dead-letter-queue-redrive-support-for-amazon-sqs-fifo-queues/

With Amazon Simple Queue Service (Amazon SQS), you can send, store, and receive messages between software components at any volume. Today, Amazon SQS has introduced two new capabilities for first-in, first-out (FIFO) queues:

  • Maximum throughput has been increased up to 70,000 transactions per second (TPS) per API action in selected AWS Regions, supporting sending or receiving up to 700,000 messages per second with batching.
  • Dead letter queue (DLQ) redrive support to handle messages that are not consumed after a specific number of retries in a way similar to what was already available for standard queues.

Let’s take a more in-depth look at how these work in practice.

FIFO queues throughput increase up to 70K TPS
FIFO queues are designed for applications that require messages to be processed exactly once and in the order in which they are sent. While standard queues have an unlimited throughput, FIFO queues have an upper quota in the number of TPS per API action.

Standard and FIFO queues support batch actions that can send and receive up to 10 messages with a single API call (up to a maximum total payload of 256 KB). This means that a FIFO queue can process up to 10 times more messages per second than its maximum throughput.

At launch in 2016, FIFO queues supported up to 300 TPS per API action (3,000 messages per second with batching). This was enough for many use cases, but some customers asked for more throughput.

With high throughput mode launched in 2021, FIFO queues introduced a tenfold increase of the maximum throughput and could process up to 3,000 TPS per API action, depending on the Region. One year later, that quota was doubled to up to 6,000 TPS per API action.

This year, Amazon SQS has already increased FIFO queue throughput quota two times, to up to 9,000 TPS per API action in August and up to 18,000 TPS per API action in October (depending on the Region).

Today, the Amazon SQS team has been able to increase the FIFO queue throughput quota again, allowing you to process up to 70,000 TPS per API action (up to 700,000 messages per second with batching) in the US East (N. Virginia), US West (Oregon), and Europe (Ireland) Regions. This is more than two hundred times the maximum throughput at launch.

DLQ redrive support for FIFO queues
With Amazon SQS, messages that are not consumed after a specific number of retries can automatically be moved to a DLQ. There, messages can be analyzed to understand the reason why they have not been processed correctly. Sometimes there is a bug or a misconfiguration in the consumer application. Other times the messages contain invalid data from the source applications that needs to be fixed to allow the messages to be processed again.

Either way, you can define a plan to reprocess these messages. For example, you can fix the consumer application and redrive all messages to the source queue. Or you can create a dedicated queue where a custom application receives the messages, fixes their content, and then sends them to the source queue.

To simplify moving the messages back to the source queue or to a different queue, Amazon SQS allows you to create a redrive task. Redrive tasks are already available for standard queues. Starting today, you can also start a redrive task for FIFO queues.

Using the Amazon SQS console, I create a first queue (my-dlq.fifo) to be used as a DLQ. To redrive messages back to the source FIFO queue, the queue type must match, so this is also a FIFO queue.

Then, I create a source FIFO queue (my-source-queue.fifo) to handle messages as usual. When I create the source queue, I configure the first queue (my-dlq.fifo) as the DLQ and specify 3 as the Maximum receives condition under which messages are moved from the source queue to the DLQ.

Console screenshot.

When a message has been received by a consumer for more than the number of times specified by this condition, Amazon SQS moves the message to the DLQ. The original message ID is retained and can be used to uniquely track the message.

To test this setup, I use the console to send a message to the source queue. Then, I use the AWS Command Line Interface (AWS CLI) to receive the message multiple times without deleting it.

aws sqs receive-message --queue-url https://sqs.eu-west-1.amazonaws.com/123412341234/my-source-queue.fifo
{
    "Messages": [
        {
            "MessageId": "ef2f1c72-4bfe-4093-a451-03fe2dbd4d0f",
            "ReceiptHandle": "...",
            "MD5OfBody": "0f445a578fbcb0c06ca8aeb90a36fcfb",
            "Body": "My important message."
        }
    ]
}

To receive the same message more than once, I wait for the time specified in the queue visibility timeout to pass (30 seconds by default).

After the third time, the message is not in the source queue because it has been moved to the DLQ. When I try to receive messages from the source queue, the list is empty.

aws sqs receive-message --queue-url https://sqs.eu-west-1.amazonaws.com/123412341234/my-source-queue.fifo
{
    "Messages": []
}

To confirm that the message has been moved, I poll the DLQ to see if the message is there.

aws sqs receive-message --queue-url https://sqs.eu-west-1.amazonaws.com/123412341234/my-dlq.fifo  
{
    "Messages": [
        {
            "MessageId": "ef2f1c72-4bfe-4093-a451-03fe2dbd4d0f",
            "ReceiptHandle": "...",
            "MD5OfBody": "0f445a578fbcb0c06ca8aeb90a36fcfb",
            "Body": "My important message."
        }
    ]
}

Now that the message is in the DLQ, I can investigate why the message has not been processed (well, I know the reason this time) and decide whether to redrive messages from the DLQ using the Amazon SQS console or the new redrive API that was introduced a few months ago. For this example, I use the console. Back on the Amazon SQS console, I select the DLQ queue and choose Start DLQ redrive.

In Redrive configuration, I choose to redrive the messages to the source queue. Optionally, I can specify another FIFO queue as a custom destination. I use System optimized in Velocity control settings to redrive messages with the maximum number of messages per second optimized by Amazon SQS. Optionally, if there is a large number of messages in the DLQ, I can configure a custom maximum rate of messages per second to avoid overloading consumers.

Console screenshot.

Before starting the redrive task, I can use the Inspect messages section to poll and check messages. I already decided what to do, so I choose DLQ redrive to start the task. I have only one message to process, so the redrive task completes very quickly.

Console screenshot.

As expected, the message is back in the source queue and is ready to be processed again.

Console screenshot.

Things to know
Dead letter queue (DLQ) support for FIFO queues is available today in all AWS Regions where Amazon SQS is offered with the exception of GovCloud Regions and those based in China.

In the DLQ configuration, the maximum number of receives should be between 1 and 1,000.

There is no additional cost for using high throughput mode or a DLQ. Every Amazon SQS action counts as a request. A single request can send or receive from 1 to 10 messages, up to a maximum total payload of 256 KB. You pay based on the number of requests, and requests are priced differently between standard and FIFO queues.

As part of the AWS Free Tier, there is no cost for the first million requests per month for standard queues and for the first million requests per month for FIFO queues. For more information, see Amazon SQS pricing.

With these updates and the increased throughput, you can cover the vast majority of use cases with FIFO queues.

Use Amazon SQS FIFO queues to have high throughput, exactly-once processing, and first-in-first-out delivery.

Danilo

Introducing shared VPC support on Amazon MWAA

Post Syndicated from John Jackson original https://aws.amazon.com/blogs/big-data/introducing-shared-vpc-support-on-amazon-mwaa/

In this post, we demonstrate automating deployment of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) using customer-managed endpoints in a VPC, providing compatibility with shared, or otherwise restricted, VPCs.

Data scientists and engineers have made Apache Airflow a leading open source tool to create data pipelines due to its active open source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and extensive library of pre-built integrations. Amazon MWAA is a managed service for Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. For each Airflow environment, Amazon MWAA creates a single-tenant service VPC, which hosts the metadatabase that stores states and the web server that provides the user interface. Amazon MWAA further manages Airflow scheduler and worker instances in a customer-owned and managed VPC, in order to schedule and run tasks that interact with customer resources. Those Airflow containers in the customer VPC access resources in the service VPC via a VPC endpoint.

Many organizations choose to centrally manage their VPC using AWS Organizations, allowing a VPC in an owner account to be shared with resources in a different participant account. However, because creating a new route outside of a VPC is considered a privileged operation, participant accounts can’t create endpoints in owner VPCs. Furthermore, many customers don’t want to extend the security privileges required to create VPC endpoints to all users provisioning Amazon MWAA environments. In addition to VPC endpoints, customers also wish to restrict data egress via Amazon Simple Queue Service (Amazon SQS) queues, and Amazon SQS access is a requirement in the Amazon MWAA architecture.

Shared VPC support for Amazon MWAA adds the ability for you to manage your own endpoints within your VPCs, adding compatibility to shared and otherwise restricted VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by your Amazon MWAA environments. This post demonstrates how customer-managed endpoints work with Amazon MWAA and provides examples of how to automate the provisioning of those endpoints.

Solution overview

Shared VPC support for Amazon MWAA allows multiple AWS accounts to create their Airflow environments into shared, centrally managed VPCs. The account that owns the VPC (owner) shares the two private subnets required by Amazon MWAA with other accounts (participants) that belong to the same organization from AWS Organizations. After the subnets are shared, the participants can view, create, modify, and delete Amazon MWAA environments in the subnets shared with them.

When users specify the need for a shared, or otherwise policy-restricted, VPC during environment creation, Amazon MWAA will first create the service VPC resources, then enter a pending state for up to 72 hours, with an Amazon EventBridge notification of the change in state. This allows owners to create the required endpoints on behalf of participants based on endpoint service information from the Amazon MWAA console or API, or programmatically via an AWS Lambda function and EventBridge rule, as in the example in this post.

After those endpoints are created on the owner account, the endpoint service in the single-tenant Amazon MWAA VPC will detect the endpoint connection event and resume environment creation. Should there be an issue, you can cancel environment creation by deleting the environment during this pending state.

This feature also allows you to remove the create, modify, and delete VPCE privileges from the AWS Identity and Access Management (IAM) principal creating Amazon MWAA environments, even when not using a shared VPC, because that permission will instead be imposed on the IAM principal creating the endpoint (the Lambda function in our example). Furthermore, the Amazon MWAA environment will provide the SQS queue Amazon Resource Name (ARN) used by the Airflow Celery Executor to queue tasks (the Celery Executor Queue), allowing you to explicitly enter those resources into your network policy rather than having to provide a more open and generalized permission.

In this example, we create the VPC and Amazon MWAA environment in the same account. For shared VPCs across accounts, the EventBridge rule and Lambda function would exist in the owner account, and the Amazon MWAA environment would be created in the participant account. See Sending and receiving Amazon EventBridge events between AWS accounts for more information.

Prerequisites

You should have the following prerequisites:

  • An AWS account
  • An AWS user in that account, with permissions to create VPCs, VPC endpoints, and Amazon MWAA environments
  • An Amazon Simple Storage Service (Amazon S3) bucket in that account, with a folder called dags

Create the VPC

We begin by creating a restrictive VPC using an AWS CloudFormation template, in order to simulate creating the necessary VPC endpoint and modifying the SQS endpoint policy. If you want to use an existing VPC, you can proceed to the next section.

  1. On the AWS CloudFormation console, choose Create stack and choose With new resources (standard).
  2. Under Specify template, choose Upload a template file.
  3. Now we edit our CloudFormation template to restrict access to Amazon SQS. In cfn-vpc-private-bjs.yml, edit the SqsVpcEndoint section to appear as follows:
   SqsVpcEndoint:
     Type: AWS::EC2::VPCEndpoint
     Properties:
       ServiceName: !Sub "com.amazonaws.${AWS::Region}.sqs"
       VpcEndpointType: Interface
       VpcId: !Ref VPC
       PrivateDnsEnabled: true
       SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
       SecurityGroupIds:
        - !Ref SecurityGroup
       PolicyDocument:
        Statement:
         - Effect: Allow
           Principal: '*'
           Action: '*'
           Resource: []

This additional policy document entry prevents Amazon SQS egress to any resource not explicitly listed.

Now we can create our CloudFormation stack.

  1. On the AWS CloudFormation console, choose Create stack.
  2. Select Upload a template file.
  3. Choose Choose file.
  4. Browse to the file you modified.
  5. Choose Next.
  6. For Stack name, enter MWAA-Environment-VPC.
  7. Choose Next until you reach the review page.
  8. Choose Submit.

Create the Lambda function

We have two options for self-managing our endpoints: manual and automated. In this example, we create a Lambda function that responds to the Amazon MWAA EventBridge notification. You could also use the EventBridge notification to send an Amazon Simple Notification Service (Amazon SNS) message, such as an email, to someone with permission to create the VPC endpoint manually.

First, we create a Lambda function to respond to the EventBridge event that Amazon MWAA will emit.

  1. On the Lambda console, choose Create function.
  2. For Name, enter mwaa-create-lambda.
  3. For Runtime, choose Python 3.11.
  4. Choose Create function.
  5. For Code, in the Code source section, for lambda_function, enter the following code:
    import boto3
    import json
    import logging
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    def lambda_handler(event, context):
        if event['detail']['status']=="PENDING":
            detail=event['detail']
            name=detail['name']
            celeryExecutorQueue=detail['celeryExecutorQueue']
            subnetIds=detail['networkConfiguration']['subnetIds']
            securityGroupIds=detail['networkConfiguration']['securityGroupIds']
            databaseVpcEndpointService=detail['databaseVpcEndpointService']
    
            # MWAA does not need to store the VPC ID, but we can get it from the subnets
            client = boto3.client('ec2')
            response = client.describe_subnets(SubnetIds=subnetIds)
            logger.info(response['Subnets'][0]['VpcId'])  
            vpcId=response['Subnets'][0]['VpcId']
            logger.info("vpcId: " + vpcId)       
            
            webserverVpcEndpointService=None
            if detail['webserverAccessMode']=="PRIVATE_ONLY":
                webserverVpcEndpointService=event['detail']['webserverVpcEndpointService']
            
            response = client.describe_vpc_endpoints(
                VpcEndpointIds=[],
                Filters=[
                    {"Name": "vpc-id", "Values": [vpcId]},
                    {"Name": "service-name", "Values": ["*.sqs"]},
                    ],
                MaxResults=1000
            )
            sqsVpcEndpoint=None
            for r in response['VpcEndpoints']:
                if subnetIds[0] in r['SubnetIds'] or subnetIds[0] in r['SubnetIds']:
                    # We are filtering describe by service name, so this must be SQS
                    sqsVpcEndpoint=r
                    break
            
            if sqsVpcEndpoint:
                logger.info("Found SQS endpoint: " + sqsVpcEndpoint['VpcEndpointId'])
    
                logger.info(sqsVpcEndpoint)
                pd = json.loads(sqsVpcEndpoint['PolicyDocument'])
                for s in pd['Statement']:
                    if s['Effect']=='Allow':
                        resource = s['Resource']
                        logger.info(resource)
                        if '*' in resource:
                            logger.info("'*' already allowed")
                        elif celeryExecutorQueue in resource: 
                            logger.info("'"+celeryExecutorQueue+"' already allowed")                
                        else:
                            s['Resource'].append(celeryExecutorQueue)
                            logger.info("Updating SQS policy to " + str(pd))
            
                            client.modify_vpc_endpoint(
                                VpcEndpointId=sqsVpcEndpoint['VpcEndpointId'],
                                PolicyDocument=json.dumps(pd)
                                )
                        break
            
            # create MWAA database endpoint
            logger.info("creating endpoint to " + databaseVpcEndpointService)
            endpointName=name+"-database"
            response = client.create_vpc_endpoint(
                VpcEndpointType='Interface',
                VpcId=vpcId,
                ServiceName=databaseVpcEndpointService,
                SubnetIds=subnetIds,
                SecurityGroupIds=securityGroupIds,
                TagSpecifications=[
                    {
                        "ResourceType": "vpc-endpoint",
                        "Tags": [
                            {
                                "Key": "Name",
                                "Value": endpointName
                            },
                        ]
                    },
                ],           
            )
            logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
                
            # create MWAA web server endpoint (if private)
            if webserverVpcEndpointService:
                endpointName=name+"-webserver"
                logger.info("creating endpoint to " + webserverVpcEndpointService)
                response = client.create_vpc_endpoint(
                    VpcEndpointType='Interface',
                    VpcId=vpcId,
                    ServiceName=webserverVpcEndpointService,
                    SubnetIds=subnetIds,
                    SecurityGroupIds=securityGroupIds,
                    TagSpecifications=[
                        {
                            "ResourceType": "vpc-endpoint",
                            "Tags": [
                                {
                                    "Key": "Name",
                                    "Value": endpointName
                                },
                            ]
                        },
                    ],                  
                )
                logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
    
        return {
            'statusCode': 200,
            'body': json.dumps(event['detail']['status'])
        }

  6. Choose Deploy.
  7. On the Configuration tab of the Lambda function, in the General configuration section, choose Edit.
  8. For Timeout, increate to 5 minutes, 0 seconds.
  9. Choose Save.
  10. In the Permissions section, under Execution role, choose the role name to edit the permissions of this function.
  11. For Permission policies, choose the link under Policy name.
  12. Choose Edit and add a comma and the following statement:
    {
    		"Sid": "Statement1",
    		"Effect": "Allow",
    		"Action": 
    		[
    			"ec2:DescribeVpcEndpoints",
    			"ec2:CreateVpcEndpoint",
    			"ec2:ModifyVpcEndpoint",
                "ec2:DescribeSubnets",
    			"ec2:CreateTags"
    		],
    		"Resource": 
    		[
    			"*"
    		]
    }

The complete policy should look similar to the following:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "logs:CreateLogGroup",
			"Resource": "arn:aws:logs:us-east-1:112233445566:*"
		},
		{
			"Effect": "Allow",
			"Action": [
				"logs:CreateLogStream",
				"logs:PutLogEvents"
			],
			"Resource": [
				"arn:aws:logs:us-east-1:112233445566:log-group:/aws/lambda/mwaa-create-lambda:*"
			]
		},
		{
			"Sid": "Statement1",
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeVpcEndpoints",
				"ec2:CreateVpcEndpoint",
				"ec2:ModifyVpcEndpoint",
               	"ec2:DescribeSubnets",
				"ec2:CreateTags"
			],
			"Resource": [
				"*"
			]
		}
	]
}
  1. Choose Next until you reach the review page.
  2. Choose Save changes.

Create an EventBridge rule

Next, we configure EventBridge to send the Amazon MWAA notifications to our Lambda function.

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter mwaa-create.
  3. Select Rule with an event pattern.
  4. Choose Next.
  5. For Creation method, choose User pattern form.
  6. Choose Edit pattern.
  7. For Event pattern, enter the following:
    {
      "source": ["aws.airflow"],
      "detail-type": ["MWAA Environment Status Change"]
    }

  8. Choose Next.
  9. For Select a target, choose Lambda function.

You may also specify an SNS notification in order to receive a message when the environment state changes.

  1. For Function, choose mwaa-create-lambda.
  2. Choose Next until you reach the final section, then choose Create rule.

Create an Amazon MWAA environment

Finally, we create an Amazon MWAA environment with customer-managed endpoints.

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter a unique name for your environment.
  3. For Airflow version, choose the latest Airflow version.
  4. For S3 bucket, choose Browse S3 and choose your S3 bucket, or enter the Amazon S3 URI.
  5. For DAGs folder, choose Browse S3 and choose the dags/ folder in your S3 bucket, or enter the Amazon S3 URI.
  6. Choose Next.
  7. For Virtual Private Cloud, choose the VPC you created earlier.
  8. For Web server access, choose Public network (Internet accessible).
  9. For Security groups, deselect Create new security group.
  10. Choose the shared VPC security group created by the CloudFormation template.

Because the security groups of the AWS PrivateLink endpoints from the earlier step are self-referencing, you must choose the same security group for your Amazon MWAA environment.

  1. For Endpoint management, choose Customer managed endpoints.
  2. Keep the remaining settings as default and choose Next.
  3. Choose Create environment.

When your environment is available, you can access it via the Open Airflow UI link on the Amazon MWAA console.

Clean up

Cleaning up resources that are not actively being used reduces costs and is a best practice. If you don’t delete your resources, you can incur additional charges. To clean up your resources, complete the following steps:

  1. Delete your Amazon MWAA environment, EventBridge rule, and Lambda function.
  2. Delete the VPC endpoints created by the Lambda function.
  3. Delete any security groups created, if applicable.
  4. After the above resources have completed deletion, delete the CloudFormation stack to ensure that you have removed all of the remaining resources.

Summary

This post described how to automate environment creation with shared VPC support in Amazon MWAA. This gives you the ability to manage your own endpoints within your VPC, adding compatibility to shared, or otherwise restricted, VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by their Amazon MWAA environments. To learn more about Amazon MWAA, refer to the Amazon MWAA User Guide. For more posts about Amazon MWAA, visit the Amazon MWAA resources page.


About the author

John Jackson has over 25 years of software experience as a developer, systems architect, and product manager in both startups and large corporations and is the AWS Principal Product Manager responsible for Amazon MWAA.

New for Amazon SQS – Update the AWS SDK to reduce latency

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-sqs-update-the-aws-sdk-to-reduce-latency/

With Amazon SQS, you can send and receive messages between software components at any scale. It was one of the first AWS services I used and as a Solutions Architect, I helped many customers take advantage of asynchronous communications using message queues. In fact, Amazon SQS has been generally available since July 2006 and, under the hood, has always used the same wire protocol based on XML that we call AWS Query protocol.

Today, I am happy to announce that Amazon SQS now supports a JSON-based wire protocol for all APIs. The AWS JSON protocol avoids many of the shortcomings of AWS Query protocol.

AWS JSON is more efficient than the previous XML-based protocol and can reduce both latency and client-side CPU usage when sending and receiving SQS messages. For example, for a request that sends a simple “hello world” message, the response body size using the old AWS Query protocol is about 400 bytes. The content length of the same SendMessage response using the new AWS JSON protocol is less than 1/3 of the previous size.

Using the New JSON-Based Protocol with Amazon SQS
This is the nicest part of this launch! To benefit from the AWS JSON protocol, you just need to update the AWS SDK to the latest version. While building this new capability, the SQS team was careful so that no code changes are needed to use the new JSON-based wire protocol.

For example, we ran a benchmark using the AWS SDK for Java to compare the old and new wire protocols. We expect similar results from the other AWS SDKs. Based on AWS performance tests for a 5KB message payload, JSON protocol for Amazon SQS reduces end-to-end message processing latency by up to 23 percent and reduces application client side CPU and memory usage. These numbers depend on the actual implementation and can differ from what you’ll see for your own applications.

Availability and Pricing
Amazon SQS support for the new JSON protocol is available today in all AWS Regions where SQS is offered. All generally available AWS SDKs now support AWS JSON for SQS APIs. To get the benefits of this optimization, you just need to update the AWS SDK to the latest version available.

There is no change in pricing when using the AWS JSON protocol. For more information, see Amazon SQS pricing.

If you’re interested in more details on how AWS protocols work, you can have a look at Smithy, the language we use at AWS for defining services and SDKs.

After almost 17 years of being generally available, we’re improving and innovating around our oldest service. Just a few months back, we introduced server-side encryption with Amazon SQS-managed encryption keys (SSE-SQS) by default for newly created queues, increased the default quota for high throughput mode for FIFO queues, and announced support for attribute-based access control (ABAC) using queue tags. More recently, Amazon SQS added new APIs to manage dead-letter queue (DLQ) redrive programmatically. At AWS, it’s always day 1!

Update the AWS SDK to improve CPU and memory usage and reduce latency when using Amazon SQS.

Danilo

Introducing faster polling scale-up for AWS Lambda functions configured with Amazon SQS

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/introducing-faster-polling-scale-up-for-aws-lambda-functions-configured-with-amazon-sqs/

This post was written by Anton Aleksandrov, Principal Solutions Architect, and Tarun Rai Madan, Senior Product Manager.

Today, AWS is announcing that AWS Lambda supports up to five times faster polling scale-up rate for spiky Lambda workloads configured with Amazon Simple Queue Service (Amazon SQS) as an event source.

This feature enables customers building event-driven applications using Lambda and SQS to achieve more responsive scaling during a sudden burst of messages in their SQS queues, and reduces the need to duplicate Lambda functions or SQS queues to achieve faster message processing.

Overview

Customers building modern event-driven and messaging applications with AWS Lambda use the Amazon SQS as a fundamental building block for creating decoupled architectures. Amazon SQS is a fully managed message queueing service for microservices, distributed systems, and serverless applications. When a Lambda function subscribes to an SQS queue as an event source, Lambda polls the queue, retrieves the messages, and sends retrieved messages in batches to the function handler for processing. To consume messages efficiently, Lambda detects the increase in queue depth, and increases the number of poller processes to process the queued messages.

Up until today, the Lambda was adding up to 60 concurrent executions per minute for Lambda functions subscribed to SQS queues, scaling up to a maximum of 1,250 concurrent executions in approximately 20 minutes. However, customers tell us that some of the modern event-driven applications they build using Lambda and SQS are sensitive to sudden spikes in messages, which may cause noticeable delay in processing of messages for end users. In order to harness the power of Lambda for applications that experience a burst of messages in SQS queues, these customers needed Lambda message polling to scale up faster.

With today’s announcement, Lambda functions that subscribe to an SQS queue can scale up to five times faster for queues that see a spike in message backlog, adding up to 300 concurrent executions per minute, and scaling up to a maximum of 1,250 concurrent executions. This scaling improvement helps to use the simplicity of Lambda and SQS integration to build event-driven applications that scale faster during a surge of incoming messages, particularly for real-time systems. It also offers customers the benefit of faster processing during spikes of messages in SQS queues, while continuing to offer the flexibility to limit the maximum concurrent Lambda invocations per SQS event source.

Controlling the maximum concurrent Lambda invocations by SQS

The new improved scaling rates are automatically applied to all AWS accounts using Lambda and SQS as an event source. There is no explicit action that you must take, and there’s no additional cost. This scaling improvement helps customers to build more performant Lambda applications where they need faster SQS polling scale-up. To prevent potentially overloading the downstream dependencies, Lambda provides customers the control to set the maximum number of concurrent executions at a function level with reserved concurrency, and event source level with maximum concurrency.

The following diagram illustrates settings that you can use to control the flow rate of an SQS event-source. You use reserved concurrency to control function-level scaling, and maximum concurrency to control event source scaling.

Control the flow rate of an SQS event-source

Reserved concurrency is the maximum concurrency that you want to allocate to a function. When a function has reserved concurrency allocated, no other functions can use that concurrency.

AWS recommends using reserved concurrency when you want to ensure a function has enough concurrency to scale up. When an SQS event source is attempting to scale up concurrent Lambda invocations, but the function has already reached the threshold defined by the reserved concurrency, the Lambda service throttles further function invocations.

This may result in SQS event source attempting to scale down, reducing the number of concurrently processed messages. Depending on the queue configuration, the throttled messages are returned to the queue for retrying, expire based on the retention policy, or sent to a dead-letter queue (DLQ) or on-failure destination.

The maximum concurrency setting allows you to control concurrency at the event source level. It allows you to define the maximum number of concurrent invocations the event source attempts to send to the Lambda function. For scenarios where a single function has multiple SQS event sources configured, you can define maximum concurrency for each event source separately, providing more granular control. When trying to add rate control to SQS event sources, AWS recommends you start evaluating maximum concurrency control first, as it provides greater flexibility.

Reserved concurrency and maximum concurrency are complementary capabilities, and can be used together. Maximum concurrency can help to prevent overwhelming downstream systems and throttled invocations. Reserved concurrency helps to ensure available concurrency for the function.

Example scenario

Consider your business must process large volumes of documents from storage. Once every few hours, your business partners upload large volumes of documents to S3 buckets in your account.

For resiliency, you’ve designed your application to send a message to an SQS queue for each of the uploaded documents, so you can efficiently process them without accidentally skipping any. The documents are processed using a Lambda function, which takes around two seconds to process a single document.

Processing these documents is a CPU-intensive operation, so you decide to process a single document per invocation. You want to use the power of Lambda to fan out the parallel processing to as many concurrent execution environments as possible. You want the Lambda function to scale up rapidly to process those documents in parallel as fast as possible, and scale-down to zero once all documents are processed to save costs.

When a business partner uploads 200,000 documents, 200,000 messages are sent to the SQS queue. The Lambda function is configured with an SQS event source, and it starts consuming the messages from the queue.

This diagram shows the results of running the test scenario before the SQS event source scaling improvements. As expected, you can see that concurrent executions grow by 60 per minute. It takes approximately 16 minutes to scale up to 900 concurrent executions gradually and process all the messages in the queue.

Results of running the test scenario before the SQS event source scaling improvements

The following diagram shows the results of running the same test scenario after the SQS event source scaling improvements. The timeframe used for both charts is the same, but the performance on the second chart is better. Concurrent executions grow by 300 per minute. It only takes 4 minutes to scale up to 1,250 concurrent executions, and all the messages in the queue are processed in approximately 8 minutes.

The results of running the same test scenario after the SQS event source scaling improvements

Deploying this example

Use the example project to replicate this performance test in your own AWS account. Follow the instructions in README.md for provisioning the sample project in your AWS accounts using the AWS Cloud Development Kit (CDK).

This example project is configured to demonstrate a large-scale workload processing 200,000 messages. Running this sample project in your account may incur charges. See AWS Lambda pricing and Amazon SQS pricing.

Once deployed, use the application under the “sqs-cannon” directory to send 200,000 messages to the SQS queue (or reconfigure to any other number). It takes several minutes to populate the SQS queue with messages. After all messages are sent, enable the SQS event source, as described in the README.md, and monitor the charts in the provisioned CloudWatch dashboard.

The default concurrency quota for new AWS accounts is 1000. If you haven’t requested an increase in this quota, the number of concurrent executions is capped at this number. Use Service Quotas or contact your account team to request a concurrency increase.

Security best practices

Always use the least privileged permissions when granting your Lambda functions access to SQS queues. This reduces potential attack surface by ensuring that only specific functions have permissions to perform specific actions on specific queues. For example, in case your function only polls from the queue, grant it permission to read messages, but not to send new messages. A function execution role defines which actions your function is allowed to perform on other resources. A queue access policy defines the principals that can access this queue, and the actions that are allowed.

Use server-side encryption (SSE) to store sensitive data in encrypted SQS queues. With SSE, your messages are always stored in encrypted form, and SQS only decrypts them for sending to an authorized consumer. SSE protects the contents of messages in queues using SQS-managed encryption keys (SSE-SQS) or keys managed in the AWS Key Management Service (SSE-KMS).

Conclusion

The improved Lambda SQS event source polling scale-up capability enables up to five times faster scale-up performance for spiky event-driven workloads using SQS queues, at no additional cost. This improvement offers customers the benefit of faster processing during spikes of messages in SQS queues, while continuing to offer the flexibility to limit the maximum concurrent invokes by SQS as an event source.

For more serverless learning resources, visit Serverless Land.

AWS Weekly Roundup – re:Post Selections, SNS and SQS FIFO improvements, multi-VPC ENI attachments, and more – October 30, 2023

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-repost-selections-sns-and-sqs-fifo-improvements-multi-vpc-eni-attachments-and-more-october-30-2023/

It’s less than a month to AWS re:Invent, but interesting news doesn’t slow down in the meantime. This week is my turn to help keep you up to date!

Last week’s launches
Here are some of the launches that caught my attention last week:

AWS re:Post – With re:Post, you have access to a community of experts that helps you become even more successful on AWS. With Selections, community members can organize knowledge in an aggregated view to create learning paths or curated content sets.

Amazon SNS – First-in-First-out (FIFO) topics now support the option to store and replay messages without needing to provision a separate archival resource. This improves the durability of your event-driven applications and can help you recover from downstream failure scenarios. Find out more in this AWS Comput Blog post – Archiving and replaying messages with Amazon SNS FIFO. Also, you can now use custom data identifiers to protect not only common sensitive data (such as names, addresses, and credit card numbers) but also domain-specific sensitive data, such as your company’s employee IDs. You can find additional info on this feature in this AWS Security blog post – Mask and redact sensitive data published to Amazon SNS using managed and custom data identifiers.

Amazon SQS – With the increased throughput quota for FIFO high throughput mode, you can process up to 18,000 transactions per second, per API action. Note the throughput quota depends on the AWS Region.

Amazon OpenSearch Service – OpenSearch Serverless now supports automated time-based data deletion with new index lifecycle policies. To determine the best strategy to deliver accurate and low latency vector search queries, OpenSearch can now intelligently evaluate optimal filtering strategies, like pre-filtering with approximate nearest neighbor (ANN) or filtering with exact k-nearest neighbor (k-NN). Also, OpenSearch Service now supports Internet Protocol Version 6 (IPv6).

Amazon EC2 – With multi-VPC ENI attachments, you can launch an instance with a primary elastic network interface (ENI) in one virtual private cloud (VPC) and attach a secondary ENI from another VPC. This helps maintain network-level segregation, but still allows specific workloads (like centralized appliances and databases) to communicate between them.

AWS CodePipeline – With parameterized pipelines, you can dynamically pass input parameters to a pipeline execution. You can now start a pipeline execution when a specific git tag is applied to a commit in the source repository.

Amazon MemoryDB – Now supports Graviton3-based R7g nodes that deliver up to 28 percent increased throughput compared to R6g. These nodes also deliver higher networking bandwidth.

Other AWS news
Here are a few posts from some of the other AWS and cloud blogs that I follow:

Networking & Content Delivery Blog – Some of the technical management and hardware decisions we make when building AWS network infrastructure: A Continuous Improvement Model for Interconnects within AWS Data Centers

Interconnect monitoring service infrastructure diagram

DevOps Blog – To help enterprise customers understand how many of developers use CodeWhisperer, how often they use it, and how often they accept suggestions: Introducing Amazon CodeWhisperer Dashboard and CloudWatch Metrics

Front-End Web & Mobile Blog – How to restrict access to your GraphQL APIs to consumers within a private network: Architecture Patterns for AWS AppSync Private APIs

Architecture Blog – Another post in this super interesting series: Let’s Architect! Designing systems for stream data processing

A serverless streaming data pipeline using Amazon Kinesis and AWS Glue

From Community.AWS: Load Testing WordPress Amazon Lightsail Instances and Future-proof Your .NET Apps With Foundation Model Choice and Amazon Bedrock.

Don’t miss the latest AWS open source newsletter by my colleague Ricardo.

Upcoming AWS events
Check your calendars and sign up for these AWS events

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: Jaipur (November 4), Vadodara (November 4), Brasil (November 4), Central Asia (Kazakhstan, Uzbekistan, Kyrgyzstan, and Mongolia on November 17-18), and Guatemala (November 18).

AWS re:Invent (November 27 – December 1) – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Browse the session catalog and attendee guides and check out the highlights for generative AI.

Here you can browse all upcoming AWS-led in-person and virtual events and developer-focused events.

And that’s all from me for this week. On to the next one!

Danilo

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Serverless ICYMI Q3 2023

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/serverless-icymi-q3-2023/

Welcome to the 23rd edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

AWS announces the general availability of Amazon Bedrock

Amazon Web Services (AWS) unveils five generative artificial intelligence (AI) innovations to democratize generative AI applications. Amazon Bedrock, now generally available, enables experimentation with top foundation models (FMs) and allows customization with proprietary data.

It supports creating managed agents for complex tasks without code and ensures security and privacy. Amazon Titan Embeddings, another FM, is generally available for various language-related use cases. Meta’s Llama 2, coming soon, enhances dialogue scenarios.

The upcoming Amazon CodeWhisperer customization capability enables secure customization using private code bases. Generative BI authoring capabilities in Amazon QuickSight simplify visualization creation for business analysts.

AWS Lambda

AWS Lambda now detects and stops recursive loops in Lambda functions. AWS Lambda now detects and halts functions caught in recursive or infinite loops, guarding against unexpected costs. Lambda identifies recursive behavior, discontinuing requests after 16 invocations. The feature addresses pitfalls stemming from misconfiguration or coding bugs, introducing detailed error messaging, and allowing users to set maximum limits on retry intervals. Notifications about recursive occurrences are relayed through the AWS Health Dashboard, emails, and CloudWatch Alarms for streamlined troubleshooting. Lambda uses AWS X-Ray trace headers for invocation tracking, requiring supported AWS SDK versions.

AWS simplifies writing .NET 6 Lambda functions. The Lambda Annotations Framework for .NET. A new programming model makes the experience of writing Lambda functions in C# feel more natural for .NET developers by using C# source generator technology. This streamlines the development workflow for .NET developers, making it easier to create serverless applications using the latest version of the .NET framework.

AWS Lambda and Amazon EventBridge Pipes now support enhanced filtering. Additional filtering capabilities include the ability to match against characters at the end of a value (suffix filtering), ignore case sensitivity (equals-ignore-case), and have a single rule match if any conditions across multiple separate fields are true (OR matching).

AWS Lambda Functions powered by AWS Graviton2 are now available in 6 additional Regions. Graviton2 processors are known for their performance benefits, and this expansion provides users with more choices for running serverless workloads.

AWS Lambda adds support for Python 3.11 allowing developers to take advantage of the latest features and improvements in the Python programming language for their serverless functions.

AWS Step Functions

AWS Step Functions enhances Workflow Studio, focusing on an Advanced Starter Template and Code Mode for efficient AWS Step Functions workflow creation. Users benefit from streamlined design-to-code transitions, pasting Amazon States Language (ASL) definitions directly into Workflow Studio, speeding up adjustments. Enhanced workflow execution and configuration allow direct execution and setting adjustments within Workflow Studio, improving user experience.

AWS Step Functions launches enhanced error handling This update helps users to identify errors with precision and refine retry strategies. Step Functions now enables detailed error messages in Fail states and precise control over retry intervals. Use the new maximum limits and jitter functionality to ensure efficient and controlled retries, preventing service overload in recovery scenarios.

AWS Step Functions distributed map is now available in the AWS GovCloud (US) Regions. This release highlights the availability of the distributed map feature in Step Functions specifically tailored for the AWS GovCloud (US) Regions. The distributed map feature is a powerful capability for orchestrating parallel and distributed processing in serverless workflows.

AWS SAM

AWS SAM CLI announces local testing and debugging support on Terraform projects.

Developers can now use AWS SAM CLI to locally test and debug AWS Lambda functions and Amazon API Gateway defined in their Terraform projects. AWS SAM CLI reads infrastructure resource information from the Terraform application, allowing users to start Lambda functions and API Gateway endpoints locally in a Docker container.

This update enables faster development cycles for Terraform users, who can use AWS SAM CLI commands like `AWS SAM local start-api`, `sam local start-lambda`, and `sam local invoke`, along with `sam local generate` for generating mock test events.

Amazon EventBridge

Amazon EventBridge Scheduler adds schedule deletion after completion. This feature offers enhanced functionality by supporting the automatic deletion of schedules upon completion of their last invocation. It is applicable to various scheduling types, including one-time, cron, and rate schedules with an end date. Amazon EventBridge Scheduler, a centralized and highly scalable service, enables the creation, execution, and management of schedules.

With the ability to schedule millions of tasks invoking over 270 AWS services and 6,000 API operations. This update streamlines the process of managing completed schedules. The automatic deletion feature reduces the need for manual intervention or custom code, saving time and simplifying scalability for users leveraging EventBridge Scheduler.

Amazon EventBridge Pipes now available in three additional Regions. This update extends the availability of Amazon EventBridge Pipes, a powerful event-routing service, to three additional Regions.

Amazon EventBridge API Destinations is now available in additional Regions. Providing users with more options for building scalable and decoupled applications.

Amazon EventBridge Schema Registry and Schema Discovery now in additional Regions. This expansion allows you to discover and store event structure – or schema – in a shared, central location. You can download code bindings for those schemas for Java, Python, TypeScript, and Golang so it’s easier to use events as objects in your code.

Amazon SNS

To enhance message privacy and security, Amazon Simple Notification Service (SNS) implemented Message Data Protection, allowing users to de-identify outbound messages via redaction or masking. Amazon SNS FIFO topics now support message delivery to Amazon SQS Standard queues. This provides users with increased flexibility in managing message delivery and ordering.

Expanding its monitoring capabilities, Amazon SNS introduced Additional Usage Metrics in Amazon CloudWatch. This enhancement allows users to gain more comprehensive insights into the performance and utilization of their SNS resources. SNS extended its global SMS sending capabilities to Israel (Tel Aviv), providing users in that Region with additional options for SMS notifications. SNS also expanded its reach by supporting Mobile Push Notifications in twelve new AWS Regions. This expansion aligns with the growing demand for mobile notification capabilities, offering a broader coverage for users across diverse Regions.

Amazon SQS

Amazon Simple Queue Service (SQS) introduced a number of updates. Attribute-Based Access Control (ABAC) was implemented for scalable access permissions, while message data protection can now de-identify outbound messages via redaction or masking. SQS FIFO topics now support message delivery to Amazon SQS Standard queues, providing enhanced flexibility. Addressing throughput demands, SQS increased the quota for FIFO High Throughput mode. JSON protocol support was previewed, offering improved message format flexibility. These updates underscore SQS’s commitment to advanced security and flexibility.

Amazon API Gateway

Amazon API Gateway undergoes a console refresh, aligning with Cloudscape Design System guidelines. Notable enhancements include improved usability, sortable tables, enhanced API key management, and direct API deployment from the Resource view. The update introduces dark mode, accessibility improvements, and visual alignment with HTTP APIs and AWS Services.

GOTO EDA day Nashville 2023

Join GOTO EDA Day in Nashville on October 26 for insights on event-driven architectures. Learn from industry leaders at Music City Center with talks, panels, and Hands-On Labs. Limited tickets available.

Serverless blog posts

July 2023

July 5- Implementing AWS Lambda error handling patterns

July 6 – Implementing AWS Lambda error handling patterns

July 7 – Understanding AWS Lambda’s invoke throttling limits

July 10 – Detecting and stopping recursive loops in AWS Lambda functions

July 11 – Implementing patterns that exit early out of a parallel state in AWS Step Functions

July 26 – Migrating AWS Lambda functions from the Go1.x runtime to the custom runtime on Amazon Linux 2

July 27 – Python 3.11 runtime now available in AWS Lambda

August 2023

August 2 – Automatically delete schedules upon completion with Amazon EventBridge Scheduler

August 7 – Using response streaming with AWS Lambda Web Adapter to optimize performance

August 15 – Integrating IBM MQ with Amazon SQS and Amazon SNS using Apache Camel

August 15 – Implementing the transactional outbox pattern with Amazon EventBridge Pipes

August 23 – Protecting an AWS Lambda function URL with Amazon CloudFront and Lambda@Edge

August 29 – Enhancing file sharing using Amazon S3 and AWS Step Functions

August 31 – Enhancing Workflow Studio with new features for streamlined authoring

September 2023

September 5 – AWS SAM support for HashiCorp Terraform now generally available

September 14 – Building a secure webhook forwarder using an AWS Lambda extension and Tailscale

September 18 – Building resilient serverless applications using chaos engineering

September 19 – Implementing idempotent AWS Lambda functions with Powertools for AWS Lambda (TypeScript)

September 19 – Centralizing management of AWS Lambda layers across multiple AWS Accounts

September 26 – Architecting for scale with Amazon API Gateway private integrations

September 26 – Visually design your application with AWS Application Composer

Videos

Serverless Office Hours – Tues 10AM PT

July 2023

July 4 – Benchmarking Lambda cold starts

July 11 – Lambda testing: AWS SAM remote invoke

July 18 – Using DynamoDB global tables

July 25 – Serverless observability with SLIC-watch

August 2023

August 1 – Step Functions versions and aliases

August 8 – Deploying Lambda with EKS and Crossplane / Managing Lambda with Kubernetes

August 15 – Serverless caching with Momento

September 2023

September 5 – Run any web app on Lambda

September 12 – Building an API platform on AWS

September 19 – Idempotency: exactly once processing

September 26 – AWS Amplify Studio + GraphQL

FooBar Serverless YouTube channel

July 2023

July 27 – Generative AI and Serverless to create a new story everyday

August 2023

August 3Getting started with Data Streaming

August 10 – Amazon Kinesis Data Streams – Shards? Provisioned? On-demand? What does all this mean?

August 17 – Put and consume events with AWS Lambda, Amazon Kinesis Data Stream and Event Source Mapping

August 24 – Create powerful data pipelines with Amazon Kinesis and EventBridge Pipes

August 31 – New Step Functions versions and alias!

September 2023

September 7 – Amazon Kinesis Data Firehose – What is this service for?

September 14 – Kinesis Data Firehose with AWS CDK – Lambda transformations

September 21 – Advanced Event Source Mapping configuration | AWS Lambda and Amazon Kinesis Data Streams

September 28 – Data Streaming Patterns

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Building a serverless document chat with AWS Lambda and Amazon Bedrock

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/building-a-serverless-document-chat-with-aws-lambda-and-amazon-bedrock/

This post is written by Pascal Vogel, Solutions Architect, and Martin Sakowski, Senior Solutions Architect.

Large language models (LLMs) are proving to be highly effective at solving general-purpose tasks such as text generation, analysis and summarization, translation, and much more. Because they are trained on large datasets, they can use a broad generalist knowledge base. However, as training takes place offline and uses publicly available data, their ability to access specialized, private, and up-to-date knowledge is limited.

One way to improve LLM knowledge in a specific domain is fine-tuning them on domain-specific datasets. However, this is time and resource intensive, requires specialized knowledge, and may not be appropriate for some tasks. For example, fine-tuning won’t allow an LLM to access information with daily accuracy.

To address these shortcomings, Retrieval Augmented Generation (RAG) is proving to be an effective approach. With RAG, data external to the LLM is used to augment prompts by adding relevant retrieved data in the context. This allows for integrating disparate data sources and the complete separation of data sources from the machine learning model entirely.

Tools such as LangChain or LlamaIndex are gaining popularity because of their ability to flexibly integrate with a variety of data sources such as (vector) databases, search engines, and current public data sources.

In the context of LLMs, semantic search is an effective search approach, as it considers the context and intent of user-provided prompts as opposed to a traditional literal search. Semantic search relies on word embeddings, which represent words, sentences, or documents as vectors. Consequently, documents must be transformed into embeddings using an embedding model as the basis for semantic search. Because this embedding process only needs to happen when a document is first ingested or updated, it’s a great fit for event-driven compute with AWS Lambda.

This blog post presents a solution that allows you to ask natural language questions of any PDF document you upload. It combines the text generation and analysis capabilities of an LLM with a vector search on the document content. The solution uses serverless services such as AWS Lambda to run LangChain and Amazon DynamoDB for conversational memory.

Amazon Bedrock is used to provide serverless access to foundational models such as Amazon Titan and models developed by leading AI startups, such as AI21 Labs, Anthropic, and Cohere. See the GitHub repository for a full list of available LLMs and deployment instructions.

You learn how the solution works, what design choices were made, and how you can use it as a blueprint to build your own custom serverless solutions based on LangChain that go beyond prompting individual documents. The solution code and deployment instructions are available on GitHub.

Solution overview

Let’s look at how the solution works at a high level before diving deeper into specific elements and the AWS services used in the following sections. The following diagram provides a simplified view of the solution architecture and highlights key elements:

The process of interacting with the web application looks like this:

  1. A user uploads a PDF document into an Amazon Simple Storage Service (Amazon S3) bucket through a static web application frontend.
  2. This upload triggers a metadata extraction and document embedding process. The process converts the text in the document into vectors. The vectors are loaded into a vector index and stored in S3 for later use.
  3. When a user chats with a PDF document and sends a prompt to the backend, a Lambda function retrieves the index from S3 and searches for information related to the prompt.
  4. An LLM then uses the results of this vector search, previous messages in the conversation, and its general-purpose capabilities to formulate a response to the user.

As can be seen on the following screenshot, the web application deployed as part of the solution allows you to upload documents and list uploaded documents and their associated metadata, such as number of pages, file size, and upload date. The document status indicates if a document is successfully uploaded, is being processed, or is ready for a conversation.

Web application document list view

By clicking on one of the processed documents, you can access a chat interface, which allows you to send prompts to the backend. It is possible to have multiple independent conversations with each document with separate message history.

Web application chat view

Embedding documents

Solution architecture diagram excerpt: embedding documents

When a new document is uploaded to the S3 bucket, an S3 event notification triggers a Lambda function that extracts metadata, such as file size and number of pages, from the PDF file and stores it in a DynamoDB table. Once the extraction is complete, a message containing the document location is placed on an Amazon Simple Queue Service (Amazon SQS) queue. Another Lambda function polls this queue using Lambda event source mapping. Applying the decouple messaging pattern to the metadata extraction and document embedding functions ensures loose coupling and protects the more compute-intensive downstream embedding function.

The embedding function loads the PDF file from S3 and uses a text embedding model to generate a vector representation of the contained text. LangChain integrates with text embedding models for a variety of LLM providers. The resulting vector representation of the text is loaded into a FAISS index. FAISS is an open source vector store that can run inside the Lambda function memory using the faiss-cpu Python package. Finally, a dump of this FAISS index is stored in the S3 bucket besides the original PDF document.

Generating responses

Solution architecture diagram excerpt: generating responses

When a prompt for a specific document is submitted via the Amazon API Gateway REST API endpoint, it is proxied to a Lambda function that:

  1. Loads the FAISS index dump of the corresponding PDF file from S3 and into function memory.
  2. Performs a similarity search of the FAISS vector store based on the prompt.
  3. If available, retrieves a record of previous messages in the same conversation via the DynamoDBChatMessageHistory integration. This integration can store message history in DynamoDB. Each conversation is identified by a unique ID.
  4. Finally, a LangChain ConversationalRetrievalChain passes the combination of the prompt submitted by the user, the result of the vector search, and the message history to an LLM to generate a response.

Web application and file uploads

Solution architecture diagram excerpt: web application

A static web application serves as the frontend for this solution. It’s built with React, TypeScriptVite, and TailwindCSS and deployed via AWS Amplify Hosting, a fully managed CI/CD and hosting service for fast, secure, and reliable static and server-side rendered applications. To protect the application from unauthorized access, it integrates with an Amazon Cognito user pool. The API Gateway uses an Amazon Cognito authorizer to authenticate requests.

Users upload PDF files directly to the S3 bucket using S3 presigned URLs obtained via the REST API. Several Lambda functions implement API endpoints used to create, read, and update document metadata in a DynamoDB table.

Extending and adapting the solution

The solution provided serves as a blueprint that can be enhanced and extended to develop your own use cases based on LLMs. For example, you can extend the solution so that users can ask questions across multiple PDF documents or other types of data sources. LangChain makes it easy to load different types of data into vector stores, which you can then use for semantic search.

Once your use case involves searching across multiple documents, consider moving from loading vectors into memory with FAISS to a dedicated vector database. There are several options for vector databases on AWS. One serverless option is Amazon Aurora Serverless v2 with the pgvector extension for PostgreSQL. Alternatively, vector databases developed by AWS Partners such as Pinecone or MongoDB Atlas Vector Search can be integrated with LangChain. Besides vector search, LangChain also integrates with traditional external data sources, such as the enterprise search service Amazon Kendra, Amazon OpenSearch, and many other data sources.

The solution presented in this blog post uses similarity search to find information in the vector database that closely matches the user-supplied prompt. While this works well in the presented use case, you can also use other approaches, such as maximal marginal relevance, to find the most relevant information to provide to the LLM. When searching across many documents and receiving many results, techniques such as MapReduce can improve the quality of the LLM responses.

Depending on your use case, you may also want to select a different LLM to achieve an ideal balance between quality of results and cost. Amazon Bedrock is a fully managed service that makes foundational models (FMs) from leading AI startups and Amazon available via an API, so you can choose from a wide range of FMs to find the model that’s best suited for your use case. You can use models such as Amazon Titan, Jurassic-2 from AI21 Labs, or Anthropic Claude.

To further optimize the user experience of your generative AI application, consider streaming LLM responses to your frontend in real-time using Lambda response streaming and implementing real-time data updates using AWS AppSync subscriptions or Amazon API Gateway WebSocket APIs.

Conclusion

AWS serverless services make it easier to focus on building generative AI applications by providing automatic scaling, built-in high availability, and a pay-for-use billing model. Event-driven compute with AWS Lambda is a good fit for compute-intensive, on-demand tasks such as document embedding and flexible LLM orchestration.

The solution in this blog post combines the capabilities of LLMs and semantic search to answer natural language questions directed at PDF documents. It serves as a blueprint that can be extended and adapted to fit further generative AI use cases.

Deploy the solution by following the instructions in the associated GitHub repository.

For more serverless learning resources, visit Serverless Land.

Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical contents to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger (such as in terms of record volume) than others, and some are updated more frequently than others. In the data lake, the data to be organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logics, usually fed as inputs to ML pipelines or any consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, the Apache Hudi framework can solve the first requirement. Therefore, we will put our emphasis on the other requirements. We begin with a Data lake reference architecture followed by an overview of operational data processing framework. By showing you our open-source solution on GitHub, we delve into framework components and walk through their design and implementation aspects. Finally, by testing the framework, we summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with a big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that has support for change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets and separate AWS Glue jobs to do feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts the metadata to the DynamoDB table odpf_file_tracker. These records will then be processed by File Processor, which we discuss in the next section.

ODPF Component: File Manager

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It usually goes with AWS Glue worker type (one of G.1X, G.2X, G.4X, G.8X, G.025X, and so on) and batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combined key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence.
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables.
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates running File Processor state machine with a configuration that includes 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. Being the first state in the state machine, the Batch Manager Lambda function reads configurations from DynamoDB tables.
  3. The Lambda function creates four batches: three of them will be mapped to five operational tables each, and the fourth one is mapped to three operational tables. Then it feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function will be invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of an operational table and acquires a lock when it is not processed by any other job. The odpf_file_processing_tracker DynamoDB table is used for this purpose. When a lock is acquired, it inserts a record in the DynamoDB table with the status updating_table for the first time; otherwise, it updates the record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog.
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note: a failure to process CDC files for one of the operational tables of a given batch does not impact the processing of other operational tables.

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details to the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following instructions from this README and this README, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder.
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = e.g., us-west-1
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json.
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match with the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. As a result of successful full load completion, you will find Parquet files in the S3 staging bucket—one Parquet file per table in a dedicated folder, similar to the following screenshot. Similarly, you will find 17 such folders in the bucket.
    DMS Output in S3 Staging Bucket for Table 1

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. On the subsequent tenth minute (since the activation of the EventBridge rule), the event rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. Then, you will notice that the File Processor Glue Job runs create source-aligned datasets in Hudi format in the S3 raw bucket. For Table 1, you will see an output similar to the following screenshot. There will be 17 such folders in the S3 raw bucket.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query. You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena

Validation summary: The counts in Amazon Athena match with the counts of the operational tables and it proves that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.

Outcomes

Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating them to EventBridge rules, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs. With the AWS Glue worker type configuration setting, we selected the appropriate compute resources while running the AWS Glue jobs (the instances of the AWS Glue job).
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data is stored in S3 buckets, metadata data is stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) are to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.
    • As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.

Conclusion

In this post, we took a real-world operational data processing use case and presented you the framework we developed at AWS ProServe. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi will expedite your journey in integrating operational databases into your modern data platforms built on AWS.


About the authors

Ravi-IthaRavi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

srinivas-kandiSrinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading history and civilizations.

Implementing the transactional outbox pattern with Amazon EventBridge Pipes

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/implementing-the-transactional-outbox-pattern-with-amazon-eventbridge-pipes/

This post is written by Sayan Moitra, Associate Solutions Architect, and Sangram Sonawane, Senior Solutions Architect.

Microservice architecture is an architectural style that structures an application as a collection of loosely coupled and independently deployable services. Services must communicate with each other to exchange messages and perform business operations. Ensuring message reliability while maintaining loose coupling between services is crucial for building robust and scalable systems.

This blog demonstrates how to use Amazon DynamoDB, a fully managed serverless key-value NoSQL database, and Amazon EventBridge, a managed serverless event bus, to implement reliable messaging for microservices using the transactional outbox pattern.

Business operations can span across multiple systems or databases to maintain consistency and synchronization between them. One approach often used in distributed systems or architectures where data must be replicated across multiple locations or components is dual writes. In a dual write scenario, when a write operation is performed on one system or database, the same data or event also triggers another system in real-time or near real-time. This ensures that both systems always have the same data, minimizing data inconsistencies.

Dual writes can also introduce data integrity challenges in distributed systems. Failure to update the database or to send events to other downstream systems after an initial system update can lead to data loss and leave the application in an inconsistent state. One design approach to overcome this challenge is to combine dual writes with the transactional outbox pattern.

Challenges with dual writes

Consider an online food ordering application to illustrate the challenges with dual writes. Once the user submits the order, the order service updates the order status in a persistent data store. The order status update should also be sent to notify_restaurant and order_tracking services using a message bus for asynchronous communication. After successfully updating the order status in the database, the order service writes the event to the message bus. The order_service performs a dual write operation of updating the database and publishing the event details on the message bus for other services to read.

This approach works until there are issues encountered in publishing the event to the message bus. Publishing events can fail for multiple reasons like a network error or a message bus outage. When failure occurs, the notify_restaurant and order_tracking service will not be notified of the order update event, leaving the system in an inconsistent state. Implementing the transactional outbox pattern with dual writes can help ensure reliable messaging between systems after a database update.

This illustration shows a sequence diagram for an online food ordering application and the challenges with dual writes:

Sequence diagram

Overview of the transactional outbox pattern

In the transactional outbox pattern, a second persistent data store is introduced to store the outgoing messages. In the online food order example, updating the database with order details and storing the event information in the outbox table becomes a single atomic transaction.

The transaction is only successful when writing to both the database and the outbox table. Any failures to write to the outbox table rolls back the transaction. A separate process then reads the event from the outbox table and publishes the event on the message bus. Once the message is available on the message bus, it can be read by the notify_restaurant and order_tracking services. Combining transactional outbox pattern with dual writes allows for data consistency across systems and reliable message delivery with the transactional context.

The following illustration shows a sequence diagram for an online food ordering application with transactional outbox pattern for reliable message delivery.

Sequence diagram 2

Implementing the transaction outbox pattern

DynamoDB includes a feature called DynamoDB Streams to capture a time-ordered sequence of item-level modifications in the DynamoDB table and stores this information in a log for up to 24 hours. Applications can access this log and view the data items as they appeared before and after they were modified, in near real time.

Whenever an application creates, updates, or deletes items in the table, DynamoDB Streams writes a stream record with the primary key attributes of the items that were modified. A stream record contains information about a data modification to a single item in a DynamoDB table. DynamoDB Streams writes stream records in near real time and these can be consumed for processing based on the contents. Enabling this feature removes the need to maintain a separate outbox table and lowers the management and operational overhead.

EventBridge Pipes connects event producers to consumers with options to transform, filter, and enrich messages. EventBridge Pipes can integrate with DynamoDB Streams to capture table events without writing any code. There is no need to write and maintain a separate process to read from the stream. EventBridge Pipes also supports retries, and any failed events can be routed to a dead-letter queue (DLQ) for further analysis and reprocessing.

EventBridge polls shards in DynamoDB stream for records and invokes pipes as soon as records are available. You can configure this to read records from DynamoDB only when it has gathered a specified batch size or the batch window expires. Pipes maintains the order of records from the data stream when sending that data to the destination. You can optionally filter or enhance these records before sending them to a target for processing.

Example overview

The following diagram illustrates the implementation of transactional outbox pattern with DynamoDB Streams and EventBridge Pipe. Amazon API Gateway is used to trigger a DynamoDB operation via a POST request. The change in the DynamoDB triggers an EventBridge event bus via Amazon EventBridge Pipes. This event bus invokes the Lambda functions through an SQS Queue, depending on the filters applied.

Architecture overview

  1. In this sample implementation, Amazon API Gateway makes a POST call to the DynamoDB table for database updates. Amazon API Gateway supports CRUD operations for Amazon DynamoDB without the need of a compute layer for database calls.
  2. DynamoDB Streams is enabled on the table, which captures a time-ordered sequence of item-level modifications in the DynamoDB table in near real time.
  3. EventBridge Pipes integrates with DynamoDB Streams to capture the events and can optionally filter and enrich the data before it is sent to a supported target. In this example, events are sent to Amazon EventBridge, which acts as a message bus. This can be replaced with any of the supported targets as detailed in Amazon EventBridge Pipes targets. DLQ can be configured to handle any failed events, which can be analyzed and retried.
  4. Consumers listening to the event bus receive messages. You can optionally fan out and deliver the events to multiple consumers and apply filters. You can configure a DLQ to handle any failures and retries.

Prerequisites

  1. AWS SAM CLI, version 1.85.0 or higher
  2. Python 3.10

Deploying the example application

  1. Clone the repository:
    git clone https://github.com/aws-samples/amazon-eventbridge-pipes-dynamodb-stream-transactional-outbox.git
  2. Change to the root directory of the project and run the following AWS SAM CLI commands:
    cd amazon-eventbridge-pipes-dynamodb-stream-transactional-outbox               
    sam build
    sam deploy --guided
    
  3. Enter the name for your stack during guided deployment. During the deploy process, select the default option for all the additional steps.
    SAM deployment
  4. The resources are deployed.
    Testing the application

Testing the application

Once the deployment is complete, it provides the API Gateway URL in the output. You can test using that URL. To test the application, use Postman to make a POST call to API Gateway prod URL:

Postman

You can also test using the curl command:

curl -s --header "Content-Type: application/json" \
  --request POST \
  --data '{"Status":"Created"}' \
  <API_ENDPOINT>

This produces the following output:

Expected output

To verify if the order details are updated in the DynamoDB table, run this command for performing a scan operation on the table.

aws dynamodb scan \
    --table-name <DynamoDB Table Name>

Handling failures

DynamoDB Streams captures a time-ordered sequence of item-level modifications in the DynamoDB table and stores this information in a log for up to 24 hours. If EventBridge is unavailable to read from DynamoDB Stream due to misconfiguration, for example, the records are available in the log for 24 hours. Once EventBridge is reintegrated, it retrieves all undelivered records from the last 24 hours. For integration issues between EventBridge Pipes and the target application, all failed messages can be sent to the DLQ for reprocessing at a later time.

Cleaning up

To clean up your AWS based resources, run following AWS SAM CLI command, answering “y” to all questions:

sam delete --stack-name <stack_name>

Conclusion

Reliable interservice communication is an important consideration in microservice design, especially when faced with dual writes. Combining the transactional outbox pattern with dual writes provides a robust way of improving message reliability.

This blog demonstrates an architecture pattern to tackle the challenge of dual writes by combining it with the transactional outbox pattern using DynamoDB and EventBridge Pipes. This solution provides a no-code approach with AWS Managed Services, reducing management and operational overhead.

For more serverless learning resources, visit Serverless Land.

Integrating IBM MQ with Amazon SQS and Amazon SNS using Apache Camel

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/integrating-ibm-mq-with-amazon-sqs-and-amazon-sns-using-apache-camel/

This post is written by Joaquin Rinaudo, Principal Security Consultant and Gezim Musliaj, DevOps Consultant.

IBM MQ is a message-oriented middleware (MOM) product used by many enterprise organizations, including global banks, airlines, and healthcare and insurance companies.

Customers often ask us for guidance on how they can integrate their existing on-premises MOM systems with new applications running in the cloud. They’re looking for a cost-effective, scalable and low-effort solution that enables them to send and receive messages from their cloud applications to these messaging systems.

This blog post shows how to set up a bi-directional bridge from on-premises IBM MQ to Amazon MQ, Amazon Simple Queue Service (Amazon SQS), and Amazon Simple Notification Service (Amazon SNS).

This allows your producer and consumer applications to integrate using fully managed AWS messaging services and Apache Camel. Learn how to deploy such a solution and how to test the running integration using SNS, SQS, and a demo IBM MQ cluster environment running on Amazon Elastic Container Service (ECS) with AWS Fargate.

This solution can also be used as part of a step-by-step migration using the approach described in the blog post Migrating from IBM MQ to Amazon MQ using a phased approach.

Solution overview

The integration consists of an Apache Camel broker cluster that bi-directionally integrates an IBM MQ system and target systems, such as Amazon MQ running ActiveMQ, SNS topics, or SQS queues.

In the following example, AWS services, in this case AWS Lambda and SQS, receive messages published to IBM MQ via an SNS topic:

Solution architecture overview for sending messages

  1. The cloud message consumers (Lambda and SQS) subscribe to the solution’s target SNS topic.
  2. The Apache Camel broker connects to IBM MQ using secrets stored in AWS Secrets Manager and reads new messages from the queue using IBM MQ’s Java library. Only IBM MQ messages are supported as a source.
  3. The Apache Camel broker publishes these new messages to the target SNS topic. It uses the Amazon SNS Extended Client Library for Java to store any messages larger than 256 KB in an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Apache Camel stores any message that cannot be delivered to SNS after two retries in an S3 dead letter queue bucket.

The next diagram demonstrates how the solution sends messages back from an SQS queue to IBM MQ:

Solution architecture overview for sending messages

  1. A sample message producer using Lambda sends messages to an SQS queue. It uses the Amazon SQS Extended Client Library for Java to send messages larger than 256 KB.
  2. The Apache Camel broker receives the messages published to SQS, using the SQS Extended Client Library if needed.
  3. The Apache Camel broker sends the message to the IBM MQ target queue.
  4. As before, the broker stores messages that cannot be delivered to IBM MQ in the S3 dead letter queue bucket.

A phased live migration consists of two steps:

  1. Deploy the broker service to allow reading messages from and writing to existing IBM MQ queues.
  2. Once the consumer or producer is migrated, migrate its counterpart to the newly selected service (SNS or SQS).

Next, you will learn how to set up the solution using the AWS Cloud Development Kit (AWS CDK).

Deploying the solution

Prerequisites

  • AWS CDK
  • TypeScript
  • Java
  • Docker
  • Git
  • Yarn

Step 1: Cloning the repository

Clone the repository using git:

git clone https://github.com/aws-samples/aws-ibm-mq-adapter

Step 2: Setting up test IBM MQ credentials

This demo uses IBM MQ’s mutual TLS authentication. To do this, you must generate X.509 certificates and store them in AWS Secrets Manager by running the following commands in the app folder:

  1. Generate X.509 certificates:
    ./deploy.sh generate_secrets
  2. Set up the secrets required for the Apache Camel broker (replace <integration-name> with, for example, dev):
    ./deploy.sh create_secrets broker <integration-name>
  3. Set up secrets for the mock IBM MQ system:
    ./deploy.sh create_secrets mock
  4. Update the cdk.json file with the secrets ARN output from the previous commands:
    • IBM_MOCK_PUBLIC_CERT_ARN
    • IBM_MOCK_PRIVATE_CERT_ARN
    • IBM_MOCK_CLIENT_PUBLIC_CERT_ARN
    • IBMMQ_TRUSTSTORE_ARN
    • IBMMQ_TRUSTSTORE_PASSWORD_ARN
    • IBMMQ_KEYSTORE_ARN
    • IBMMQ_KEYSTORE_PASSWORD_ARN

If you are using your own IBM MQ system and already have X.509 certificates available, you can use the script to upload those certificates to AWS Secrets Manager after running the script.

Step 3: Configuring the broker

The solution deploys two brokers, one to read messages from the test IBM MQ system and one to send messages back. A separate Apache Camel cluster is used per integration to support better use of Auto Scaling functionality and to avoid issues across different integration operations (consuming and reading messages).

Update the cdk.json file with the following values:

  • accountId: AWS account ID to deploy the solution to.
  • region: name of the AWS Region to deploy the solution to.
  • defaultVPCId: specify a VPC ID for an existing VPC in the AWS account where the broker and mock are deployed.
  • allowedPrincipals: add your account ARN (e.g., arn:aws:iam::123456789012:root) to allow this AWS account to send messages to and receive messages from the broker. You can use this parameter to set up cross-account relationships for both SQS and SNS integrations and support multiple consumers and producers.

Step 4: Bootstrapping and deploying the solution

  1. Make sure you have the correct AWS_PROFILE and AWS_REGION environment variables set for your development account.
  2. Run yarn cdk bootstrap –-qualifier mq <aws://<account-id>/<region> to bootstrap CDK.
  3. Run yarn install to install CDK dependencies.
  4. Finally, execute yarn cdk deploy '*-dev' –-qualifier mq --require-approval never to deploy the solution to the dev environment.

Step 5: Testing the integrations

Use AWS System Manager Session Manager and port forwarding to establish tunnels to the test IBM MQ instance to access the web console and send messages manually. For more information on port forwarding, see Amazon EC2 instance port forwarding with AWS System Manager.

  1. In a command line terminal, make sure you have the correct AWS_PROFILE and AWS_REGION environment variables set for your development account.
  2. In addition, set the following environment variables:
    • IBM_ENDPOINT: endpoint for IBM MQ. Example: network load balancer for IBM mock mqmoc-mqada-1234567890.elb.eu-west-1.amazonaws.com.
    • BASTION_ID: instance ID for the bastion host. You can retrieve this output from Step 4: Bootstrapping and deploying the solution listed after the mqBastionStack deployment.

    Use the following command to set the environment variables:

    export IBM_ENDPOINT=mqmoc-mqada-1234567890.elb.eu-west-1.amazonaws.com
    export BASTION_ID=i-0a1b2c3d4e5f67890
  3. Run the script test/connect.sh.
  4. Log in to the IBM web console via https://127.0.0.1:9443/admin using the default IBM user (admin) and the password stored in AWS Secrets Manager as mqAdapterIbmMockAdminPassword.

Sending data from IBM MQ and receiving it in SNS:

  1. In the IBM MQ console, access the local queue manager QM1 and DEV.QUEUE.1.
  2. Send a message with the content Hello AWS. This message will be processed by AWS Fargate and published to SNS.
  3. Access the SQS console and choose the snsIntegrationStack-dev-2 prefix queue. This is an SQS queue subscribed to the SNS topic for testing.
  4. Select Send and receive message.
  5. Select Poll for messages to see the Hello AWS message previously sent to IBM MQ.

Sending data back from Amazon SQS to IBM MQ:

  1. Access the SQS console and choose the queue with the prefix sqsPublishIntegrationStack-dev-3-dev.
  2. Select Send and receive messages.
  3. For Message Body, add Hello from AWS.
  4. Choose Send message.
  5. In the IBM MQ console, access the local queue manager QM1 and DEV.QUEUE.2 to find your message listed under this queue.

Step 6: Cleaning up

Run cdk destroy '*-dev' to destroy the resources deployed as part of this walkthrough.

Conclusion

In this blog, you learned how you can exchange messages between IBM MQ and your cloud applications using Amazon SQS and Amazon SNS.

If you’re interested in getting started with your own integration, follow the README file in the GitHub repository. If you’re migrating existing applications using industry-standard APIs and protocols such as JMS, NMS, or AMQP 1.0, consider integrating with Amazon MQ using the steps provided in the repository.

If you’re interested in running Apache Camel in Kubernetes, you can also adapt the architecture to use Apache Camel K instead.

For more serverless learning resources, visit Serverless Land.

Prime Day 2023 Powered by AWS – All the Numbers

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/prime-day-2023-powered-by-aws-all-the-numbers/

As part of my annual tradition to tell you about how AWS makes Prime Day possible, I am happy to be able to share some chart-topping metrics (check out my 2016, 2017, 2019, 2020, 2021, and 2022 posts for a look back).

This year I bought all kinds of stuff for my hobbies including a small drill press, filament for my 3D printer, and irrigation tools. I also bought some very nice Alphablock books for my grandkids. According to our official release, the first day of Prime Day was the single largest sales day ever on Amazon and for independent sellers, with more than 375 million items purchased.

Prime Day by the Numbers
As always, Prime Day was powered by AWS. Here are some of the most interesting and/or mind-blowing metrics:

Amazon Elastic Block Store (Amazon EBS) – The Amazon Prime Day event resulted in an incremental 163 petabytes of EBS storage capacity allocated – generating a peak of 15.35 trillion requests and 764 petabytes of data transfer per day. Compared to the previous year, Amazon increased the peak usage on EBS by only 7% Year-over-Year yet delivered +35% more traffic per day due to efficiency efforts including workload optimization using Amazon Elastic Compute Cloud (Amazon EC2) AWS Graviton-based instances. Here’s a visual comparison:

AWS CloudTrail – AWS CloudTrail processed over 830 billion events in support of Prime Day 2023.

Amazon DynamoDB – DynamoDB powers multiple high-traffic Amazon properties and systems including Alexa, the Amazon.com sites, and all Amazon fulfillment centers. Over the course of Prime Day, these sources made trillions of calls to the DynamoDB API. DynamoDB maintained high availability while delivering single-digit millisecond responses and peaking at 126 million requests per second.

Amazon Aurora – On Prime Day, 5,835 database instances running the PostgreSQL-compatible and MySQL-compatible editions of Amazon Aurora processed 318 billion transactions, stored 2,140 terabytes of data, and transferred 836 terabytes of data.

Amazon Simple Email Service (SES) – Amazon SES sent 56% more emails for Amazon.com during Prime Day 2023 vs. 2022, delivering 99.8% of those emails to customers.

Amazon CloudFront – Amazon CloudFront handled a peak load of over 500 million HTTP requests per minute, for a total of over 1 trillion HTTP requests during Prime Day.

Amazon SQS – During Prime Day, Amazon SQS set a new traffic record by processing 86 million messages per second at peak. This is 22% increase from Prime Day of 2022, where SQS supported 70.5M messages/sec.

Amazon Elastic Compute Cloud (EC2) – During Prime Day 2023, Amazon used tens of millions of normalized AWS Graviton-based Amazon EC2 instances, 2.7x more than in 2022, to power over 2,600 services. By using more Graviton-based instances, Amazon was able to get the compute capacity needed while using up to 60% less energy.

Amazon Pinpoint – Amazon Pinpoint sent tens of millions of SMS messages to customers during Prime Day 2023 with a delivery success rate of 98.3%.

Prepare to Scale
Every year I reiterate the same message: rigorous preparation is key to the success of Prime Day and our other large-scale events. If you are preparing for a similar chart-topping event of your own, I strongly recommend that you take advantage of AWS Infrastructure Event Management (IEM). As part of an IEM engagement, my colleagues will provide you with architectural and operational guidance that will help you to execute your event with confidence!

Jeff;

Improving medical imaging workflows with AWS HealthImaging and SageMaker

Post Syndicated from Sukhomoy Basak original https://aws.amazon.com/blogs/architecture/improving-medical-imaging-workflows-with-aws-healthimaging-and-sagemaker/

Medical imaging plays a critical role in patient diagnosis and treatment planning in healthcare. However, healthcare providers face several challenges when it comes to managing, storing, and analyzing medical images. The process can be time-consuming, error-prone, and costly.

There’s also a radiologist shortage across regions and healthcare systems, making the demand for this specialty increases due to an aging population, advances in imaging technology, and the growing importance of diagnostic imaging in healthcare.

As the demand for imaging studies continues to rise, the limited number of available radiologists results in delays in available appointments and timely diagnoses. And while technology enables healthcare delivery improvements for clinicians and patients, hospitals seek additional tools to solve their most pressing challenges, including:

  • Professional burnout due to an increasing demand for imaging and diagnostic services
  • Labor-intensive tasks, such as volume measurement or structural segmentation of images
  • Increasing expectations from patients expecting high-quality healthcare experiences that match retail and technology in terms of convenience, ease, and personalization

To improve clinician and patient experiences, run your picture archiving and communication system (PACS) with an artificial intelligence (AI)-enabled diagnostic imaging cloud solution to securely gain critical insights and improve access to care.

AI helps reduce the radiologist burndown rate through automation. For example, AI saves radiologist chest x-ray interpretation time. It is also a powerful tool to identify areas that need closer inspection, and helps capture secondary findings that weren’t initially identified. The advancement of interoperability and analytics gives radiologist a 360-degree, longitudinal view of patient health records to provide better healthcare at potentially lower costs.

AWS offers services to address these challenges. This blog post discusses AWS HealthImaging (AWS AHI) and Amazon SageMaker, and how they are used together to improve healthcare providers’ medical imaging workflows. This ultimately accelerates imaging diagnostics and increases radiology productivity. AWS AHI enables developers to deliver performance, security, and scale to cloud-native medical imaging applications. It allows ingestion of Digital Imaging and Communication in Medicine (DICOM) images. Amazon SageMaker provides end-to-end solution for AI and machine learning.

Let’s explore an example use case involving X-rays after an auto accident. In this diagnostic medical imaging workflow, a patient is in the emergency room. From there:

  • The patient undergoes an X-ray to check for fractures.
  • The scanned acquisition device images flow to the PACS system.
  • The radiologist reviews the information gathered from this procedure and authors the report.
  • The patient workflow continues as the reports are made available to the referring physician.

Next-generation imaging solutions and workflows

Healthcare providers can use AWS AHI and Amazon SageMaker together to enable next-generation imaging solutions and improve medical imaging workflows. The following architecture illustrates this example.

X-ray images are sent to AWS HealthImaging and an Amazon SageMaker endpoint extracts insights.

Figure 1: X-ray images are sent to AWS HealthImaging and an Amazon SageMaker endpoint extracts insights.

Let’s review the architecture and the key components:

1. Imaging Scanner: Captures the images from a patient’s body. Depending on the modality, this can be an X-ray detector; a series of detectors in a CT scanner; a magnetic field and radio frequency coils in an MRI scanner; or an ultrasound transducer. This example uses an X-ray device.

2. Amazon SQS message queue: Consumes event from S3 bucket and triggers an AWS Step Functions workflow orchestration.

3. AWS Step Functions runs the transform and import jobs to further process and import the images into AWS AHI data store instance.

4. The final diagnostic image—along with any relevant patient information and metadata—is stored in the AWS AHI datastore. This allows for efficient imaging date retrieval and management. It also enables medical imaging data access with sub-second image retrieval latencies at scale, powered by cloud-native APIs and applications from AWS partners.

5. Radiologists responsible for ground truth for ML images perform medical image annotations using Amazon SageMaker Ground Truth. They visualize and label DICOM images using a custom data labeling workflow—a fully managed data labeling service that supports built-in or custom data labeling workflows. They also leverage tools like 3D Slicer for interactive medical image annotations.

6. Data scientists build or leverage built-in deep learning models using the annotated images on Amazon SageMaker. SageMaker offers a range of deployment options that vary from low latency and high throughput to long-running inference jobs. These options include considerations for batch, real-time, or near real-time inference.

7. Healthcare providers use AWS AHI and Amazon SageMaker to run AI-assisted detection and interpretation workflow. This workflow is used to identify hard-to-see fractures, dislocations, or soft tissue injuries to allow surgeons and radiologist to be more confident in their treatment choices.

8. Finally, the image stored in AWS AHI is displayed on a monitor or other visual output device where it can be analyzed and interpreted by a radiologist or other medical professional.

  • The Open Health Imaging Foundation (OHIF) Viewer is an open source, web-based, medical imaging platform. It provides a core framework for building complex imaging applications.
  • Radical Imaging or Arterys are AWS partners that provide OHIF-based medical imaging viewer.

Each of these components plays a critical role in the overall performance and accuracy of the medical imaging system as well as ongoing research and development focused on improving diagnostic outcomes and patient care. AWS AHI uses efficient metadata encoding, lossless compression, and progressive resolution data access to provide industry leading performance for loading images. Efficient metadata encoding enables image viewers and AI algorithms to understand the contents of a DICOM study without having to load the image data.

Security

The AWS shared responsibility model applies to data protection in AWS AHI and Amazon SageMaker.

Amazon SageMaker is HIPAA-eligible and can operate with data containing Protected Health Information (PHI). Encryption of data in transit is provided by SSL/TLS and is used when communicating both with the front-end interface of Amazon SageMaker (to the Notebook) and whenever Amazon SageMaker interacts with any other AWS services.

AWS AHI is also HIPAA-eligible service and provides access control at the metadata level, ensuring that each user and application can only see the images and metadata fields that are required based upon their role. This prevents the proliferation of Patient PHI. All access to AWS AHI APIs is logged in detail in AWS CloudTrail.

Both of these services leverage AWS Key Management service (AWS KMS) to satisfy the requirement that PHI data is encrypted at rest.

Conclusion

In this post, we reviewed a common use case for early detection and treatment of conditions, resulting in better patient outcomes. We also covered an architecture that can transform the radiology field by leveraging the power of technology to improve accuracy, efficiency, and accessibility of medical imaging.

Further reading

Best practices for implementing event-driven architectures in your organization

Post Syndicated from Emanuele Levi original https://aws.amazon.com/blogs/architecture/best-practices-for-implementing-event-driven-architectures-in-your-organization/

Event-driven architectures (EDA) are made up of components that detect business actions and changes in state, and encode this information in event notifications. Event-driven patterns are becoming more widespread in modern architectures because:

  • they are the main invocation mechanism in serverless patterns.
  • they are the preferred pattern for decoupling microservices, where asynchronous communications and event persistence are paramount.
  • they are widely adopted as a loose-coupling mechanism between systems in different business domains, such as third-party or on-premises systems.

Event-driven patterns have the advantage of enabling team independence through the decoupling and decentralization of responsibilities. This decentralization trend in turn, permits companies to move with unprecedented agility, enhancing feature development velocity.

In this blog, we’ll explore the crucial components and architectural decisions you should consider when adopting event-driven patterns, and provide some guidance on organizational structures.

Division of responsibilities

The communications flow in EDA (see What is EDA?) is initiated by the occurrence of an event. Most production-grade event-driven implementations have three main components, as shown in Figure 1: producers, message brokers, and consumers.

Three main components of an event-driven architecture

Figure 1. Three main components of an event-driven architecture

Producers, message brokers, and consumers typically assume the following roles:

Producers

Producers are responsible for publishing the events as they happen. They are the owners of the event schema (data structure) and semantics (meaning of the fields, such as the meaning of the value of an enum field). As this is the only contract (coupling) between producers and the downstream components of the system, the schema and its semantics are crucial in EDA. Producers are responsible for implementing a change management process, which involves both non-breaking and breaking changes. With introduction of breaking changes, consumers are able to negotiate the migration process with producers.

Producers are “consumer agnostic”, as their boundary of responsibility ends when an event is published.

Message brokers

Message brokers are responsible for the durability of the events, and will keep an event available for consumption until it is successfully processed. Message brokers ensure that producers are able to publish events for consumers to consume, and they regulate access and permissions to publish and consume messages.

Message brokers are largely “events agnostic”, and do not generally access or interpret the event content. However, some systems provide a routing mechanism based on the event payload or metadata.

Consumers

Consumers are responsible for consuming events, and own the semantics of the effect of events. Consumers are usually bounded to one business context. This means the same event will have different effect semantics for different consumers. Crucial architectural choices when implementing a consumer involve the handling of unsuccessful message deliveries or duplicate messages. Depending on the business interpretation of the event, when recovering from failure a consumer might permit duplicate events, such as with an idempotent consumer pattern.

Crucially, consumers are “producer agnostic”, and their boundary of responsibility begins when an event is ready for consumption. This allows new consumers to onboard into the system without changing the producer contracts.

Team independence

In order to enforce the division of responsibilities, companies should organize their technical teams by ownership of producers, message brokers, and consumers. Although the ownership of producers and consumers is straightforward in an EDA implementation, the ownership of the message broker may not be. Different approaches can be taken to identify message broker ownership depending on your organizational structure.

Decentralized ownership

Ownership of the message broker in a decentralized ownership organizational structure

Figure 2. Ownership of the message broker in a decentralized ownership organizational structure

In a decentralized ownership organizational structure (see Figure 2), the teams producing events are responsible for managing their own message brokers and the durability and availability of the events for consumers.

The adoption of topic fanout patterns based on Amazon Simple Queue Service (SQS) and Amazon Simple Notification Service (SNS) (see Figure 3), can help companies implement a decentralized ownership pattern. A bus-based pattern using Amazon EventBridge can also be similarly utilized (see Figure 4).

Topic fanout pattern based on Amazon SQS and Amazon SNS

Figure 3. Topic fanout pattern based on Amazon SQS and Amazon SNS

Events bus pattern based on Amazon EventBridge

Figure 4. Events bus pattern based on Amazon EventBridge

The decentralized ownership approach has the advantage of promoting team independence, but it is not a fit for every organization. In order to be implemented effectively, a well-established DevOps culture is necessary. In this scenario, the producing teams are responsible for managing the message broker infrastructure and the non-functional requirements standards.

Centralized ownership

Ownership of the message broker in a centralized ownership organizational structure

Figure 5. Ownership of the message broker in a centralized ownership organizational structure

In a centralized ownership organizational structure, a central team (we’ll call it the platform team) is responsible for the management of the message broker (see Figure 5). Having a specialized platform team offers the advantage of standardized implementation of non-functional requirements, such as reliability, availability, and security. One disadvantage is that the platform team is a single point of failure in both the development and deployment lifecycle. This could become a bottleneck and put team independence and operational efficiency at risk.

Streaming pattern based on Amazon MSK and Kinesis Data Streams

Figure 6. Streaming pattern based on Amazon MSK and Kinesis Data Streams

On top of the implementation patterns mentioned in the previous section, the presence of a dedicated team makes it easier to implement streaming patterns. In this case, a deeper understanding on how the data is partitioned and how the system scales is required. Streaming patterns can be implemented using services such as Amazon Managed Streaming for Apache Kafka (MSK) or Amazon Kinesis Data Streams (see Figure 6).

Best practices for implementing event-driven architectures in your organization

The centralized and decentralized ownership organizational structures enhance team independence or standardization of non-functional requirements respectively. However, they introduce possible limits to the growth of the engineering function in a company. Inspired by the two approaches, you can implement a set of best practices which are aimed at minimizing those limitations.

Best practices for implementing event-driven architectures

Figure 7. Best practices for implementing event-driven architectures

  1. Introduce a cloud center of excellence (CCoE). A CCoE standardizes non-functional implementation across engineering teams. In order to promote a strong DevOps culture, the CCoE should not take the form of an external independent team, but rather be a collection of individual members representing the various engineering teams.
  2. Decentralize team ownership. Decentralize ownership and maintenance of the message broker to producing teams. This will maximize team independence and agility. It empowers the team to use the right tool for the right job, as long as they conform to the CCoE guidelines.
  3. Centralize logging standards and observability strategies. Although it is a best practice to decentralize team ownership of the components of an event-driven architecture, logging standards and observability strategies should be centralized and standardized across the engineering function. This centralization provides for end-to-end tracing of requests and events, which are powerful diagnosis tools in case of any failure.

Conclusion

In this post, we have described the main architectural components of an event-driven architecture, and identified the ownership of the message broker as one of the most important architectural choices you can make. We have described a centralized and decentralized organizational approach, presenting the strengths of the two approaches, as well as the limits they impose on the growth of your engineering organization. We have provided some best practices you can implement in your organization to minimize these limitations.

Further reading:
To start your journey building event-driven architectures in AWS, explore the following:

Detecting and stopping recursive loops in AWS Lambda functions

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/detecting-and-stopping-recursive-loops-in-aws-lambda-functions/

This post is written by Pawan Puthran, Principal Serverless Specialist TAM, Aneel Murari, Senior Serverless Specialist Solution Architect, and Shree Shrikhande, Senior AWS Lambda Product Manager.

AWS Lambda is announcing a recursion control to detect and stop Lambda functions running in a recursive or infinite loop.

At launch, this feature is available for Lambda integrations with Amazon Simple Queue Service (Amazon SQS), Amazon SNS, or when invoking functions directly using the Lambda invoke API. Lambda now detects functions that appear to be running in a recursive loop and drops requests after exceeding 16 invocations.

This can help reduce costs from unexpected Lambda function invocations because of recursion. You receive notifications about this action through the AWS Health Dashboard, email, or by configuring Amazon CloudWatch Alarms.

Overview

You can invoke Lambda functions in multiple ways. AWS services generate events that invoke Lambda functions, and Lambda functions can send messages to other AWS services. In most architectures, the service or resource that invokes a Lambda function should be different from the service or resource that the function outputs to. Because of misconfiguration or coding bugs, a function can send a processed event to the same service or resource that invokes the Lambda function, causing a recursive loop.

Lambda now detects the function running in a recursive loop between supported services, after exceeding 16 invocations. It returns a RecursiveInvocationException to the caller. There is no additional charge for this feature. For asynchronous invokes, Lambda sends the event to a dead-letter queue or on-failure destination, if one is configured.

The following is an example of an order processing system.

Image processing system

Order processing system

  1. A new order information message is sent to the source SQS queue.
  2. Lambda consumes the message from the source queue using an ESM.
  3. The Lambda function processes the message and sends the updated orders message to a destination SQS queue using the SQS SendMessage API.
  4. The source queue has a dead-letter queue(DLQ) configured for handling any failed or unprocessed messages.
  5. Because of a misconfiguration, the Lambda function sends the message to the source SQS queue instead of the destination queue. This causes a recursive loop of Lambda function invocations.

To explore sample code for this example, see the GitHub repo.

In the preceding example, after 16 invocations, Lambda throws a RecursiveInvocationException to the ESM. The ESM stops invoking the Lambda function and, once the maxReceiveCount is exceeded, SQS moves the message to the source queues configured DLQ.

You receive an AWS Health Dashboard notification with steps to troubleshoot the function.

AWS Health Dashboard notification

AWS Health Dashboard notification

You also receive an email notification to the registered email address on the account.

Email notification

Email notification

Lambda emits a RecursiveInvocationsDropped CloudWatch metric, which you can view in the CloudWatch console.

RecursiveInvocationsDropped CloudWatch metric

RecursiveInvocationsDropped CloudWatch metric

How does Lambda detect recursion?

For Lambda to detect recursive loops, your function must use one of the supported AWS SDK versions or higher.

Lambda uses an AWS X-Ray trace header primitive called “Lineage” to track the number of times a function has been invoked with an event. When your function code sends an event using a supported AWS SDK version, Lambda increments the counter in the lineage header. If your function is then invoked with the same triggering event more than 16 times, Lambda stops the next invocation for that event. You do not need to configure active X-Ray tracing for this feature to work.

An example lineage header looks like:

X-Amzn-Trace-Id:Root=1-645f7998-4b1e232810b0bb733dba2eab;Parent=5be88d12eefc1fc0;Sampled=1;Lineage=43e12f0f:5

43e12f0f is the hash of a resource, in this case a Lambda function. 5 is the number of times this function has been invoked with the same event. The logic of hash generation, encoding, and size of the lineage header may change in the future. You should not design any application functionality based on this.

When using an ESM to consume messages from SQS, after the maxReceiveCount value is exceeded, the message is sent to the source queue’s configured DLQ. When Lambda detects a recursive loop and drops subsequent invocations, it returns a RecursiveInvocationException to the ESM. This increments the maxReceiveCount value. When the ESM auto retries to process events, based on the error handling configuration, these retries are not considered recursive invocations.

When using SQS, you can also batch multiple messages into one Lambda event. Where the message batch size is greater than 1, Lambda uses the maximum lineage value within the batch of messages. It drops the entire batch if the value exceeds 16.

Recursion detection in action

You can deploy a sample application example in the GitHub repo to test Lambda recursive loop detection. The application includes a Lambda function that reads from an SQS queue and writes messages back to the same SQS queue.

As prerequisites, you must install:

To deploy the application:

    1. Set your AWS Region:
export REGION=<your AWS region>
    1. Clone the GitHub repository
git clone https://github.com/aws-samples/aws-lambda-recursion-detection-sample.git
cd aws-lambda-recursion-detection-sample
    1. Use AWS SAM to build and deploy the resources to your AWS account. Enter a stack name, such as lambda-recursion, when prompted. Accept the remaining default values.
sam build –-use-container
sam deploy --guided --region $REGION

To test the application:

    1. Save the name of the SQS queue in a local environment variable:
SOURCE_SQS_URL=$(aws cloudformation describe-stacks \ --region $REGION \ --stack-name lambda-recursion \ --query 'Stacks[0].Outputs[?OutputKey==`SourceSQSqueueURL`].OutputValue' --output text)
  1. Publish a message to the source SQS queue:
aws sqs send-message --queue-url $SOURCE_SQS_URL --message-body '{"orderId":"111","productName":"Bolt","orderStatus":"Submitted"}' --region $REGION

This invokes the Lambda function, which writes the message back to the queue.

To verify that Lambda has detected the recursion:

  1. Navigate to the CloudWatch console. Choose All Metrics under Metrics in the left-hand panel and search for RecursiveInvocationsDropped.

    Find RecursiveInvocationsDropped.

    Find RecursiveInvocationsDropped.

  2. Choose Lambda > By Function Name and choose RecursiveInvocationsDropped for the function you created. Under Graphed metrics, change the statistic to sum and Period to 1 minute. You see one record. Refresh if you don’t see the metric after a few seconds.
Metrics sum view

Metrics sum view

Actions to take when Lambda stops a recursive loop

When you receive a notification regarding recursion in your account, the following steps can help address the issue.

  • To stop further invoke attempts while you fix the underlying configuration issue, set the function concurrency to 0. This acts as an off switch for the Lambda function. You can choose the “Throttle” button in the Lambda console or use the PutFunctionConcurrency API to set the function concurrency to 0.
  • You can also disable or delete the event source mapping or trigger for the Lambda function.
  • Check your Lambda function code and configuration for any code defects that create loops. For example, check your environment variables to ensure you are not using the same SQS queue or SNS topic as source and target.
  • If an SQS Queue is the event source for your Lambda function, configure a DLQ on the source queue.
  • If an SNS topic is the event source, configure an On-Failure Destination for the Lambda function.

Disabling recursion detection

You may have valid use-cases where Lambda recursion is intentional as part of your design. In this case, use caution and implement suitable guardrails to prevent unexpected charges to your account. To learn more about best practices for using recursive invocation patterns, see Recursive patterns that cause run-away Lambda functions in the AWS Lambda Operator Guide.

This feature is turned on by default to stop recursive loops. To request turning it off for your account, reach out to AWS Support.

Conclusion

Lambda recursion control for SQS and SNS automatically detects and stops functions running in a recursive or infinite loop. This can be due to misconfiguration or coding errors. Recursion control helps reduce unexpected usage with Lambda and downstream services. The post also explains how Lambda detects and stops recursive loops and notifies you through AWS Health Dashboard to troubleshoot the function.

To learn more about the feature, visit the AWS Lambda Developer Guide.

For more serverless learning resources, visit Serverless Land

Implementing AWS Lambda error handling patterns

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/implementing-aws-lambda-error-handling-patterns/

This post is written by Jeff Chen, Principal Cloud Application Architect, and Jeff Li, Senior Cloud Application Architect

Event-driven architectures are an architecture style that can help you boost agility and build reliable, scalable applications. Splitting an application into loosely coupled services can help each service scale independently. A distributed, loosely coupled application depends on events to communicate application change states. Each service consumes events from other services and emits events to notify other services of state changes.

Handling errors becomes even more important when designing distributed applications. A service may fail if it cannot handle an invalid payload, dependent resources may be unavailable, or the service may time out. There may be permission errors that can cause failures. AWS services provide many features to handle error conditions, which you can use to improve the resiliency of your applications.

This post explores three use-cases and design patterns for handling failures.

Overview

AWS Lambda, Amazon Simple Queue Service (Amazon SQS), Amazon Simple Notification Service (Amazon SNS), and Amazon EventBridge are core building blocks for building serverless event-driven applications.

The post Understanding the Different Ways to Invoke Lambda Functions lists the three different ways of invoking a Lambda function: synchronous, asynchronous, and poll-based invocation. For a list of services and which invocation method they use, see the documentation.

Lambda’s integration with Amazon API Gateway is an example of a synchronous invocation. A client makes a request to API Gateway, which sends the request to Lambda. API Gateway waits for the function response and returns the response to the client. There are no built-in retries or error handling. If the request fails, the client attempts the request again.

Lambda’s integration with SNS and EventBridge are examples of asynchronous invocations. SNS, for example, sends an event to Lambda for processing. When Lambda receives the event, it places it on an internal event queue and returns an acknowledgment to SNS that it has received the message. Another Lambda process reads events from the internal queue and invokes your Lambda function. If SNS cannot deliver an event to your Lambda function, the service automatically retries the same operation based on a retry policy.

Lambda’s integration with SQS uses poll-based invocations. Lambda runs a fleet of pollers that poll your SQS queue for messages. The pollers read the messages in batches and invoke your Lambda function once per batch.

You can apply this pattern in many scenarios. For example, your operational application can add sales orders to an operational data store. You may then want to load the sales orders to your data warehouse periodically so that the information is available for forecasting and analysis. The operational application can batch completed sales as events and place them on an SQS queue. A Lambda function can then process the events and load the completed sale records into your data warehouse.

If your function processes the batch successfully, the pollers delete the messages from the SQS queue. If the batch is not successfully processed, the pollers do not delete the messages from the queue. Once the visibility timeout expires, the messages are available again to be reprocessed. If the message retention period expires, SQS deletes the message from the queue.

The following table shows the invocation types and retry behavior of the AWS services mentioned.

AWS service example Invocation type Retry behavior
Amazon API Gateway Synchronous No built-in retry, client attempts retries.

Amazon SNS

Amazon EventBridge

Asynchronous Built-in retries with exponential backoff.
Amazon SQS Poll-based Retries after visibility timeout expires until message retention period expires.

There are a number of design patterns to use for poll-based and asynchronous invocation types to retain failed messages for additional processing. These patterns can help you recover from delivery or processing failures.

You can explore the patterns and test the scenarios by deploying the code from this repository which uses the AWS Cloud Development Kit (AWS CDK) using Python.

Lambda poll-based invocation pattern

When using Lambda with SQS, if Lambda isn’t able to process the message and the message retention period expires, SQS drops the message. Failure to process the message can be due to function processing failures, including time-outs or invalid payloads. Processing failures can also occur when the destination function does not exist, or has incorrect permissions.

You can configure a separate dead-letter queue (DLQ) on the source queue for SQS to retain the dropped message. A DLQ preserves the original message and is useful for analyzing root causes, handling error conditions properly, or sending notifications that require manual interventions. In the poll-based invocation scenario, the Lambda function itself does not maintain a DLQ. It relies on the external DLQ configured in SQS. For more information, see Using Lambda with Amazon SQS.

The following shows the design pattern when you configure Lambda to poll events from an SQS queue and invoke a Lambda function.

Lambda synchronously polling catches of messages from SQS

Lambda synchronously polling batches of messages from SQS

To explore this pattern, deploy the code in this repository. Once deployed, you can use this instruction to test the pattern with the happy and unhappy paths.

Lambda asynchronous invocation pattern

With asynchronous invokes, there are two failure aspects to consider when using Lambda. The event source cannot deliver the message to Lambda and the Lambda function errors when processing the event.

Event sources vary in how they handle failures delivering messages to Lambda. If SNS or EventBridge cannot send the event to Lambda after exhausting all their retry attempts, the service drops the event. You can configure a DLQ on an SNS topic or EventBridge event bus to hold the dropped event. This works in the same way as the poll-based invocation pattern with SQS.

Lambda functions may then error due to input payload syntax errors, duration time-outs, or the function throws an exception such as a data resource not available.

For asynchronous invokes, you can configure how long Lambda retains an event in its internal queue, up to 6 hours. You can also configure how many times Lambda retries when the function errors, between 0 and 2. Lambda discards the event when the maximum age passes or all retry attempts fail. To retain a copy of discarded events, you can configure either a DLQ or, preferably, a failed-event destination as part of your Lambda function configuration.

A Lambda destination enables you to specify what to do next if an asynchronous invocation succeeds or fails. You can configure a destination to send invocation records to SQS, SNS, EventBridge, or another Lambda function. Destinations are preferred for failure processing as they support additional targets and include additional information. A DLQ holds the original failed event. With a destination, Lambda also passes details of the function’s response in the invocation record. This includes stack traces, which can be useful for analyzing the root cause.

Using both a DLQ and Lambda destinations

You can apply this pattern in many scenarios. For example, many of your applications may contain customer records. To comply with the California Consumer Privacy Act (CCPA), different organizations may need to delete records for a particular customer. You can set up a consumer delete SNS topic. Each organization creates a Lambda function, which processes the events published by the SNS topic and deletes customer records in its managed applications.

The following shows the design pattern when you configure an SNS topic as the event source for a Lambda function, which uses destination queues for success and failure process.

SNS topic as event source for Lambda

SNS topic as event source for Lambda

You configure a DLQ on the SNS topic to capture messages that SNS cannot deliver to Lambda. When Lambda invokes the function, it sends details of the successfully processed messages to an on-success SQS destination. You can use this pattern to route an event to multiple services for simpler use cases. For orchestrating multiple services, AWS Step Functions is a better design choice.

Lambda can also send details of unsuccessfully processed messages to an on-failure SQS destination.

A variant of this pattern is to replace an SQS destination with an EventBridge destination so that multiple consumers can process an event based on the destination.

To explore how to use an SQS DLQ and Lambda destinations, deploy the code in this repository. Once deployed, you can use this instruction to test the pattern with the happy and unhappy paths.

Using a DLQ

Although destinations is the preferred method to handle function failures, you can explore using DLQs.

The following shows the design pattern when you configure an SNS topic as the event source for a Lambda function, which uses SQS queues for failure process.

Lambda invoked asynchonously

Lambda invoked asynchonously

You configure a DLQ on the SNS topic to capture the messages that SNS cannot deliver to the Lambda function. You also configure a separate DLQ for the Lambda function. Lambda saves an unsuccessful event to this DLQ after Lambda cannot process the event after maximum retry attempts.

To explore how to use a Lambda DLQ, deploy the code in this repository. Once deployed, you can use this instruction to test the pattern with happy and unhappy paths.

Conclusion

This post explains three patterns that you can use to design resilient event-driven serverless applications. Error handling during event processing is an important part of designing serverless cloud applications.

You can deploy the code from the repository to explore how to use poll-based and asynchronous invocations. See how poll-based invocations can send failed messages to a DLQ. See how to use DLQs and Lambda destinations to route and handle unsuccessful events.

Learn more about event-driven architecture on Serverless Land.

Implementing AWS Well-Architected best practices for Amazon SQS – Part 3

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-aws-well-architected-best-practices-for-amazon-sqs-part-3/

This blog is written by Chetan Makvana, Senior Solutions Architect and Hardik Vasa, Senior Solutions Architect.

This is the third part of a three-part blog post series that demonstrates best practices for Amazon Simple Queue Service (Amazon SQS) using the AWS Well-Architected Framework.

This blog post covers best practices using the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar. The inventory management example introduced in part 1 of the series will continue to serve as an example.

See also the other two parts of the series:

Performance Efficiency Pillar

The Performance Efficiency Pillar includes the ability to use computing resources efficiently to meet system requirements, and to maintain that efficiency as demand changes and technologies evolve. It recommends best practices to use trade-offs to improve performance, such as learning about design patterns and services and identify how tradeoffs impact customers and efficiency.

By adopting these best practices, you can optimize the performance of SQS by employing appropriate configurations and techniques while considering trade-offs for the specific use case.

Best practice: Use action batching or horizontal scaling or both to increase throughput

For achieving high throughput in SQS, optimizing the performance of your message processing is crucial. You can use two techniques: horizontal scaling and action batching.

When dealing with high message volume, consider horizontally scaling the message producers and consumers by increasing the number of threads per client, by adding more clients, or both. By distributing the load across multiple threads or clients, you can handle a high number of messages concurrently.

Action batching distributes the latency of the batch action over the multiple messages in a batch request, rather than accepting the entire latency for a single message. Because each round trip carries more work, batch requests make more efficient use of threads and connections, improving throughput. You can combine batching with horizontal scaling to provide throughput with fewer threads, connections, and requests than individual message requests.

In the inventory management example that we introduced in part 1, this scaling behavior is managed by AWS for the AWS Lambda function responsible for backend processing. When a Lambda function subscribes to an SQS queue, Lambda polls the queue as it waits for the inventory updates requests to arrive. Lambda consumes messages in batches, starting at five concurrent batches with five functions at a time. If there are more messages in the queue, Lambda adds up to 60 functions per minute, up to 1,000 functions, to consume those messages.

This means that Lambda can scale up to 1,000 concurrent Lambda functions processing messages from the SQS queue. Batching enables the inventory management system to handle a high volume of inventory update messages efficiently. This ensures real-time visibility into inventory levels and enhances the accuracy and responsiveness of inventory management operations.

Best practice: Trade-off between SQS standard and First-In-First-Out (FIFO) queues

SQS supports two types of queues: standard queues and FIFO queues. Understanding the trade-offs between SQS standard and FIFO queues allows you to make an informed choice that aligns with your application’s requirements and priorities. While SQS standard queues support a nearly unlimited throughput, it sacrifices strict message ordering and occasionally delivers messages in an order different from the one they were sent in. If maintaining the exact order of events is not critical for your application, utilizing SQS standard queues can provide significant benefits in terms of throughput and scalability.

On the other hand, SQS FIFO queues guarantee message ordering and exactly-once processing. This makes them suitable for applications where maintaining the order of events is crucial, such as financial transactions or event-driven workflows. However, FIFO queues have a lower throughput compared to standard queues. They can handle up to 3,000 transactions per second (TPS) per API method with batching, and 300 TPS without batching. Consider using FIFO queues only when the order of events is important for the application, otherwise use standard queues.

In the inventory management example, since the order of inventory records is not crucial, the potential out-of-order message delivery that can occur with SQS standard queues is unlikely to impact the inventory processing. This allows you to take advantage of the benefits provided by SQS standard queues, including their ability to handle a high number of transactions per second.

Cost Optimization Pillar

The Cost Optimization Pillar includes the ability to run systems to deliver business value at the lowest price. It recommends best practices to build and operate cost-aware workloads that achieve business outcomes while minimizing costs and allowing your organization to maximize its return on investment.

Best practice: Configure cost allocation tags for SQS to organize and identify SQS for cost allocation

A well-defined tagging strategy plays a vital role in establishing accurate chargeback or showback models. By assigning appropriate tags to resources, such as SQS queues, you can precisely allocate costs to different teams or applications. This level of granularity ensures fair and transparent cost allocation, enabling better financial management and accountability.

In the inventory management example, tagging the SQS queue allows for specific cost tracking under the Inventory department, enabling a more accurate assessment of expenses. The following code snippet shows how to tag the SQS queue using AWS Could Development Kit (AWS CDK).

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
)

Tags.of(queue).add("department", "inventory")

Best practice: Use long polling

SQS offers two methods for receiving messages from a queue: short polling and long polling. By default, queues use short polling, where the ReceiveMessage request queries a subset of servers to identify available messages. Even if the query found no messages, SQS sends the response right away.

In contrast, long polling queries all servers in the SQS infrastructure to check for available messages. SQS responds only after collecting at least one message, respecting the specified maximum. If no messages are immediately available, the request is held open until a message becomes available or the polling wait time expires. In such cases, an empty response is sent.

Short polling provides immediate responses, making it suitable for applications that require quick feedback or near-real-time processing. On the other hand, long polling is ideal when efficiency is prioritized over immediate feedback. It reduces API calls, minimizes network traffic, and improves resource utilization, leading to cost savings.

In the inventory management example, long polling enhances the efficiency of processing inventory updates. It collects and retrieves available inventory update messages in a batch of 10, reducing the frequency of API requests. This batching approach optimizes resource utilization, minimizes network traffic, and reduces excessive API consumption, resulting in cost savings. You can configure this behavior using batch size and batch window:

# Add the SQS queue as a trigger to the Lambda function
sqs_to_dynamodb_function.add_event_source_mapping(
    "MyQueueTrigger", event_source_arn=queue.queue_arn, batch_size=10
)

Best practice: Use batching

Batching messages together allows you to send or retrieve multiple messages in a single API call. This reduces the number of API requests required to process or retrieve messages compared to sending or retrieving messages individually. Since SQS pricing is based on the number of API requests, reducing the number of requests can lead to cost savings.

To send, receive, and delete messages, and to change the message visibility timeout for multiple messages with a single action, use Amazon SQS batch API actions. This also helps with transferring less data, effectively reducing the associated data transfer costs, especially if you have many messages.

In the context of the inventory management example, the CSV processing Lambda function groups 10 inventory records together in each API call, forming a batch. By doing so, the number of API requests is reduced by a factor of 10 compared to sending each record separately. This approach optimizes the utilization of API resources, streamlines message processing, and ultimately contributes to cost efficiency. Following is the code snippet from the CSV processing Lambda function showcasing the use of SendMessageBatch to send 10 messages with a single action.

# Parse the CSV records and send them to SQS as batch messages
csv_reader = csv.DictReader(csv_content.splitlines())
message_batch = []
for row in csv_reader:
    # Convert the row to JSON
    json_message = json.dumps(row)

    # Add the message to the batch
    message_batch.append(
        {"Id": str(len(message_batch) + 1), "MessageBody": json_message}
    )

    # Send the batch of messages when it reaches the maximum batch size (10 messages)
    if len(message_batch) == 10:
        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=message_batch)
        message_batch = []
        print("Sent messages in batch")

Best practice: Use temporary queues

In case of short-lived, lightweight messaging with synchronous two-way communication, you can use temporary queues. The temporary queue makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill. The key concept behind this is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS, and no costs associated with creating a virtual queue.

The inventory management example does not use temporary queues. However, in use cases that involve short-lived, lightweight messaging with synchronous two-way communication, adopting the best practice of using temporary queues and virtual queues can enhance the overall efficiency, reduce costs, and simplify the management of messaging destinations.

Sustainability Pillar

The Sustainability Pillar provides best practices to meet sustainability targets for your AWS workloads. It encompasses considerations related to energy efficiency and resource optimization.

Best practice: Use long polling

Besides its cost optimization benefits explained as part of the Cost Optimization Pillar, long polling also plays a crucial role in improving resource efficiency by reducing API requests, minimizing network traffic, and optimizing resource utilization.

By collecting and retrieving available messages in a batch, long polling reduces the frequency of API requests, resulting in improved resource utilization and minimized network traffic. By reducing excessive API consumption through long polling, you can effectively use resources. It collects and retrieves messages in batches, reducing excessive API consumption and unnecessary network traffic.

By reducing API calls, it optimizes data transfer and infrastructure operations. Additionally, long polling’s batching approach optimizes resource allocation, utilizing system resources more efficiently and improving energy efficiency. This enables the inventory management system to handle high message volumes effectively while operating in a cost-efficient and resource-efficient manner.

Conclusion

This blog post explores best practices for SQS using the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar of the AWS Well-Architected Framework. We cover techniques such as batch processing, message batching, and scaling considerations. We also discuss important considerations, such as resource utilization, minimizing resource waste, and reducing cost.

This three-part blog post series covers a wide range of best practices, spanning the Operational Excellence, Security, Reliability, Performance Efficiency, Cost Optimization, and Sustainability Pillars of the AWS Well-Architected Framework. By following these guidelines and leveraging the power of the AWS Well-Architected Framework, you can build robust, secure, and efficient messaging systems using SQS.

For more serverless learning resources, visit Serverless Land.

Implementing AWS Well-Architected best practices for Amazon SQS – Part 2

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-aws-well-architected-best-practices-for-amazon-sqs-part-2/

This blog is written by Chetan Makvana, Senior Solutions Architect and Hardik Vasa, Senior Solutions Architect.

This is the second part of a three-part blog post series that demonstrates implementing best practices for Amazon Simple Queue Service (Amazon SQS) using the AWS Well-Architected Framework.

This blog post covers best practices using the Security Pillar and Reliability Pillar of the AWS Well-Architected Framework. The inventory management example introduced in part 1 of the series will continue to serve as an example.

See also the other two parts of the series:

Security Pillar

The Security Pillar includes the ability to protect data, systems, and assets and to take advantage of cloud technologies to improve your security. This pillar recommends putting in place practices that influence security. Using these best practices, you can protect data while in-transit (as it travels to and from SQS) and at rest (while stored on disk in SQS), or control who can do what with SQS.

Best practice: Configure server-side encryption

If your application has a compliance requirement such as HIPAA, GDPR, or PCI-DSS mandating encryption at rest, if you are looking to improve data security to protect against unauthorized access, or if you are just looking for simplified key management for the messages sent to the SQS queue, you can leverage Server-Side Encryption (SSE) to protect the privacy and integrity of your data stored on SQS.

SQS and AWS Key Management Service (KMS) offer two options for configuring server-side encryption. SQS-managed encryptions keys (SSE-SQS) provide automatic encryption of messages stored in SQS queues using AWS-managed keys. This feature is enabled by default when you create a queue. If you choose to use your own AWS KMS keys to encrypt and decrypt messages stored in SQS, you can use the SSE-KMS feature.

Amazon SQS Encryption Settings

SSE-KMS provides greater control and flexibility over encryption keys, while SSE-SQS simplifies the process by managing the encryption keys for you. Both options help you protect sensitive data and comply with regulatory requirements by encrypting data at rest in SQS queues. Note that SSE-SQS only encrypts the message body and not the message attributes.

In the inventory management example introduced in part 1, an AWS Lambda function responsible for CSV processing sends incoming messages to an SQS queue when an inventory updates file is dropped into the Amazon Simple Storage Service (Amazon S3) bucket. SQS encrypts these messages in the queue using SQS-SSE. When a backend processing Lambda polls messages from the queue, the encrypted message is decrypted, and the function inserts inventory updates into Amazon DynamoDB.

The AWS Could Development Kit (AWS CDK) code sets SSE-SQS as the default encryption key type. However, the following AWS CDK code shows how to encrypt the queue with SSE-KMS.

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
    encryption=sqs.QueueEncryption.KMS_MANAGED,
)

Best practice: Implement least-privilege access using access policy

For securing your resources in AWS, implementing least-privilege access is critical. This means granting users and services the minimum level of access required to perform their tasks. Least-privilege access provides better security, allows you to meet your compliance requirements, and offers accountability via a clear audit trail of who accessed what resources and when.

By implementing least-privilege access using access policies, you can help reduce the risk of security breaches and ensure that your resources are only accessed by authorized users and services. AWS Identity and Access Management (IAM) policies apply to users, groups, and roles, while resource-based policies apply to AWS resources such as SQS queues. To implement least-privilege access, it’s essential to start by defining what actions are required for each user or service to perform their tasks.

In the inventory management example, the CSV processing Lambda function doesn’t perform any other task beyond parsing the inventory updates file and sending the inventory records to the SQS queue for further processing. To ensure that the function has the permissions to send messages to the SQS queue, grant the SQS queue access to the IAM role that the Lambda function assumes. By granting the SQS queue access to the Lambda function’s IAM role, you establish a secure and controlled communication channel. The Lambda function can only interact with the SQS queue and doesn’t have unnecessary access or permissions that might compromise the system’s security.

# Create pre-processing Lambda function
csv_processing_to_sqs_function = _lambda.Function(
    self,
    "CSVProcessingToSQSFunction",
    runtime=_lambda.Runtime.PYTHON_3_8,
    code=_lambda.Code.from_asset("sqs_blog/lambda"),
    handler="CSVProcessingToSQSFunction.lambda_handler",
    role=role,
    tracing=Tracing.ACTIVE,
)

# Define the queue policy to allow messages from the Lambda function's role only
policy = iam.PolicyStatement(
    actions=["sqs:SendMessage"],
    effect=iam.Effect.ALLOW,
    principals=[iam.ArnPrincipal(role.role_arn)],
    resources=[queue.queue_arn],
)

queue.add_to_resource_policy(policy)

Best practice: Allow only encrypted connections over HTTPS using aws:SecureTransport

It is essential to have a secure and reliable method for transferring data between AWS services and on-premises environments or other external systems. With HTTPS, a network-based attacker cannot eavesdrop on network traffic or manipulate it, using an attack such as man-in-the-middle.

With SQS, you can choose to allow only encrypted connections over HTTPS using the aws:SecureTransport condition key in the queue policy. With this condition in place, any requests made over non-secure HTTP receive a 400 InvalidSecurity error from SQS.

In the inventory management example, the CSV processing Lambda function sends inventory updates to the SQS queue. To ensure secure data transfer, the Lambda function uses the HTTPS endpoint provided by SQS. This guarantees that the communication between the Lambda function and the SQS queue remains encrypted and resistant to potential security threats.

# Create an IAM policy statement allowing only HTTPS access to the queue
secure_transport_policy = iam.PolicyStatement(
    effect=iam.Effect.DENY,
    actions=["sqs:*"],
    resources=[queue.queue_arn],
    conditions={
        "Bool": {
            "aws:SecureTransport": "false",
        },
    },
)

Best practice: Use attribute-based access controls (ABAC)

Some use-cases require granular access control. For example, authorizing a user based on user roles, environment, department, or location. Additionally, dynamic authorization is required based on changing user attributes. In this case, you need an access control mechanism based on user attributes.

Attribute-based access controls (ABAC) is an authorization strategy that defines permissions based on tags attached to users and AWS resources. With ABAC, you can use tags to configure IAM access permissions and policies for your queues. ABAC hence enables you to scale your permission management easily. You can author a single permission policy in IAM using tags created for each business role, and no longer need to update the policy when adding new resources.

ABAC for SQS queues enables two key use cases:

  • Tag-based access control: use tags to control access to your SQS queues, including control plane and data plane API calls.
  • Tag-on-create: enforce tags at the time of creation of an SQS queues and deny the creation of SQS resources without tags.

Reliability Pillar

The Reliability Pillar encompasses the ability of a workload to perform its intended function correctly and consistently when it’s expected to. By leveraging the best practices outlined in this pillar, you can enhance the way you manage messages in SQS.

Best practice: Configure dead-letter queues

In a distributed system, when messages flow between sub-systems, there is a possibility that some messages may not be processed right away. This could be because of the message being corrupted or downstream processing being temporarily unavailable. In such situations, it is not ideal for the bad message to block other messages in the queue.

Dead Letter Queues (DLQs) in SQS can improve the reliability of your application by providing an additional layer of fault tolerance, simplifying debugging, providing a retry mechanism, and separating problematic messages from the main queue. By incorporating DLQs into your application architecture, you can build a more robust and reliable system that can handle errors and maintain high levels of performance and availability.

In the inventory management example, a DLQ plays a vital role in adding message resiliency and preventing situations where a single bad message blocks the processing of other messages. If the backend Lambda function fails after multiple attempts, the inventory update message is redirected to the DLQ. By inspecting these unconsumed messages, you can troubleshoot and redrive them to the primary queue or to custom destination using the DLQ redrive feature. You can also automate redrive by using a set of APIs programmatically. This ensures accurate inventory updates and prevents data loss.

The following AWS CDK code snippet shows how to create a DLQ for the source queue and sets up a DLQ policy to only allow messages from the source SQS queue. It is recommended not to set the max_receive_count value to 1, especially when using a Lambda function as the consumer, to avoid accumulating many messages in the DLQ.

# Create the Dead Letter Queue (DLQ)
dlq = sqs.Queue(self, "InventoryUpdatesDlq", visibility_timeout=Duration.seconds(300))

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
    dead_letter_queue=sqs.DeadLetterQueue(
        max_receive_count=3,  # Number of retries before sending the message to the DLQ
        queue=dlq,
    ),
)
# Create an SQS queue policy to allow source queue to send messages to the DLQ
policy = iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["sqs:SendMessage"],
    resources=[dlq.queue_arn],
    conditions={"ArnEquals": {"aws:SourceArn": queue.queue_arn}},
)
queue.queue_policy = iam.PolicyDocument(statements=[policy])

Best practice: Process messages in a timely manner by configuring the right visibility timeout

Setting the appropriate visibility timeout is crucial for efficient message processing in SQS. The visibility timeout is the period during which SQS prevents other consumers from receiving and processing a message after it has been polled from the queue.

To determine the ideal visibility timeout for your application, consider your specific use case. If your application typically processes messages within a few seconds, set the visibility timeout to a few minutes. This ensures that multiple consumers don’t process the message simultaneously. If your application requires more time to process messages, consider breaking them down into smaller units or batching them to improve performance.

If a message fails to process and is returned to the queue, it will not be available for processing again until the visibility timeout period has elapsed. Increasing the visibility timeout will increase the overall latency of your application. Therefore, it’s important to balance the tradeoff between reducing the likelihood of message duplication and maintaining a responsive application.

In the inventory management example, setting the right visibility timeout helps the application fail fast and improve the message processing times. Since the Lambda function typically processes messages within milliseconds, a visibility timeout of 30 seconds is set in the following AWS CDK code snippet.

queue = sqs.Queue(
    self,
    " InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(30),
)

It is recommended to keep the SQS queue visibility timeout to at least six times the Lambda function timeout, plus the value of MaximumBatchingWindowInSeconds. This allows Lambda function to retry the messages if the invocation fails.

Conclusion

This blog post explores best practices for SQS using the Security Pillar and Reliability Pillar of the AWS Well-Architected Framework. We discuss various best practices and considerations to ensure the security of SQS. By following these best practices, you can create a robust and secure messaging system using SQS. We also highlight fault tolerance and processing a message in a timely manner as important aspects of building reliable applications using SQS.

The next part of this blog post series focuses on the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar of the AWS Well-Architected Framework and explore best practices for SQS.

For more serverless learning resources, visit Serverless Land.