Tag Archives: AWS Step Functions

Trigger an AWS Glue DataBrew job based on an event generated from another DataBrew job

Post Syndicated from Nipun Chagari original https://aws.amazon.com/blogs/big-data/trigger-an-aws-glue-databrew-job-based-on-an-event-generated-from-another-databrew-job/

Organizations today have continuous incoming data, and analyzing this data in a timely fashion is becoming a common requirement for data analytics and machine learning (ML) use cases. To gain insights that enable business growth and profitability, you first need clean data. You can use AWS Glue DataBrew, a visual data preparation tool that makes it easy to transform and prepare datasets for analytics and ML workloads.

As we build these data analytics pipelines, we can decouple the jobs by building event-driven analytics and ML workflow pipelines. In this post, we walk through how to trigger a DataBrew job automatically on an event generated from another DataBrew job using Amazon EventBridge and AWS Step Functions.

Overview of solution

The following diagram illustrates the architecture of the solution. We use AWS CloudFormation to deploy an EventBridge rule, an Amazon Simple Queue Service (Amazon SQS) queue, and Step Functions resources to trigger the second DataBrew job.

The steps in this solution are as follows:

  1. Import your dataset to Amazon Simple Storage Service (Amazon S3).
  2. DataBrew queries the data from Amazon S3 by creating a recipe and performing transformations.
  3. The first DataBrew recipe job writes the output to an S3 bucket.
  4. When the first recipe job is complete, it triggers an EventBridge event.
  5. A Step Functions state machine is invoked based on the event, which in turn invokes the second DataBrew recipe job for further processing.
  6. The event is delivered to the dead-letter queue if the rule in EventBridge can’t invoke the state machine successfully.
  7. DataBrew queries data from an S3 bucket by creating a recipe and performing transformations.
  8. The second DataBrew recipe job writes the output to the same S3 bucket.

Prerequisites

To use this solution, you need the following prerequisites:

Load the dataset into Amazon S3

For this post, we use the Credit Card customers sample dataset from Kaggle. This data consists of 10,000 customers, including their age, salary, marital status, credit card limit, credit card category, and more. Download the sample dataset and follow the instructions. We recommend creating all your resources in the same account and Region.

Create a DataBrew project

To create a DataBrew project, complete the following steps:

  1. On the DataBrew console, choose Projects and choose Create project.
  2. For Project name, enter marketing-campaign-project-1.
  3. For Select a dataset, select New dataset.
  4. Under Data lake/data store, choose Amazon S3.
  5. For Enter your source from S3, enter the S3 path of the sample dataset.
  6. Select the dataset CSV file.
  7. Under Permissions, for Role name, choose an existing IAM role created during the prerequisites or create a new role.
  8. For New IAM role suffix, enter a suffix.
  9. Choose Create project.

After the project is opened, a DataBrew interactive session is created. DataBrew retrieves sample data based on your sampling configuration selection.

Create the DataBrew jobs

Now we can create the recipe jobs.

  1. On the DataBrew console, in the navigation pane, choose Projects.
  2. On the Projects page, select the project marketing-campaign-project-1.
  3. Choose Open project and choose Add step.
  4. In this step, we choose Delete to drop the columns that aren’t required for this exercise.

You can choose from over 250 built-in functions to merge, pivot, and transpose the data without writing code.

  1. Select the columns to delete and choose Apply.
  2. Choose Create job.
  3. For Job name, enter marketing-campaign-job1.
  4. Under Job output settings, for File type, choose your final storage format (for this post, we choose CSV).
  5. For S3 location, enter your final S3 output bucket path.
  6. Under Settings, for File output storage, select Replace output files for each job run.
  7. Choose Save.
  8. Under Permissions, for Role name, choose an existing role created during the prerequisites or create a new role.
  9. Choose Create job.

Now we repeat the same steps to create another DataBrew project and DataBrew job.

  1. For this post, we named the second project marketing-campaign-project2 and the job marketing-campaign-job2.
  2. When you create the new project, this time use the job1 output file location as the new dataset.
  3. For this job, we deselect Unknown and Uneducated in the Education_Level column.

Deploy your resources using CloudFormation

For a quick start of this solution, we deploy the resources with a CloudFormation stack. The stack creates the EventBridge rule, SQS queue, and Step Functions state machine in your account to trigger the second DataBrew job when the first job runs successfully.
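
If you want to create the rule yourself instead, the following boto3 sketch shows roughly what the stack provisions. The rule name, ARNs, and the exact DataBrew event fields (source aws.databrew, detail-type DataBrew Job State Change, state, and jobName) are assumptions you should verify against your environment.

import json
import boto3

events = boto3.client('events')

# Placeholder names and ARNs; substitute your own.
RULE_NAME = 'databrew-job1-succeeded'
STATE_MACHINE_ARN = 'arn:aws:states:us-east-1:123456789012:stateMachine:TriggerSecondDataBrewJob'
EVENTS_ROLE_ARN = 'arn:aws:iam::123456789012:role/EventBridgeInvokeStepFunctions'

# Match successful runs of the first DataBrew recipe job.
event_pattern = {
    'source': ['aws.databrew'],
    'detail-type': ['DataBrew Job State Change'],
    'detail': {
        'jobName': ['marketing-campaign-job1'],
        'state': ['SUCCEEDED'],
    },
}

events.put_rule(Name=RULE_NAME, EventPattern=json.dumps(event_pattern))

# Send matching events to the Step Functions state machine that starts the second job.
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{
        'Id': 'trigger-second-job',
        'Arn': STATE_MACHINE_ARN,
        'RoleArn': EVENTS_ROLE_ARN,
    }],
)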

  1. Choose Launch Stack:
  2. For DataBrew source job name, enter marketing-campaign-job1.
  3. For DataBrew target job name, enter marketing-campaign-job2.
  4. For both IAM role configurations, make the following choice:
    1. If you choose Create a new Role, the stack automatically creates a role for you.
    2. If you choose Attach an existing IAM role, you must populate the IAM role ARN manually in the following field or else the stack creation fails.
  5. Choose Next.
  6. Select the two acknowledgement check boxes.
  7. Choose Create stack.

Test the solution

To test the solution, complete the following steps:

  1. On the DataBrew console, choose Jobs.
  2. Select the job marketing-campaign-job1 and choose Run job.

This action automatically triggers the second job, marketing-campaign-job2, via EventBridge and Step Functions.

  1. When both jobs are complete, open the output link for marketing-campaign-job2.

You’re redirected to the Amazon S3 console to access the output file.

In this solution, we created a workflow that required minimal code. The first job triggers the second job, and both jobs deliver the transformed data files to Amazon S3.

Clean up

To avoid incurring future charges, delete all the resources created during this walkthrough:

  • IAM roles
  • DataBrew projects and their associated recipe jobs
  • S3 bucket
  • CloudFormation stack

Conclusion

In this post, we walked through how to use DataBrew along with EventBridge and Step Functions to run a DataBrew job that automatically triggers another DataBrew job. We encourage you to use this pattern for event-driven pipelines, where you can sequence multiple jobs and run them in conjunction with other jobs.


About the Authors

Nipun Chagari is a Senior Solutions Architect at AWS, where he helps customers build highly available, scalable, and resilient applications on the AWS Cloud. He is passionate about helping customers adopt serverless technology to meet their business objectives.

Prarthana Angadi is a Software Development Engineer II at AWS, where she has been expanding what is possible with code in order to make life more efficient for AWS customers.

Testing Amazon EventBridge events using AWS Step Functions

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/testing-amazon-eventbridge-events-using-aws-step-functions/

This post is written by Siarhei Kazhura, Solutions Architect and Riaz Panjwani, Solutions Architect.

Amazon EventBridge is a serverless event bus that can be used to ingest and process events from a variety of sources, such as AWS services and SaaS applications. With EventBridge, developers can build loosely coupled and independently scalable event-driven applications.

With EventBridge, it can be useful to know when events are not able to reach the desired destination. This can be caused by multiple factors, such as:

  1. Event pattern does not match the event rule
  2. Event transformer failure
  3. Event destination expects a different payload (for example, API destinations) and returns an error

EventBridge sends metrics to Amazon CloudWatch, which allows for the detection of failed invocations on a given event rule. You can also use EventBridge rules with a dead-letter queue (DLQ) to identify any failed event deliveries. The messages delivered to the queue contain additional metadata such as error codes, error messages, and the target ARN for debugging.

However, understanding why events fail to deliver is still a manual process. Checking CloudWatch metrics for failures and then inspecting the DLQ takes time. This is especially evident when developing new functionality, when you must repeatedly update the event matching patterns and event transformers and run tests to see if they have the desired effect. The EventBridge sandbox functionality can help with manual testing, but this approach does not scale or help with automated event testing.

This post demonstrates how to automate testing for EventBridge events. It uses AWS Step Functions for orchestration, along with Amazon DynamoDB and Amazon S3 to capture the results of your events, Amazon SQS for the DLQ, and AWS Lambda to invoke the workflows and processing.

Overview

Using the solution provided in this post, users can track events from inception to delivery and identify where any issues or errors occur. The solution is also customizable, and can incorporate integration tests against events to test pattern matching and transformations.

Reference architecture

At a high level:

  1. The event testing workflow is exposed via an API Gateway endpoint, and users can send a request (see the request sketch after this list).
  2. This request is validated and routed to a Step Functions EventTester workflow, which performs the event test.
  3. The EventTester workflow creates a sample event based on the received payload, and performs multiple tests on the sample event.
  4. The sample event is matched against the rule that is being tested. The results are stored in an Amazon DynamoDB EventTracking table, and the transformed event payload is stored in the TransformedEventPayload Amazon S3 bucket.
  5. The EventTester workflow has an embedded AWS Step Functions workflow called EventStatusPoller. The EventStatusPoller workflow polls the EventTracking table.
  6. The EventStatusPoller workflow has a customizable 10-second timeout. If the timeout is reached, this may indicate that the event pattern does not match, so the workflow tests whether the event matches the given pattern using the AWS SDK for EventBridge.
  7. After completing the tests, the response is formatted and sent back to the API Gateway. By default, the timeout is set to 15 seconds.
  8. API Gateway processes the response, strips the unnecessary elements, and sends the response back to the issuer. You can use this response to verify if the test event delivery is successful, or identify the reason a failure occurred.
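
For example, a test request might look like the following sketch. The endpoint URL and the payload shape are hypothetical and depend on how you deploy the solution.

import json
import urllib.request

# Hypothetical API Gateway endpoint exposed by the solution.
ENDPOINT = 'https://abc123.execute-api.us-east-1.amazonaws.com/prod/test-event'

# Hypothetical sample event to test against the rule and transformer.
payload = {
    'source': 'com.example.orders',
    'detail-type': 'OrderCreated',
    'detail': {'orderId': '1234', 'status': 'NEW'},
}

request = urllib.request.Request(
    ENDPOINT,
    data=json.dumps(payload).encode('utf-8'),
    headers={'Content-Type': 'application/json'},
    method='POST',
)

with urllib.request.urlopen(request) as response:
    # The response indicates whether the event matched, the transformed payload, or the error details.
    print(json.loads(response.read()))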

EventTester workflow

After an API call, this event is sent to the EventTester express workflow. This orchestrates the automated testing, and returns the results of the test.


In this workflow:

1. The test event is sent to EventBridge to see if the event matches the rule and can be transformed. The result is stored in a DynamoDB table.
2. The PollEventStatus synchronous Express Workflow is invoked. It polls the DynamoDB table until a record with the event ID is found or it reaches the timeout. The configurable timeout is 15 seconds by default.
3. If a record is found, it checks the event status.

From here, there are three possible states. In the first state, if the event status has succeeded:

4. The response from the PollEventStatus workflow is parsed and the payload is formatted.
5. The payload is stored in an S3 bucket.
6. The final response is created, which includes the payload, the event ID, and the event status.
7. The execution is successful, and the final response is returned to the user.

In the second state, if no record is found in the table and the PollEventStatus workflow reaches the timeout:

8. The most likely explanation for reaching the timeout is that the event pattern does not match the rule, so the event is not processed. You can build a test to verify if this is the issue.
9. From the EventBridge SDK, the TestEventPattern call is made to see if the event pattern matches the rule (see the sketch after this list).
10. The results of the TestEventPattern call are checked.
11. If the event pattern does not match the rule, then the issue has been successfully identified and the response is created to be sent back to the user. If the event pattern matches the rule, then the issue has not been identified.
12. The response shows that this is an unexpected error.

In the third state, this acts as a catch-all to any other errors that may occur:

13. The response is created with the details of the unexpected error.
14. The execution has failed, and the final response is sent back to the user.
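
The TestEventPattern check used in the second state can also be exercised directly with the AWS SDK. The following is a minimal boto3 sketch; the pattern and sample event are hypothetical, and EventBridge requires the full event envelope (id, account, source, time, region, resources, detail-type, detail) for this call.

import json
import boto3

events = boto3.client('events')

# Hypothetical pattern for the rule under test.
pattern = {
    'source': ['com.example.orders'],
    'detail-type': ['OrderCreated'],
    'detail': {'status': ['NEW']},
}

# Sample event, including the required envelope fields.
sample_event = {
    'id': '7bf73129-1428-4cd3-a780-95db273d1602',
    'account': '123456789012',
    'source': 'com.example.orders',
    'time': '2022-05-16T12:00:00Z',
    'region': 'us-east-1',
    'resources': [],
    'detail-type': 'OrderCreated',
    'detail': {'status': 'NEW'},
}

response = events.test_event_pattern(
    EventPattern=json.dumps(pattern),
    Event=json.dumps(sample_event),
)
print('Pattern matches:', response['Result'])  # True if the event matches the pattern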

Event testing process

The following diagram shows how events are sent to EventBridge and their results are captured in S3 and DynamoDB. This is the first step of the EventTester workflow:


When the event is tested:

  1. The sample event is received and sent to the EventBridge custom event bus.
  2. A CatchAll rule is triggered, which captures all events on the custom event bus.
  3. All events from the CatchAll rule are sent to a CloudWatch log group, which allows for an original payload inspection.
  4. The event is also propagated to the EventTesting rule. The event is matched against the rule pattern, and if successful the event is transformed based on the transformer provided.
  5. If the event is matched and transformed successfully, the Lambda function EventProcessor is invoked to process the transformed event payload. You can add additional custom code to this function for further testing of the event (for example, API integration with the transformed payload).
  6. The event status is updated to SUCCESS and the event metadata is saved to the EventTracking DynamoDB table.
  7. The transformed event payload is saved to the TransformedEventPayload S3 bucket.
  8. If there’s an error, EventBridge sends the event to the SQS DLQ.
  9. The Lambda function ErrorHandler polls the DLQ and processes the errors in batches.
  10. The event status is updated to ERROR and the event metadata is saved to the EventTracking DynamoDB table.
  11. The event payload is saved to the TransformedEventPayload S3 bucket.

EventStatusPoller workflow


When the poller runs:

  1. It checks the DynamoDB table to see if the event has been processed.
  2. The result of the poll is checked.
  3. If the event has not been processed, the workflow loops and polls the DynamoDB table again.
  4. If the event has been processed, the results of the event are passed to the next step in the Event Testing workflow.

Visit Composing AWS Step Functions to abstract polling of asynchronous services for additional details.

Testing at scale


The EventTester workflow uses Express Workflows, which can handle high-volume event testing workloads. For example, you can run the solution against large volumes of historical events stored in S3 or CloudWatch.

You can achieve this by using services such as Lambda or AWS Fargate to read the events in batches and run tests simultaneously. Depending on the scale and the events being tested, some performance tuning may be required.

To minimize the cost of the demo, the DynamoDB table is provisioned with 5 read capacity units and 5 write capacity units. For a production system, consider using on-demand capacity, or update the provisioned table capacity.

Event sampling


In this implementation, the EventBridge EventTester can be used to periodically sample events from your system for testing:

  1. Any existing rules that must be tested are provisioned via the AWS CDK.
  2. The sampling rule is added to an existing event bus, and has the same pattern as the rule that is tested. This filters out events that are not processed by the tested rule.
  3. An SQS queue is used for buffering.
  4. A Lambda function processes events in batches and can optionally implement sampling. For example, a 10% sampling rate takes one random message out of every 10 in a given batch (see the sketch after this list).
  5. The event is tested against the endpoint provided. Note that the EventTesting rule is also provisioned via AWS CDK from the same code base as the tested rule. The tested rule is replicated into the EventTesting workflow.
  6. The result is returned to a Lambda function, and is then sent to CloudWatch Logs.
  7. A metric is set based on the number of ERROR responses in the logs.
  8. An alarm is configured when the ERROR metric crosses a provided threshold.
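
A sampling step like the one described in step 4 could be implemented with a Lambda handler similar to the following sketch. The 10% rate and the send_to_event_tester helper are assumptions for illustration.

import json
import random

SAMPLE_RATE = 0.10  # keep roughly 1 in 10 messages

def send_to_event_tester(event_payload):
    # Hypothetical helper: POST the event to the EventTester endpoint.
    pass

def handler(event, context):
    records = event.get('Records', [])  # SQS batch delivered to the function
    sampled = [r for r in records if random.random() < SAMPLE_RATE]

    for record in sampled:
        payload = json.loads(record['body'])  # original event captured by the sampling rule
        send_to_event_tester(payload)

    return {'received': len(records), 'sampled': len(sampled)}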

This sampling can complement existing metrics exposed for EventBridge via CloudWatch.

Solution walkthrough

To follow the solution walkthrough, visit the solution repository. The walkthrough explains:

  1. Prerequisites required.
  2. Detailed solution deployment walkthrough.
  3. Solution customization and testing.
  4. Cleanup process.
  5. Cost considerations.

Conclusion

This blog post outlines how to use Step Functions, Lambda, SQS, DynamoDB, and S3 to create a workflow that automates the testing of EventBridge events. With this example, you can send events to the EventBridge Event Tester endpoint to verify that event delivery is successful or identify the root cause for event delivery failures.

For more serverless learning resources, visit Serverless Land.

AWS Week in Review – May 16, 2022

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/aws-week-in-review-may-16-2022/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

I had been on the road for the last five weeks and attended many of the AWS Summits in Europe. It was great to talk to so many of you in person. The Serverless Developer Advocates are going around many of the AWS Summits with the Serverlesspresso booth. If you attend an event that has the booth, say “Hi 👋” to my colleagues, and have a coffee while asking all your serverless questions. You can find all the upcoming AWS Summits in the events section at the end of this post.

Last week’s launches
Here are some launches that got my attention during the previous week.

AWS Step Functions announced a new console experience to debug your state machine executions – Now you can opt in to the new console experience of Step Functions, which makes it easier to analyze, debug, and optimize Standard Workflows. The new page allows you to inspect executions using three different views: graph, table, and event view, and adds many new features to enhance the navigation and analysis of executions. To learn about all the features and how to use them, read Ben’s blog post.

Example of how the Graph View looks


AWS Lambda now supports Node.js 16.x runtime – Now you can start using the Node.js 16 runtime when you create a new function or update your existing functions to use it. You can also use the new container image base that supports this runtime. To learn more about this launch, check Dan’s blog post.

AWS Amplify announces its Android library designed for Kotlin – The Amplify Android library has been rewritten for Kotlin, and it is now available in preview. This new library provides better debugging capabilities and visibility into the underlying state management. It also uses the new AWS SDK for Kotlin, which was released in preview last year. Read the What’s New post for more information.

Three new APIs for batch data retrieval in AWS IoT SiteWise – With this launch, AWS IoT SiteWise now supports batch data retrieval from multiple asset properties. The new APIs allow you to retrieve current values, historical values, and aggregated values. Read the What’s New post to learn how you can start using the new APIs.

AWS Secrets Manager now publishes secret usage metrics to Amazon CloudWatch – This launch is very useful to see the number of secrets in your account and set alarms for any unexpected increase or decrease in the number of secrets. Read the documentation on Monitoring Secrets Manager with Amazon CloudWatch for more information.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Some other launches and news that you may have missed:

IBM signed a deal with AWS to offer its software portfolio as a service on AWS. This allows customers using AWS to access IBM software for automation, data and artificial intelligence, and security that is built on Red Hat OpenShift Service on AWS.

Podcast Charlas Técnicas de AWS – If you understand Spanish, this podcast is for you. Podcast Charlas Técnicas is one of the official AWS podcasts in Spanish. This week’s episode introduces you to Amazon DynamoDB and shares stories on how different customers use this database service. You can listen to all the episodes directly from your favorite podcast app or the podcast web page.

AWS Open Source News and Updates – Ricardo Sueiras, my colleague from the AWS Developer Relation team, runs this newsletter. It brings you all the latest open-source projects, posts, and more. Read edition #112 here.

Upcoming AWS Events
It’s AWS Summits season and here are some virtual and in-person events that might be close to you:

You can register for re:MARS to get fresh ideas on topics such as machine learning, automation, robotics, and space. The conference will be in person in Las Vegas, June 21–24.

That’s all for this week. Check back next Monday for another Week in Review!

— Marcia

Use direct service integrations to optimize your architecture

Post Syndicated from Jerome Van Der Linden original https://aws.amazon.com/blogs/architecture/use-direct-service-integrations-to-optimize-your-architecture/

When designing an application, you must integrate and combine several AWS services in the most optimized way for an effective and efficient architecture:

  • Optimize for performance by reducing the latency between services
  • Optimize for cost, operability, and sustainability by avoiding unnecessary components and reducing workload footprint
  • Optimize for resiliency by removing potential points of failure
  • Optimize for security by minimizing the attack surface

As stated in the Serverless Application Lens of the Well-Architected Framework, “If your AWS Lambda function is not performing custom logic while integrating with other AWS services, chances are that it may be unnecessary.” In addition, Amazon API Gateway, AWS AppSync, AWS Step Functions, Amazon EventBridge, and Lambda Destinations can directly integrate with a number of services. These optimizations can offer you more value and less operational overhead.

This blog post will show how to optimize an architecture with direct integration.

Workflow example and initial architecture

Figure 1 shows a typical workflow for the creation of an online bank account. The customer fills out a registration form with personal information and adds a picture of their ID card. The application then validates the ID and address, and checks whether a user with that name already exists. If everything checks out, a backend application is notified to create the account. Finally, the user is notified of successful completion.

Figure 1. Bank account application workflow


The workflow architecture is shown in Figure 2 (click on the picture to get full resolution).

Figure 2. Initial account creation architecture


This architecture contains 13 Lambda functions. If you look at the code on GitHub, you can see that:

Five of these Lambda functions are basic and perform simple operations:

Additional Lambda functions perform other tasks, such as verification and validation:

  • One function generates a presigned URL to upload ID card pictures to Amazon Simple Storage Service (Amazon S3)
  • One function uses the Amazon Textract API to extract information from the ID card
  • One function verifies the identity of the user against the information extracted from the ID card
  • One function performs a simple HTTP request to a third-party API to validate the address

Finally, four functions handle the WebSocket interactions (connect, message, and disconnect) and notifications to the user.

Opportunities for improvement

If you further analyze the code of the five basic functions (see startWorkflow on GitHub, for example), you will notice that only three lines of fundamental code actually start the workflow. The other 38 lines involve imports, input validation, error handling, logging, and tracing. Remember that all this code must be tested and maintained.

import os
import json
import boto3
from aws_lambda_powertools import Tracer
from aws_lambda_powertools import Logger
import re

logger = Logger()
tracer = Tracer()

sfn = boto3.client('stepfunctions')

PATTERN = re.compile(r"^arn:(aws[a-zA-Z-]*)?:states:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:stateMachine:[a-zA-Z0-9-_]+$")

if ('STATE_MACHINE_ARN' not in os.environ
    or os.environ['STATE_MACHINE_ARN'] is None
    or not PATTERN.match(os.environ['STATE_MACHINE_ARN'])):
    raise RuntimeError('STATE_MACHINE_ARN env var is not set or incorrect')

STATE_MACHINE_ARN = os.environ['STATE_MACHINE_ARN']

@logger.inject_lambda_context
@tracer.capture_lambda_handler
def handler(event, context):
    try:
        event['requestId'] = context.aws_request_id

        sfn.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            input=json.dumps(event)
        )

        return {
            'requestId': event['requestId']
        }
    except Exception as error:
        logger.exception(error)
        raise RuntimeError('Internal Error - cannot start the creation workflow') from error

After running this workflow several times and reviewing the AWS X-Ray traces (Figure 3), we can see that it takes about 2–3 seconds when functions are warmed:

Figure 3. X-Ray traces when Lambda functions are warmed


But the process takes around 10 seconds with cold starts, as shown in Figure 4:

Figure 4. X-Ray traces when Lambda functions are cold


We use an asynchronous architecture to avoid waiting time for the user, as this can be a long process. We also use WebSockets to notify the user when it’s finished. This adds some complexity, new components, and additional costs to the architecture. Now let’s look at how we can optimize this architecture.

Improving the initial architecture

Direct integration with Step Functions

Step Functions can directly integrate with some AWS services, including DynamoDB, Amazon SQS, and EventBridge, as well as more than 10,000 API actions from over 200 AWS services. With these integrations, you can replace Lambda functions when they do not provide value. We recommend using Lambda functions to transform data, not to transport data from one service to another.

In our bank account creation use case, there are four Lambda functions we can replace with direct service integrations (see large arrows in Figure 5):

  • Query a DynamoDB table to search for a user
  • Send a message to an SQS queue when the extraction fails
  • Create the user in DynamoDB
  • Send an event on EventBridge to notify the backend
Figure 5. Lambda functions that can be replaced


It is less clear whether we need to replace the other Lambda functions. Here are some considerations:

  • To extract information from the ID card, we use Amazon Textract. It is available through the SDK integration in Step Functions. However, the API’s response provides too much information. We recommend using a library such as amazon-textract-response-parser to parse the result. For this, you’ll need a Lambda function.
  • The identity cross-check performs a simple comparison between the data provided in the web form and the one extracted in the ID card. We can perform this comparison in Step Functions using a Choice state and several conditions. If the business logic becomes more complex, consider using a Lambda function.
  • To validate the address, we query a third-party API. Step Functions cannot directly call a third-party HTTP endpoint, but because it’s integrated with API Gateway, we can create a proxy for this endpoint.

If you only need to retrieve data from an API or make a simple API call, use the direct integration. If you need to implement some logic, use a Lambda function.
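
As an illustration, the user lookup can be expressed as a Task state that calls DynamoDB directly through the optimized service integration. The following sketch builds such a state in Amazon States Language as a Python dictionary; the table name, key attribute, and state names are assumptions.

import json

# Hypothetical table, key, and state names; the key value is taken from the state input.
check_user_state = {
    'CheckIfUserAlreadyExists': {
        'Type': 'Task',
        'Resource': 'arn:aws:states:::dynamodb:getItem',
        'Parameters': {
            'TableName': 'bank-account-users',
            'Key': {
                'userName': {'S.$': '$.userName'}
            }
        },
        'ResultPath': '$.existingUser',
        'Next': 'UserExistsChoice'
    }
}

print(json.dumps(check_user_state, indent=2))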

Direct integration with API Gateway

API Gateway also provides service integrations. In particular, we can start the workflow without using a Lambda function. In the console, select the integration type “AWS Service”, the AWS service “Step Functions”, the action “StartExecution”, and the “POST” method, as shown in Figure 6.

Figure 6. API Gateway direct integration with Step Functions


After that, use a mapping template in the integration request to define the parameters as shown here:

{
  "stateMachineArn":"arn:aws:states:eu-central-1:123456789012:stateMachine:accountCreationWorkflow",
  "input":"$util.escapeJavaScript($input.json('$'))"
}

We can go further and remove the WebSockets and the associated connect, message, and disconnect Lambda functions. By using Synchronous Express Workflows and the StartSyncExecution API, we can start the workflow and wait for the result in a synchronous fashion. API Gateway then directly returns the result of the workflow to the client.
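
The following boto3 sketch shows the equivalent synchronous call that API Gateway makes through its service integration; the state machine ARN and payload are placeholders, and StartSyncExecution only works with Express Workflows.

import json
import boto3

sfn = boto3.client('stepfunctions')

STATE_MACHINE_ARN = 'arn:aws:states:eu-central-1:123456789012:stateMachine:accountCreationWorkflow'

response = sfn.start_sync_execution(
    stateMachineArn=STATE_MACHINE_ARN,
    input=json.dumps({'firstName': 'Jane', 'lastName': 'Doe'}),  # hypothetical payload
)

# The call blocks until the workflow finishes and returns its output directly.
print(response['status'])       # SUCCEEDED, FAILED, or TIMED_OUT
print(response.get('output'))   # the workflow output, if any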

Final optimized architecture

After applying these optimizations, we have the updated architecture shown in Figure 7. It uses only two Lambda functions out of the initial 13. The rest have been replaced by direct service integrations or implemented in Step Functions.

Figure 7. Final optimized architecture


We were able to remove 11 Lambda functions and their associated fees. In this architecture, the cost is mainly driven by Step Functions, and the main price difference will be your use of Express Workflows instead of Standard Workflows. If you need to keep some Lambda functions, use AWS Lambda Power Tuning to configure your function correctly and benefit from the best price/performance ratio.

One of the main benefits of this architecture is performance. With the final workflow architecture, the process now takes about 1.5 seconds when the Lambda functions are warmed and 3 seconds on cold starts (versus up to 10 seconds previously), as shown in Figure 8:

Figure 8. X-Ray traces for the final architecture


The process can now be synchronous. It reduces the complexity of the architecture and vastly improves the user experience.

An added benefit is that by reducing the overall complexity and removing the unnecessary Lambda functions, we have also reduced the risk of failures. These can be errors in the code, memory or timeout issues due to bad configuration, lack of permissions, network issues between components, and more. This increases the resiliency of the application and eases its maintenance.

Testing

Testability is an important consideration when building your workflow. Unit testing a Lambda function is straightforward, and you can use your preferred testing framework to validate its behavior. Adopting a hexagonal architecture also helps remove dependencies on the cloud.

When removing functions and using an approach with direct service integrations, you are by definition directly connected to the cloud. You still must verify that the overall process is working as expected, and validate these integrations.

You can achieve this kind of testing locally using Step Functions Local and the recently announced Mocked Service Integrations. By mocking service integrations, for example retrieving an item in DynamoDB, you can validate the different paths of your state machine.

You also have to perform integration tests, but this is true whether you use direct integrations or Lambda functions.

Conclusion

This post describes how to simplify your architecture and optimize for performance, resiliency, and cost by using direct integrations in Step Functions and API Gateway. Although many Lambda functions were reduced, some remain useful for handling more complex business logic and data transformation. Try this out now by visiting the GitHub repository.

For further reading:

Orchestrate big data jobs on on-premises clusters with AWS Step Functions

Post Syndicated from Göksel SARIKAYA original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-jobs-on-on-premises-clusters-with-aws-step-functions/

Customers with specific needs to run big data compute jobs on an on-premises infrastructure often require a scalable orchestration solution. For large-scale distributed compute clusters, the orchestration of jobs must be scalable to maximize their utilization, while at the same time remaining resilient to failures so that the ever-growing influx of data and jobs is not blocked. Moreover, on-premises compute resources can’t be extended on demand; therefore, jobs with different priorities may compete for the same resources.

This post showcases serverless building blocks for orchestrating big data jobs using AWS Step Functions, AWS Lambda, and Amazon DynamoDB, with a focus on reliability, maintainability, and monitoring. In this solution, Step Functions enables thousands of workflows to run in parallel. Additionally, Lambda provides the flexibility to implement arbitrary interfaces to the on-premises infrastructure and its compute resources. With additional steps in the orchestration, the solution also allows operators to monitor thousands of parallel jobs in a visual interface for better debugging.

Architecture

The proposed serverless solution consists of the following main components:

  • Job trigger – Requests new compute jobs to run on the on-premises cluster. For simplicity, in this architecture we assume that the trigger is a client calling Step Functions directly. However, you could extend this to include Amazon API Gateway to create a job API to interface with the orchestration solution or a rule engine to trigger jobs when relevant data becomes available.
  • Job manager – This Step Functions workflow runs once per compute job, with multiple workflows running in parallel. It tracks the status of a job from queueing, scheduling, running, and retrying, all the way to its completion. Ideally, a job can be scheduled immediately, but workflows can run for days if a job has very low priority and compute resources are scarce. The job manager delegates the decision of when and where to run the job to the job queue manager. Communication with the on-premises cluster is abstracted through a Lambda adapter.
  • Job queue manager – Maintains a queue of all jobs. With the given job properties (for example based on priority), the job queue manager decides the running time of jobs, and the cluster on which they run. To illustrate the concept, the architecture considers real-time information on the resource utilization of the compute clusters (memory, CPU) for scheduling. However, you could apply different scheduling algorithms as required given the flexibility of Lambda.
  • On-premises compute cluster – Provides the computing resources, data nodes, and tools to run compute jobs.

The following diagram illustrates the solution architecture.

Solution Architecture

The main process of the solution consists of seven steps:

  1. The job trigger runs a new Step Functions workflow to run a compute job on premises and provides the necessary information (such as priority and required resources).
  2. The job manager creates a new record in DynamoDB to add the job to the queue of the job queue manager, and the workflow waits for the job queue manager to call back.
  3. Amazon EventBridge triggers a scheduled Lambda function in the job queue manager periodically (for example, every 5 minutes), decoupled from the job requests.
  4. The job scheduler Lambda function retrieves real-time information from cluster metrics to see whether jobs can be scheduled at this point in time.
  5. The job scheduler function fetches all queued jobs from DynamoDB and tries to schedule as many of those jobs as possible to available compute resources based on priority, as well as memory and CPU demands.
  6. For each job that can be scheduled to the compute cluster, the job scheduler function communicates back to the job manager to signal the workflow to continue and that the job can be run.
  7. The job manager communicates with the on-premises cluster through the compute cluster adapter Lambda function to run the job, track its status periodically, and retry in case of errors.

On-premises compute cluster

In this post, we assume the on-premises compute cluster offers interfaces to interact with the compute resources. For example, customers could run a Spark compute cluster on premises that allows the following basic interactions through an API:

  • Upload and trigger a compute job on a cluster (for example, upload a Spark JAR file and submit)
  • Get the status of a compute job (such as running, stopped, or error)
  • Get error output in case of failures in the compute job (for example, the job failed due to access denied)

In addition, we assume the cluster can provide metrics on its current utilization. For example, the cluster could provide Prometheus metrics as aggregates over all resources within a compute cluster:

  • Memory utilization (for example, 2 TB with 80% utilization)
  • CPU utilization (for example, 5,000 cores with 50% utilization)

We use the terminology introduced here for the example in this post. Depending on the capabilities of the on-premises cluster, you can adjust these concepts. For example, the compute cluster could use Kubernetes or SLURM instead of Spark.

Job manager

The job manager is responsible for communicating with on-premises clusters to trigger big data jobs and query their status. It’s a Step Functions state machine that consists of three steps, as illustrated in the following figure.

The first step is JobQueueRequest, which makes a request to the job queue manager component and waits for the callback. When the job queue manager signals the waiting step through the callback pattern, the second step, StartJobRun, runs.

The StartJobRun step communicates with the on-premises environment (for example, via HTTP post to a REST API endpoint) to trigger an on-premises job.

The third step GetJobStatus queries the job status from the on-premises cluster. If the job status is InProgress, the state machine waits for a configured time. When the Wait state is over, it returns to the GetJobStatus step to query the job status again in a loop. When the job returns a successful state from the on-premises cluster, the state machine completes its cycle with a Success state. If the job fails with a timeout or with an error, the state machine completes its cycle with a Fail state.

The following screenshot shows the details of the state machine on the Step Functions console.

Job Manager Step Function Inputs

Job queue manager

The job queue manager is responsible for managing job queues based on job priorities and cluster utilization. It consists of DynamoDB, Lambda, and EventBridge.

The JobQueue table keeps data of waiting jobs, including jobId as the primary key, priority as the sort key, needed memory and CPU consumptions, callbackId, and timestamp information. You can add further information to the table dynamically if required by the scheduling algorithm.
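
For illustration, the following boto3 sketch adds a waiting job to the JobQueue table. The key and attribute names follow the description above, but the exact schema and the component that writes the record in the original solution may differ.

import time
import boto3

dynamodb = boto3.resource('dynamodb')
job_queue = dynamodb.Table('JobQueue')  # table name assumed

def enqueue_job(job_id, priority, memory_gb, cpu_cores, task_token):
    """Add a waiting job to the queue; callbackId holds the Step Functions task token."""
    job_queue.put_item(
        Item={
            'jobId': job_id,           # partition key
            'priority': priority,      # sort key
            'memory': memory_gb,
            'cpu': cpu_cores,
            'callbackId': task_token,  # used later to resume the waiting job manager workflow
            'timestamp': int(time.time()),
        }
    )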

The following screenshot shows the attribute details of the JobQueue table.

EventBridge triggers the job scheduler Lambda function on a regular basis at a configured interval. First, the job scheduler function gets the waiting jobs’ data from the JobQueue table in DynamoDB. Then it establishes a connection with the on-premises cluster to fetch cluster metrics such as memory and CPU utilization. Based on this information, the function decides which jobs are ready to be triggered on the on-premises cluster.

The scheduling algorithm proposed here follows a simple concept to maximize resource utilization, while respecting the job priority. Essentially, for an on-premises cluster (we could potentially have multiple in different geographies), the job scheduler Lambda function builds a queue of jobs according to their priority, while allocating the first job in the queue to compute resources on the cluster. If enough resources are available, the scheduler moves to the next job in the queue and repeats.

Due to the flexibility of Lambda functions, you can tailor the scheduling algorithm for a specific use case. Cluster scheduling algorithms are still an open research topic with different optimization goals, such as throughput, data location, fairness, deadlines, and more.
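
The following is a simplified sketch of such a scheduler function. The metrics retrieval, capacity check, and table layout are assumptions; the callback to the waiting job manager workflow uses the Step Functions SendTaskSuccess API with the stored task token (callbackId).

import json
import boto3

dynamodb = boto3.resource('dynamodb')
sfn = boto3.client('stepfunctions')
job_queue = dynamodb.Table('JobQueue')  # table name assumed

def get_available_capacity():
    # Hypothetical helper: query the on-premises cluster metrics (for example, Prometheus).
    return {'memory_gb': 400, 'cpu_cores': 2500}

def handler(event, context):
    capacity = get_available_capacity()

    # Fetch waiting jobs and order them by priority (highest first).
    jobs = job_queue.scan()['Items']
    jobs.sort(key=lambda j: j['priority'], reverse=True)

    for job in jobs:
        if job['memory'] <= capacity['memory_gb'] and job['cpu'] <= capacity['cpu_cores']:
            # Resume the waiting job manager workflow for this job.
            sfn.send_task_success(
                taskToken=job['callbackId'],
                output=json.dumps({'jobId': job['jobId'], 'status': 'SCHEDULED'}),
            )
            job_queue.delete_item(Key={'jobId': job['jobId'], 'priority': job['priority']})
            capacity['memory_gb'] -= job['memory']
            capacity['cpu_cores'] -= job['cpu']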

Get started

In this section, we provide a starting point for the solution described in this post. The steps walk you through creating a Step Functions state machine with the appropriate template, and the necessary Lambda and DynamoDB interactions to create the job manager and job queue manager building blocks. Example code for the Lambda functions is excluded from this post, because the communication with the on-premises cluster to trigger jobs can vary depending on your on-premises interface.

  1. On the Step Functions console, choose State machines.
  2. Choose Create state machine.
  3. Select Run a sample project.
  4. Select Job Poller.
  5. Scroll down to see the sample projects, which are defined using Amazon States Language (ASL).
  6. Review the example definition, then choose Next.
  7. Choose Deploy resources.
    Deployment can take up to 10 minutes.
    The deployment creates the state machine that is responsible for job management. After you deploy the resources, you need to edit the sample ASL code to add the extra JobQueueRequest step in the state machine.
  8. Select the created state machine.
  9. Choose Edit to add the ARNs of the three Lambda functions that make a request to the job queue manager (Job Queue Request), submit a job to the on-premises cluster (Submit Job), and poll the status of the jobs (Get Job Status).
    Now you’re ready to create the job queue manager.
  10. On the DynamoDB console, create a table for storing job metadata.
  11. On the EventBridge console, create a scheduled rule that triggers the Lambda function at a configured interval.
  12. On the Lambda console, create the function that communicates with the on-premises cluster to fetch cluster metrics. It also gets jobs from the DynamoDB table to retrieve information including job priorities, required memory, and CPU to run the job on the on-premises cluster.

Limitations

This solution uses Step Functions to track all jobs until completion, so the Step Functions quotas must be considered for potential use cases. Most notably, a workflow can run for a maximum of 1 year (this limit cannot be increased), and by default 1 million executions can run in parallel in a single account (this can be increased into the millions). See Quotas for further details.

Conclusion

This post described how to orchestrate big data jobs running in parallel on on-premises clusters with a Step Functions workflow. To learn more about how to use Step Functions workflows for serverless orchestration, visit Serverless Land.


About the Authors

Göksel Sarikaya is a Senior Cloud Application Architect at AWS Professional Services. He enables customers to design scalable, high-performance, and cost effective applications using the AWS Cloud. He helps them to be more flexible and competitive during their digital transformation journey.

Nicolas Jacob Baer is a Senior Cloud Application Architect with a strong focus on data engineering and machine learning, based in Switzerland. He works closely with enterprise customers to design data platforms and build advanced analytics and ML use cases.

Shukhrat Khodjaev is a Senior Engagement Manager at AWS ProServe, based out of Berlin. He focuses on delivering engagements in the field of Big Data and AI/ML that enable AWS customers to uncover and to maximize their value through efficient use of data.

Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/simplify-your-etl-and-ml-pipelines-using-the-amazon-athena-unload-feature/

Many organizations prefer SQL for data preparation because they already have developers for extract, transform, and load (ETL) jobs and analysts preparing data for machine learning (ML) who understand and write SQL queries. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

By default, Athena automatically writes SELECT query results in CSV format to Amazon S3. However, you might often have to write SELECT query results in non-CSV files such as JSON, Parquet, and ORC for various use cases. In this post, we walk you through the UNLOAD statement in Athena and how it helps you implement several use cases, along with code snippets that you can use.

Athena UNLOAD overview

Although CSV is the only output format that an Athena SELECT query produces by default, you can use UNLOAD to write the output of a SELECT query in the formats and compression types that UNLOAD supports. When you use UNLOAD in a SELECT query statement, it writes the results to Amazon S3 in your choice of Apache Parquet, ORC, Apache Avro, TEXTFILE, or JSON.

Although you can use the CTAS statement to output data in formats other than CSV, those statements also require the creation of a table in Athena. The UNLOAD statement is useful when you want to output the results of a SELECT query in a non-CSV format but don’t require the associated table. For example, a downstream application might require the results of a SELECT query to be in JSON format, and Parquet or ORC might provide a performance advantage over CSV if you intend to use the results of the SELECT query for additional analysis.
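
Although the examples that follow show the SQL statements, you can submit an UNLOAD query programmatically like any other Athena query. The following boto3 sketch is a minimal example; the query, bucket, and workgroup names are placeholders.

import boto3

athena = boto3.client('athena')

# Placeholder query; the TO clause controls where UNLOAD writes the data files.
query = """
UNLOAD (SELECT * FROM my_database.my_table)
TO 's3://your-bucket/temp/athenaunload/example/'
WITH (format = 'PARQUET', compression = 'SNAPPY')
"""

response = athena.start_query_execution(
    QueryString=query,
    WorkGroup='primary',
    ResultConfiguration={'OutputLocation': 's3://your-bucket/athena-query-metadata/'},
)

# Optionally poll for completion.
status = athena.get_query_execution(QueryExecutionId=response['QueryExecutionId'])
print(status['QueryExecution']['Status']['State'])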

In this post, we walk you through the following use cases for the UNLOAD feature:

  • Compress Athena query results to reduce storage costs and speed up performance for downstream consumers
  • Store query results in JSON file format for specific downstream consumers
  • Feed downstream Amazon SageMaker ML models that require files as input
  • Simplify ETL pipelines with AWS Step Functions without creating a table

Use case 1: Compress Athena query results

When you’re using Athena to process and create large volumes of data, storage costs can increase significantly if you don’t compress the data. Furthermore, uncompressed formats like CSV and JSON require you to store and transfer a large number of files across the network, which can increase IOPS and network costs. To reduce costs and improve the performance of downstream big data processing applications, such as Spark applications, a best practice is to store Athena output in compressed columnar file formats such as ORC and Parquet.

You can use the UNLOAD statement in your Athena SQL statement to create compressed ORC and Parquet file formats. In this example, we use a 3 TB TPC-DS dataset to find all items that were returned both in stores and on the website. The following query joins the four tables: item, store_returns, web_returns, and customer_address:

UNLOAD (
		select *
		from store_returns, item, web_returns, customer_address
		where sr_item_sk = i_item_sk and
		wr_item_sk = i_item_sk and
		wr_refunded_addr_sk = ca_address_sk
	) to 's3://your-bucket/temp/athenaunload/usecase1/' with (
		format = 'PARQUET',
		compression = 'SNAPPY',
		partitioned_by = ARRAY['ca_location_type']
	)

When compressed with Snappy and stored in Parquet format, the query output was a 62 GB dataset. The same output in uncompressed CSV format was a 248 GB dataset. The Snappy-compressed Parquet format yielded a 75% smaller storage size, thereby saving storage costs and resulting in faster performance.

Use case 2: Store query results in JSON file format

Some systems downstream of Athena, such as web applications or third-party systems, require the data to be in JSON format. The JSON file format is a text-based, self-describing representation of structured data based on key-value pairs. It’s lightweight and widely used as a data transfer mechanism by different services, tools, and technologies. In these use cases, the UNLOAD statement with the format parameter set to JSON can unload files in JSON format to Amazon S3.

The following SQL extracts the returns data for a specific customer within a specific date range from the 3 TB catalog_returns table and stores it in Amazon S3 in JSON format:

UNLOAD (
		select cr_returned_date_sk, cr_returning_customer_sk, cr_catalog_page_sk, cr_net_loss
		from catalog_returns
		where cr_returned_date_sk = 2450821 and cr_returning_customer_sk = 11026691
	) to 's3://your-bucket/temp/athenaunload/usecase2/' with (
		format = 'JSON', compression = 'NONE'
	)

By default, Athena uses Gzip for JSON and TEXTFILE formats. You can set the compression to NONE to store the UNLOAD result without any compression. The query result is stored as the following JSON file:

{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}

The query result can now be consumed by a downstream web application.

Use case 3: Feed downstream ML models

Analysts and data scientists rely on Athena for ad hoc SQL queries, data discovery, and analysis. They often like to quickly create derived columns such as aggregates or other features. These need to be written as files in Amazon S3 so a downstream ML model can directly read the files without having to rely on a table.

You can also parameterize repetitive queries using Athena prepared statements. Using the UNLOAD statement in a prepared statement gives less technical users, analysts, and data scientists a self-service way to export the files they need for downstream analysis without having to write queries.

In the following example, we create derived columns and engineer features for a downstream SageMaker ML model that predicts the best discount for catalog items in future promotions. We derive averages for quantity, list price, discount, and sales price for promotional items sold in stores where the promotion is not offered by mail or a special event. Then we restrict the results to a specific gender, marital status, and education level. We use the following query:

UNLOAD(
		Select i_item_id, 
	        avg(ss_quantity) avg_sales,
	        avg(ss_list_price) avg_list_price,
	        avg(ss_coupon_amt) avg_coupon_amt,
	        avg(ss_sales_price) avg_sales_price 
	 from store_sales, customer_demographics, date_dim, item, promotion
	 where cast(ss_sold_date_sk AS int) = d_date_sk and
	       ss_item_sk = i_item_sk and
	       ss_cdemo_sk = cd_demo_sk and
	       ss_promo_sk = p_promo_sk and
	       cd_gender = 'M' and 
	       cd_marital_status = 'M' and
	       cd_education_status = '4 yr Degree' and
	       (p_channel_email = 'N' or p_channel_event = 'N') and
	       d_year = 2001 
	 group by i_item_id
	 order by i_item_id
	) to 's3://your-bucket/temp/athenaunload/usecase3/' with (
		format = 'PARQUET',compression = 'SNAPPY'
	)

The output is written as Parquet files in Amazon S3 for a downstream SageMaker model training job to consume.

Use case 4: Simplify ETL pipelines with Step Functions

Step Functions is integrated with the Athena console to facilitate building workflows that include Athena queries and data processing operations. This helps you create repeatable and scalable data processing pipelines as part of a larger business application and visualize the workflows on the Athena console.

In this use case, we produce an example query result in Parquet format for downstream consumption. In this example, the raw data is in TSV format and is ingested on a daily basis. We use the Athena UNLOAD statement to convert the data into Parquet format. After that, we send the location of the Parquet files as an Amazon Simple Notification Service (Amazon SNS) notification. Downstream applications can be notified via SNS to take further action. One common example is to initiate a Lambda function that loads the Athena transformation result into Amazon Redshift.

The following diagram illustrates the ETL workflow.

The workflow includes the following steps:

  1. Start an AWS Glue crawler pointing to the raw S3 bucket. The crawler updates the metadata of the raw table with new files and partitions.
  2. Invoke a Lambda function to clean up the previous UNLOAD result (a sketch of this function follows this list). This step is required because UNLOAD doesn’t write data to the specified location if the location already has data in it (UNLOAD doesn’t overwrite existing data). To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again. Another common pattern is to UNLOAD data to a new partition with incremental data processing.
  3. Start an Athena UNLOAD query to convert the raw data into Parquet.
  4. Send a notification to downstream data consumers when the file is updated.
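
Because UNLOAD doesn’t overwrite existing data, the cleanup function in step 2 empties the destination prefix before each run. The following is a minimal sketch of such a function; it assumes the bucket_name and prefix keys used in the Lambda payload later in this walkthrough.

import boto3

s3 = boto3.resource('s3')

def handler(event, context):
    """Delete all objects under the given prefix so UNLOAD can write there again."""
    bucket_name = event['bucket_name']  # for example, the Athena output bucket
    prefix = event['prefix']            # for example, 'parquet/'

    bucket = s3.Bucket(bucket_name)
    bucket.objects.filter(Prefix=prefix).delete()

    return {'cleaned': f's3://{bucket_name}/{prefix}'}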

Set up resources with AWS CloudFormation

To set up the resources for this walkthrough, launch the provided AWS CloudFormation template:

Keep all the provided parameters and choose Create stack.

The CloudFormation template creates the following resources:

  • An Athena workgroup etl-workgroup, which holds the Athena UNLOAD queries.
  • A data lake S3 bucket that holds the raw table. We use the Amazon Customer Reviews Dataset in this post.
  • An Athena output S3 bucket that holds the UNLOAD result and query metadata.
  • An AWS Glue database.
  • An AWS Glue crawler pointing to the data lake S3 bucket.
  • A LoadDataBucket Lambda function to load the Amazon Customer Reviews raw data into the S3 bucket.
  • A CleanUpS3Folder Lambda function to clean up previous Athena UNLOAD result.
  • An SNS topic to notify downstream systems when the UNLOAD is complete.

When the stack is fully deployed, navigate to the Outputs tab of the stack on the AWS CloudFormation console and note the value of the following resources:

  • AthenaWorkgroup
  • AthenaOutputBucket
  • CleanUpS3FolderLambda
  • GlueCrawler
  • SNSTopic

Build a Step Functions workflow

We use the Athena Workflows feature to build the ETL pipeline.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under Create Athena jobs with Step Functions workflows, for Query large datasets, choose Get started.
  3. Choose Create your own workflow.
  4. Choose Continue.

The following is a screenshot of the default workflow. Compare the default workflow against the earlier ETL workflow we described. The default workflow doesn’t contain a Lambda function invocation and has an additional GetQueryResult step.

Next, we add a Lambda Invoke step.

  1. Search for Lambda Invoke in the search bar.
  2. Choose the Lambda:Invoke step and drag it above the Athena: StartQueryExecution step.
  3. Choose the Athena:GetQueryResults step (right-click) and choose Delete state.

  4. Now the workflow aligns with the earlier design.
  5. Choose the step Glue: StartCrawler.
  6. In the Configuration section, under API Parameters, enter the following JSON (provide the AWS Glue crawler name from the CloudFormation stack output):
    {
      "Name": "GlueCrawler"
    }

  7. Choose the step Glue: GetCrawler.
  8. In the Configuration section, under API Parameters, enter the following JSON:
    {
      "Name": "GlueCrawler"
    }

  9. Choose the step Lambda: Invoke.
  10. In the Configuration section, under API Parameters, for Function name, choose the function -CleanUpS3FolderLambda-.
  11. In the Payload section, enter the following JSON (include the Athena output bucket from the stack output):
    {
      "bucket_name": “AthenaOutputBucket”,
      "prefix": "parquet/"
    }

  12. Choose the step Athena: StartQueryExecution.
  13. In the right Configuration section, under API Parameters, enter the following JSON (provide the Athena output bucket and workgroup name):
    {
      "QueryString": "UNLOAD (SELECT * FROM \"athena_unload_blog\".\"reviews\" )  TO 's3://AthenaOutputBucket/parquet' WITH (format = 'PARQUET',compression = 'SNAPPY')",
      "WorkGroup": “AthenaWorkgroup”
    }

Notice the Wait for task to complete check box is selected. This pauses the workflow while the Athena query is running.

  1. Choose the step SNS: Publish.
  2. In the Configuration section, under API Parameters, for Topic, pick the SNSTopic created by the CloudFormation template.
  3. In the Message section, enter the following JSON to pass the data manifest file location to the downstream consumer:
    {
      "Input.$": "$.QueryExecution.Statistics.DataManifestLocation"
    }

For more information, refer to the GetQueryExecution response syntax.

  1. Choose Next.
  2. Review the generated code and choose Next.
  3. In the Permissions section, choose Create new role.
  4. Review the auto-generated permissions and choose Create state machine.
  5. In the Add additional permissions to your new execution role section, choose Edit role in IAM.
  6. Add permissions and choose Attach policies.
  7. Search for the AWSGlueConsoleFullAccess managed policy and attach it.

This policy grants full access to AWS Glue resources when using the AWS Management Console. In production, generate a policy based on access activity and follow the principle of least privilege.

Test the workflow

Next, we test out the Step Functions workflow.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under State machines, choose the workflow we just created.
  3. Choose Execute, then choose Start execution to start the workflow.
  4. Wait until the workflow completes, then verify there are UNLOAD Parquet files in the bucket AthenaOutputBucket.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post.

  1. On the Amazon S3 console, choose the bucket whose name ends with -athena-unload-data-lake.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. Repeat these steps to remove all files and folders in the bucket whose name ends with -athena-unload-output.
  5. On the AWS CloudFormation console, delete the stack you created.
  6. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

In this post, we introduced the UNLOAD statement in Athena with some common use cases. We demonstrated how to compress Athena query results to reduce storage costs and improve performance, store query results in JSON file format, feed downstream ML models, and create and visualize ETL pipelines with Step Functions without creating a table.

To learn more, refer to the Athena UNLOAD documentation and Visualizing AWS Step Functions workflows from the Amazon Athena console.


About the Authors

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Harsha Tadiparthi is a Principal Solutions Architect focused on providing analytics and AI/ML strategies and solution designs to customers.

Orchestrating high performance computing with AWS Step Functions and AWS Batch

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/orchestrating-high-performance-computing-with-aws-step-functions-and-aws-batch/

This post is written by Dan Fox, Principal Specialist Solutions Architect; Sabha Parameswaran, Senior Solutions Architect.

High performance computing (HPC) workloads address challenges in a wide variety of industries, including genomics, financial services, oil and gas, weather modeling, and semiconductor design. These workloads frequently have complex orchestration requirements that may include large datasets and hundreds or thousands of compute tasks.

AWS Step Functions is a workflow automation service that can simplify orchestrating other AWS services. AWS Batch is a fully managed batch processing service that can dynamically scale to address computationally intensive workloads. Together, these services can orchestrate and run demanding HPC workloads.

This blog post identifies three common challenges when creating HPC workloads. It describes some features with Step Functions and AWS Batch that can help to solve these challenges. It then shows a sample project that performs complex task orchestration using Step Functions and AWS Batch.

Reaching service quotas with polling iterations

It’s common for HPC workflows to require that a step comprising multiple jobs completes before advancing to the next step. In these cases, it’s typical for developers to implement iterative polling patterns to track job completions.

To handle task orchestration for a workload like this, you may choose to use Step Functions. Step Functions has a hard quota of 25,000 events in the execution history. A single state transition may contain multiple history events. For example, an entry to the state and an exit from the state count as two events. A workflow that iteratively polls many long-running processes may run into limits with this service quota.

Step Functions addresses this by providing synchronization capabilities with several integrated services, including AWS Batch. For integrated services, Step Functions can wait for a task to complete before progressing to the next state. This feature, called “Run a Job (.sync)” in the documentation, returns a Success or Failure message only when the compute service task is complete, reducing the number of entries in the event history log. View the Step Functions documentation for a complete list of supported service integrations.

“Run a Job (.sync)” task pattern
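
To make the pattern concrete, the following Python sketch builds a minimal state machine definition that submits an AWS Batch job with the .sync integration, so the workflow waits for the job to finish before moving on. The job name, queue, and definition values are placeholders, not resources from the sample project.

import json

# Minimal Amazon States Language definition using the "Run a Job (.sync)" pattern.
# JOB_QUEUE_ARN and JOB_DEFINITION_ARN are placeholders for your own resources.
definition = {
    "StartAt": "RunBatchJob",
    "States": {
        "RunBatchJob": {
            "Type": "Task",
            # The .sync suffix tells Step Functions to wait for the Batch job to complete.
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
                "JobName": "hpc-task",
                "JobQueue": "JOB_QUEUE_ARN",
                "JobDefinition": "JOB_DEFINITION_ARN"
            },
            "End": True
        }
    }
}

print(json.dumps(definition, indent=2))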

Supporting parallel and dynamic tasks

HPC workloads may require a changing number of compute tasks from execution to execution. For example, the number of compute tasks required in a workflow may depend on the complexity or length of an input dataset. For performance reasons, you may desire for these tasks to run in parallel.

Step Functions supports faster data processing by running a fixed number of parallel branches through the Parallel state type. If a workload has an unknown number of branches, the Map state type can run a set of parallel steps for each element of an input array. We refer to this pattern as dynamic parallelism.

Dynamic parallelism using the map state
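
As an illustration of dynamic parallelism (with a placeholder ARN, not the sample project's definition), the following Python sketch builds a Map state that runs one iteration per element of an input array and starts a nested workflow for each element.

import json

# Illustrative Map state that fans out over an input array ("entries") and starts a
# nested workflow for each element. NESTED_STATE_MACHINE_ARN is a placeholder.
definition = {
    "StartAt": "ProcessEntries",
    "States": {
        "ProcessEntries": {
            "Type": "Map",
            "ItemsPath": "$.entries",
            "MaxConcurrency": 0,  # 0 means no explicit concurrency limit
            "Iterator": {
                "StartAt": "RunNestedWorkflow",
                "States": {
                    "RunNestedWorkflow": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::states:startExecution.sync:2",
                        "Parameters": {
                            "StateMachineArn": "NESTED_STATE_MACHINE_ARN",
                            "Input.$": "$"
                        },
                        "End": True
                    }
                }
            },
            "End": True
        }
    }
}

print(json.dumps(definition, indent=2))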

Limits and flow control with dynamic tasks

The map state may limit concurrent iterations. When this occurs, some iterations do not begin until previous iterations have completed. The likelihood of this occurring increases when an input array has over 40 items. If your HPC workload benefits from increased concurrency, you may use nested workflows. Step Functions allows you to orchestrate more complex processes by composing modular, reusable workflows.

For example, a map state may invoke secondary, nested workflows, which also contain map states. By nesting Step Functions workflows, you can build larger, more complex workflows out of smaller, simpler workflows.

As your workflow grows in complexity, you may use the callback task, which is an additional flow control feature. Callback tasks provide a way to pause a workflow pending the return of a unique task token. A callback task passes this token to an integrated service and then stops. Once the integrated service has completed, it may return the task token to the Step Functions workflow with a SendTaskSuccess or SendTaskFailure call. Once the callback task receives the task token, the workflow proceeds to the next state.
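
On the worker side, whichever process receives the task token eventually reports back to Step Functions. The following Python sketch shows that callback, assuming the token was passed through to the worker in its input:

import json
import boto3

sfn = boto3.client("stepfunctions")

def complete_callback(task_token, succeeded, result):
    # Return control to the paused workflow using the token that the callback
    # task passed to the integrated service.
    if succeeded:
        sfn.send_task_success(taskToken=task_token, output=json.dumps(result))
    else:
        sfn.send_task_failure(taskToken=task_token, error="JobFailed",
                              cause=json.dumps(result))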

View the documentation for a list of integrated services that support this pattern.

Nested workflows using callback with task token

A sample project with several orchestration patterns

This sample project demonstrates orchestration of HPC workloads using Step Functions, AWS Batch, and Lambda. The nesting in this project is three layers deep. The primary state machine runs the second layer nested state machines, which in turn run the third layer nested state machines.

The primary state machine demonstrates dynamic parallelism. It receives an input payload that contains an array of items used as input for its map step. Each dynamic map branch runs a nested secondary state machine.

The secondary state machine demonstrates several workflow patterns including a Lambda function with callback, a third layer nested state machine in sync mode, an AWS Batch job in sync mode, and a Lambda function calling AWS Batch with a callback. The tertiary state machine only notifies its parent with Success when called in sync mode.

Explore the Amazon States Language (ASL) definitions in this project to review the code for these patterns.

Sample project architecture

Deploy the sample application

The README of the GitHub project contains more detailed instructions, but you may also follow these steps.

Prerequisites

  1. AWS Account: If you don’t have an AWS account, navigate to https://aws.amazon.com/ and choose Create an AWS Account.
  2. VPC: A valid existing VPC with subnets (for execution of AWS Batch jobs). Refer to https://docs.aws.amazon.com/vpc/latest/userguide/vpc-getting-started.html for creating a VPC.
  3. AWS CLI: This project requires a local install of AWS CLI. Refer to https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html for installing AWS CLI.
  4. Git CLI: This project requires a local install of Git CLI. Refer to https://git-scm.com/book/en/v2/Getting-Started-Installing-Git
  5. AWS SAM CLI: This project requires a local install of AWS SAM CLI to build and deploy the sample application. Refer to https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html for instructions to install and use AWS SAM CLI.
  6. Python 3.8 and Docker: Required only if you plan for local development and testing with AWS SAM CLI.

Build and deploy in your account

Follow these steps to build this application locally and deploy it in your AWS account:

  1. Git clone the repository into a local folder.
    git clone https://github.com/aws-samples/aws-stepfunction-complex-orchestrator-app
    cd aws-stepfunction-complex-orchestrator-app
    
  2. Build the application using AWS SAM CLI.
    sam build
  3. Use AWS SAM deploy with the --guided flag.
    sam deploy --guided
    1. Parameter BatchScriptName (accept default: batch-notify-step-function.sh)
    2. Parameter VPCID (enter the id of your VPC)
    3. Parameter FargateSubnetAccessibility (choose Private or Public depending on subnet configuration)
    4. Parameter Subnets (enter ID for a single subnet)
    5. Parameter batchSleepDuration (accept default: 15)
    6. Accept all other defaults
  4. Note the value of the BucketName output in the Outputs of the deploy process. Save this S3 bucket name for the next step.
  5. Copy the batch script into the S3 bucket created in the prior step.
    aws s3 cp ./batch-script/batch-notify-step-function.sh s3://<S3-bucket-name>

Testing the sample application

Once the SAM CLI deploy is complete, navigate to Step Functions in the AWS Console.

  1. Note the three new state machines deployed to your account. Each state machine has a random suffix generated as part of the AWS SAM deploy process:
    1. ComplexOrchestratorStateMachine1-…
    2. SyncNestedStateMachine2-…
    3. CallbackNotifyStateMachine3-…
  2. Follow the link for the primary state machine: ComplexOrchestratorStateMachine1-…
  3. Choose Start execution.
  4. The sample payload for this state machine is located here: orchestrator-step-function-payload.json. This JSON document has 2 input elements within the entries array. Select and copy this JSON and paste it into the Input field in the console, overwriting the default value. This causes the map state to create and execute two nested state machines in parallel. You may modify this JSON to contain more elements to increase the number of parallel executions.
  5. View the “Execution event history” in the console for this execution. Under the Resource column, locate the links to the nested state machines. Select these links to follow their individual executions. You may also explore the links to Lambda, CloudWatch, and AWS Batch job.
  6. Navigate to AWS Batch in the console. Step Functions workflows are available to AWS Batch users within the AWS Batch console. Find the integration from the AWS Batch side navigation under “Related AWS services”. You can also read this blog post to learn more.
  7. Common troubleshooting. If the batch job fails or if the Step Functions workflow times out, make sure that you have correctly copied over the batch script into the S3 bucket as described in step 5 of the Build and Deploy in your account section of this post. Also make sure that the FargateSubnetAccessibility Parameter matches the configuration of your subnet (Public or Private).
  8. The state machine may take several minutes to complete. When successful, the Graph Inspector displays:
    Graph Inspector with Successful completion

Cleaning up

To clean up the deployment, run the following commands to delete the stack associated with the AWS SAM deployment:

aws cloudformation delete-stack --stack-name <stack-name>

Conclusion

This blog post describes several challenges common to orchestrating HPC workloads. I describe how Step Functions with AWS Batch can solve many of these challenges. I provide a project that contains several sample patterns and show how to deploy and test this in your account.

For more information on serverless patterns, visit Serverless Land.

ICYMI: Serverless Q1 2022

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q1-2022/

Welcome to the 16th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

Lambda now offers larger ephemeral storage for functions, up to 10 GB. Previously, the storage was set to 512 MB. There are several common use-cases that can benefit from expanded temporary storage, including extract-transform load (ETL) jobs, machine learning inference, and data processing workloads. To see how to configure the amount of /tmp storage in AWS SAM, deploy this Serverless Land Pattern.

Ephemeral storage settings
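
Besides AWS SAM, you can set the same option through the AWS SDK. The following Python sketch is one way to do it; the function name is a placeholder and the size is specified in MB:

import boto3

lambda_client = boto3.client("lambda")

# Increase /tmp to 10 GB for an existing function ("my-etl-function" is a placeholder).
lambda_client.update_function_configuration(
    FunctionName="my-etl-function",
    EphemeralStorage={"Size": 10240},  # MB, from 512 (the default) up to 10240
)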

For Node.js developers, Lambda now supports ES Modules and top-level await for Node.js 14. This enables developers to use a wider range of JavaScript packages in functions. When used with Provisioned Concurrency, top-level await can improve cold-start performance for functions that use asynchronous initialization.

For .NET developers, Lambda now supports .NET 6 as both a managed runtime and container base image. You can now use new features of the runtime such as improved logging, simplified function definitions using top-level statements, and improved performance using source generators.

The Lambda console now allows you to share test events with other developers in your team, using granular IAM permissions. Previously, test events were only visible to the builder who created them. To learn about creating sharable test events, read this documentation.

Amazon EventBridge

Amazon EventBridge Schema Registry helps you create code bindings from event schemas for use directly in your preferred IDE. You can generate these code bindings for a schema by using the EventBridge console, APIs, or AWS SDK toolkits for JetBrains (IntelliJ, PyCharm, WebStorm, Rider) and VS Code. This feature now supports Go, in addition to Java, Python, and TypeScript, and is available at no additional cost.

AWS Step Functions

Developers can test state machines locally using Step Functions Local, and the service recently announced mocked service integrations for local testing. This allows you to define sample output from AWS service integrations and combine them into test cases to validate workflow control. This new feature introduces a robust way to test state machines in isolation.

Amazon DynamoDB

Amazon DynamoDB now supports limiting the number of items processed in a PartiQL operation, using an optional parameter on each request. The service also increased default Service Quotas, which can help simplify the use of large numbers of tables. The per-account, per-Region quota increased from 256 to 2,500 tables.
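
As a brief, hedged illustration of the new option, the following Python sketch passes the optional Limit parameter to a PartiQL ExecuteStatement call; the table and attribute names are placeholders:

import boto3

dynamodb = boto3.client("dynamodb")

# Process at most 25 items for this PartiQL statement; "Orders" is a placeholder table.
response = dynamodb.execute_statement(
    Statement='SELECT * FROM "Orders" WHERE "status" = ?',
    Parameters=[{"S": "PENDING"}],
    Limit=25,
)
items = response["Items"]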

AWS AppSync

AWS AppSync added support for custom response headers, allowing you to define additional headers to send to clients in response to an API call. You can now use the new resolver utility $util.http.addResponseHeaders() to configure additional headers in the response for a GraphQL API operation.

Serverless blog posts

January

Jan 6 – Using Node.js ES modules and top-level await in AWS Lambda

Jan 6 – Validating addresses with AWS Lambda and the Amazon Location Service

Jan 20 – Introducing AWS Lambda batching controls for message broker services

Jan 24 – Migrating AWS Lambda functions to Arm-based AWS Graviton2 processors

Jan 31 – Using the circuit breaker pattern with AWS Step Functions and Amazon DynamoDB

Jan 31 – Mocking service integrations with AWS Step Functions Local

February

Feb 8 – Capturing client events using Amazon API Gateway and Amazon EventBridge

Feb 10 – Introducing AWS Virtual Waiting Room

Feb 14 – Building custom connectors using the Amazon AppFlow Custom Connector SDK

Feb 22 – Building TypeScript projects with AWS SAM CLI

Feb 24 – Introducing the .NET 6 runtime for AWS Lambda

March

Mar 6 – Migrating a monolithic .NET REST API to AWS Lambda

Mar 7 – Decoding protobuf messages using AWS Lambda

Mar 8 – Building a serverless image catalog with AWS Step Functions Workflow Studio

Mar 9 – Composing AWS Step Functions to abstract polling of asynchronous services

Mar 10 – Building serverless multi-Region WebSocket APIs

Mar 15 – Using organization IDs as principals in Lambda resource policies

Mar 16 – Implementing mutual TLS for Java-based AWS Lambda functions

Mar 21 – Running cross-account workflows with AWS Step Functions and Amazon API Gateway

Mar 22 – Sending events to Amazon EventBridge from AWS Organizations accounts

Mar 23 – Choosing the right solution for AWS Lambda external parameters

Mar 28 – Using larger ephemeral storage for AWS Lambda

Mar 29 – Using AWS Step Functions and Amazon DynamoDB for business rules orchestration

Mar 31 – Optimizing AWS Lambda function performance for Java

First anniversary of Serverless Land Patterns

Serverless Patterns Collection

The Developer Advocate (DA) team launched the Serverless Patterns Collection in March 2021 as a repository of serverless examples that demonstrate integrating two or more AWS services. Each pattern uses an infrastructure as code (IaC) framework to automate the deployment. These can simplify the creation and configuration of the services used in your applications.

The Serverless Patterns Collection is both an educational resource to help developers understand how to join different services, and an aid for developers that are getting started with building serverless applications.

The collection has just celebrated its first anniversary. It now contains 239 patterns for CDK, AWS SAM, Serverless Framework, and Terraform, covering 30 AWS services. We have expanded example runtimes to include .NET, Java, Rust, Python, Node.js and TypeScript. We’ve served tens of thousands of developers in the first year and we’re just getting started.

Many thanks to our contributors and community. You can also contribute your own patterns.

Videos

YouTube: youtube.com/serverlessland

Serverless Office Hours – Tues 10 AM PT

Weekly live virtual office hours. In each session we talk about a specific topic or technology related to serverless and open it up to helping you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

YouTube: youtube.com/serverlessland
Twitch: twitch.tv/aws


FooBar Serverless YouTube channel

The Developer Advocate team is delighted to welcome Marcia Villalba onboard. Marcia was an AWS Serverless Hero before joining AWS over two years ago, and she has created one of the most popular serverless YouTube channels. You can view all of Marcia’s videos at https://www.youtube.com/c/FooBar_codes.


AWS Summits

AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. This year, we have restarted in-person Summits at major cities around the world.

The next 4 Summits planned are Paris (April 12), San Francisco (April 20-21), London (April 27), and Madrid (May 4-5). To find and register for your nearest AWS Summit, visit the AWS Summits homepage.

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Using AWS Step Functions and Amazon DynamoDB for business rules orchestration

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-aws-step-functions-and-amazon-dynamodb-for-business-rules-orchestration/

This post is written by Vijaykumar Pannirselvam, Cloud Consultant, Sushant Patil, Cloud Consultant, and Kishore Dhamodaran, Senior Solution Architect.

A Business Rules Engine (BRE) is used in enterprises to manage business-critical decisions. The logic or rules used to make such decisions can vary in complexity. A finance department may have a basic rule to get any purchase over a certain dollar amount to get director approval. A mortgage company may need to run complex rules based on inputs (for example, credit score, debt-to-income ratio, down payment) to make an approval decision for a loan.

Decoupling these rules from application logic provides agility to your rules management, since business rules may often change while your application may not. It can also provide standardization across your enterprise, so every department can communicate with the same taxonomy.

As part of migrating their workloads, some enterprises consider replacing their commercial rules engine with cloud native and open-source alternatives. The motivation for such a move stems from several factors, such as simplifying the architecture, cost, security considerations, or vendor support.

Many of these commercial rules engines come as part of a business process management suite (BPMS) offering that provides orchestration capabilities for rules execution. For a successful migration to the cloud using an open-source rules engine management system, you need an orchestration capability to manage incoming rule requests, audit the rules, and track exceptions.

This post showcases an orchestration framework that allows you to use an open-source rules engine. It uses the Drools rules engine to build a set of rules for calculating insurance premiums based on the properties of Car and Person objects. The solution uses AWS Step Functions, AWS Lambda, Amazon API Gateway, Amazon DynamoDB, and the open-source Drools rules engine. You can swap the rules engine, provided you can manage it in the AWS Cloud environment and expose it as an API.

Solution overview

The following diagram shows the solution architecture.

Solution architecture

The solution comprises:

  1. API Gateway – a fully managed service that makes it easier to create, publish, maintain, monitor, and secure APIs at any scale for API consumers. API Gateway helps you manage traffic to backend systems, in this case Step Functions, which orchestrates the execution of tasks. For the REST API use-case, you can also set up a cache with customizable keys and time-to-live in seconds for your API data to avoid hitting your backend services for each request.
  2. Step Functions – a low code service to orchestrate multiple steps involved to accomplish tasks. Step Functions uses the finite-state machine (FSM) model, which uses given states and transitions to complete the tasks. The diagram depicts three states: Audit Request, Execute Ruleset and Audit Response. We execute them sequentially. You can add additional states and transitions, such as validating incoming payloads, and branching out parallel execution of the states.
  3. Drools rules engine Spring Boot application – runtime component of the rule execution. You set the Drools rule engine Spring Boot application as an Apache Maven Docker project with Drools Maven dependencies. You then deploy the Drools rule engine Docker image to an Amazon Elastic Container Registry (Amazon ECR), create an AWS Fargate cluster, and an Amazon Elastic Container Service (Amazon ECS) service. The service launches Amazon ECS tasks and maintains the desired count. An Application Load Balancer distributes the traffic evenly to all running containers.
  4. Lambda – a serverless execution environment giving you an ability to interact with the Drools Engine and a persistence layer for rule execution audit functions. The Lambda component provides the audit function required to persist the incoming requests and outgoing responses in DynamoDB. Apart from the audit function, Lambda is also used to invoke the service exposed by the Drools Spring Boot application.
  5. DynamoDB – a fully managed and highly scalable key/value store, to persist the rule execution information, such as request and response payload information. DynamoDB provides the persistence layer for the incoming request JSON payload and for the outgoing response JSON payload. The audit Lambda function invokes the DynamoDB put_item() method when it receives the request or response event from Step Functions (a minimal sketch follows this list). The DynamoDB table rule_execution_audit has an entry for every request and response associated with the incoming request-id originated by the application (upstream).
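
The following Python sketch outlines the audit function described in the list above. The table name, environment variable, and item attributes beyond audit_id and request_id are assumptions rather than the exact implementation in the sample repository.

import json
import os
import uuid
import boto3

dynamodb = boto3.resource("dynamodb")
# The table name is assumed to be provided through an environment variable.
audit_table = dynamodb.Table(os.environ.get("AUDIT_TABLE", "rule_execution_audit"))

def lambda_handler(event, context):
    # Persist either the incoming request or the outgoing response payload,
    # keyed by the request_id originated by the upstream application.
    audit_table.put_item(
        Item={
            "audit_id": str(uuid.uuid4()),
            "request_id": event.get("context", {}).get("request_id", "UNKNOWN"),
            "payload": json.dumps(event),
        }
    )
    return event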

Drools rules engine implementation

The Drools rules engine separates the business rules from the business processes. You use DRL (Drools Rule Language) by defining business rules as .drl text files. You define model objects to build the rules.

The model objects are POJOs (Plain Old Java Objects) defined using Eclipse, with the Drools plugin installed. You should have some level of knowledge about building rules and executing them using the Drools rules engine. The following diagram describes the functions of this component.

Drools process

You define the following rules in the .drl file as part of the GitHub repo. The purpose of these rules is to evaluate the driver premium based on the input model objects provided as input. The inputs are Car and Driver objects and output is the Policy object, which has the premium calculated based on the certain criteria defined in the rule:

rule "High Risk"
     when     
         $car : Car(style == "SPORTS", color == "RED") 
         $policy : Policy() 
         and $driver : Driver ( age < 21 )                             
     then
         System.out.println(drools.getRule().getName() +": rule fired");          
         modify ($policy) { setPremium(increasePremiumRate($policy, 20)) };
 end
 
 rule "Med Risk"
     when     
         $car : Car(style == "SPORTS", color == "RED") 
         $policy : Policy() 
         and $driver : Driver ( age > 21 )                             
     then
         System.out.println(drools.getRule().getName() +": rule fired");          
         modify ($policy) { setPremium(increasePremiumRate($policy, 10)) };
 end
 
 
 function double increasePremiumRate(Policy pol, double percentage) {
     return (pol.getPremium() + pol.getPremium() * percentage / 100);
 }
 

Once the rules are defined, you define a RestController that takes input parameters and evaluates the above rules. The below code snippet is a POST method defined in the controller, which handles the requests and sends the response to the caller.

@PostMapping(value ="/policy/premium", consumes = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE }, produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<Policy> getPremium(@RequestBody InsuranceRequest requestObj) {
        
        System.out.println("handling request...");
        
        Car carObj = requestObj.getCar();        
        Car carObj1 = new Car(carObj.getMake(),carObj.getModel(),carObj.getYear(), carObj.getStyle(), carObj.getColor());
        System.out.println("###########CAR##########");
        System.out.println(carObj1.toString());
        
        System.out.println("###########POLICY##########");        
        Policy policyObj = requestObj.getPolicy();
        Policy policyObj1 = new Policy(policyObj.getId(), policyObj.getPremium());
        System.out.println(policyObj1.toString());
            
        System.out.println("###########DRIVER##########");    
        Driver driverObj = requestObj.getDriver();
        Driver driverObj1 = new Driver( driverObj.getAge(), driverObj.getName());
        System.out.println(driverObj1.toString());
        
        KieSession kieSession = kieContainer.newKieSession();
        kieSession.insert(carObj1);      
        kieSession.insert(policyObj1); 
        kieSession.insert(driverObj1);         
        kieSession.fireAllRules(); 
        printFactsMessage(kieSession);
        kieSession.dispose();
    
        
        return ResponseEntity.ok(policyObj1);
    }    

Prerequisites

Solution walkthrough

  1. Clone the project GitHub repository to your local machine, do a Maven build, and create a Docker image. The project contains Drools related folders needed to build the Java application.
    git clone https://github.com/aws-samples/aws-step-functions-business-rules-orchestration
    cd drools-spring-boot
    mvn clean install
    mvn docker:build
    
  2. Create an Amazon ECR private repository to host your Docker image.
    aws ecr create-repository --repository-name drools_private_repo --image-tag-mutability MUTABLE --image-scanning-configuration scanOnPush=false
  3. Tag the Docker image and push it to the Amazon ECR repository.
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <<INSERT ACCOUNT NUMBER>>.dkr.ecr.us-east-1.amazonaws.com
    docker tag drools-rule-app:latest <<INSERT ACCOUNT NUMBER>>.dkr.ecr.us-east-1.amazonaws.com/drools_private_repo:latest
    docker push <<INSERT ACCOUNT NUMBER>>.dkr.ecr.us-east-1.amazonaws.com/drools_private_repo:latest
    
  4. Deploy resources using AWS SAM:
    cd ..
    sam build
    sam deploy --guided

    SAM deployment output

Verifying the deployment

Verify the business rules execution and the orchestration components:

  1. Navigate to the API Gateway console, and choose the rules-stack API.
    API Gateway console
  2. Under Resources, choose POST, followed by TEST.
    Resource configuration
  3. Enter the following JSON under the Request Body section, and choose Test.

    {
      "context": {
        "request_id": "REQ-99999",
        "timestamp": "2021-03-17 03:31:51:40"
      },
      "request": {
        "driver": {
          "age": "18",
          "name": "Brian"
        },
        "car": {
          "make": "honda",
          "model": "civic",
          "year": "2015",
          "style": "SPORTS",
          "color": "RED"
        },
        "policy": {
          "id": "1231231",
          "premium": "300"
        }
      }
    }
    
  4. The response received shows results from the evaluation of the business rule “High Risk“, with the premium representing the percentage calculation in the rule definition. Try changing the request input to evaluate a “Medium Risk” rule by modifying the age of the driver to 22 or higher:
    Sample response
  5. Optionally, you can verify the API using Postman. Get the endpoint information by navigating to the rule-stack API, followed by Stages in the navigation pane, then choosing either Dev or Stage.
  6. Enter the payload in the request body and choose Send:
    Postman UI
  7. The response received is results from the evaluation of business rule “High Risk“, with the premium representing the percentage calculation in the rule definition. Try changing the request input to evaluate a “Medium Risk” rule by modifying the age of the driver to 22 or higher.
    Body JSON
  8. Observe the request and response audit logs. Navigate to the DynamoDB console. Under the navigation pane, choose Tables, then choose rule_execution_audit.
    DynamoDB console
  9. Under the Tables section in the navigation pane, choose Explore Items. Observe the individual audit logs by choosing the audit_id.
    Table audit item

Cleaning up

To avoid incurring ongoing charges, clean up the infrastructure by deleting the stack using the following command:

sam delete

SAM confirmations

Delete the Amazon ECR repository, and any other resources you created as a prerequisite for this exercise.

Conclusion

In this post, you learned how to leverage an orchestration framework using Step Functions, Lambda, DynamoDB, and API Gateway to build an API backed by an open-source Drools rules engine, running on a container. Try this solution for your cloud native business rules orchestration use-case.

For more serverless learning resources, visit Serverless Land.

Build a multi-language notification system with Amazon Translate and Amazon Pinpoint

Post Syndicated from Praveen Allam original https://aws.amazon.com/blogs/architecture/build-a-multi-language-notification-system-with-amazon-translate-and-amazon-pinpoint/

Organizations with global operations can struggle to notify their customers of any business-related announcements or notifications in different languages. Their customers want to receive notifications in their local language and communication preference. Organizations often rely on complicated third-party services or individuals to manually translate the notifications. This can lead to a loss of revenue due to delayed communication and additional operational expenses.

This blog post demonstrates how to build a straightforward, cost-effective, and scalable multi-language notification system using AWS Serverless technologies. You can post a business-related announcement or notification in English, and based on the customer profile data, it will convert this announcement or notification into different languages. Additionally, the system will also deliver these translated announcements or notifications as an email, voice, or SMS.

Example of a multi-language notification use case

A restaurant franchise company is adding a new item to their menu and plans to release it in North America, Germany, and France. The corporate office has decided to send the following notification.

The company is adding a new item to the menu, and this will go live by May 10. Please ensure you are prepared for this change and plan accordingly.

The franchise owners in Germany want to receive the notifications in the German language, whereas the franchise owners in France want to receive it in French. North American franchises want to receive it in English.

Solution design for multi-language notification system

The solution in Figure 1 demonstrates how to build a multi-language notification system using Amazon Translate and Amazon Pinpoint.

AWS Serverless technologies handle automatic scaling, provide built-in high availability, and use a pay-for-use billing model, which increases agility and optimizes costs. The system built with this solution is invoked using REST API endpoints. Once this solution is deployed, it can be integrated with any frontend application where users can log in and send out notification events.

Figure 1 illustrates the architecture of this solution.

Solution architecture for multi-language notification system. It includes all the AWS services that are required in this solution. The flow is described as follows.

Figure 1. Solution architecture for multi-language notification system

1. The restaurant franchise will log in to their UI to type the notification message in English. Upon submission, the notification message is sent to the Amazon API Gateway REST endpoint.
Note: In this solution, there is no UI available. You will use a terminal to submit the message.

2. Amazon API Gateway will send this message to Amazon Simple Queue Service (SQS), which will keep the HTTP requests asynchronous.

3. The SQS queue will invoke the SQS AWS Lambda function.

4. The SQS Lambda function invokes the AWS Step Functions state machine. This SQS Lambda function is used as a proxy mechanism to start the state machine workflow. AWS Step Functions orchestrates the notification workflow process. The workflow process validates the message, converts it into different languages, and notifies the customers in their preferred way of communication (email, voice, or SMS). It also handles errors if any of the steps fail by using an SQS dead-letter queue.

5. The message entered must be validated in order to ensure that the organizational standards are followed. To perform the message validation, we use the Amazon Comprehend service. Comprehend’s sentiment analysis determines whether to send or flag the message (a minimal sketch of this check follows the examples below). All flagged messages are sent for review.

  • In the preceding example use case message, the neutral sentiment score is 0.85 confidence. If you set the acceptable score to anything greater than 0.5 confidence, then it is a valid message. Once it passes the validation step, the workflow proceeds to the next step.
  • If the message is vague or not clear, the sentiment score might be less than 0.5 confidence. For example, with the message We are adding a dish; be ready for it, the sentiment score might be only 0.45 confidence. This is under the acceptable score, and the message will not be processed further.
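
The following Python sketch shows the shape of this validation check, simplified to the neutral sentiment score described above; the actual Lambda function in the solution may implement it differently.

import boto3

comprehend = boto3.client("comprehend")

def is_valid_message(message, threshold=0.5):
    # Flag the message unless its neutral sentiment score exceeds the threshold.
    result = comprehend.detect_sentiment(Text=message, LanguageCode="en")
    return result["SentimentScore"]["Neutral"] > threshold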

6. After the message is successfully validated, the message is translated into various languages depending on the customers’ profiles. The Translate Lambda function determines the number of unique languages by referring to the customer profile data in the Amazon DynamoDB table. The function then uses Amazon Translate to translate the message to the different languages required for that notification event (a translation sketch follows the examples). In our example use case, the converted messages will look as follows:

  • German (de):

Das Unternehmen fügt dem Menü einen neuen Punkt hinzu, der bis zum 10. Mai live geschaltet wird. Bitte stellen Sie sicher, dass Sie auf diese Änderung vorbereitet sind und planen Sie entsprechend.

  • French (fr):

La société ajoute un nouvel article au menu, qui sera mis en ligne d’ici le 10 mai. Assurez-vous d’être prêt pour ce changement et de planifier en conséquence.
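
A minimal Python sketch of the translation call looks like the following; the solution’s Translate Lambda function adds the customer profile lookup around it.

import boto3

translate = boto3.client("translate")

def translate_message(message, target_language):
    # target_language is a code from the customer profile, such as "de" or "fr".
    result = translate.translate_text(
        Text=message,
        SourceLanguageCode="en",
        TargetLanguageCode=target_language,
    )
    return result["TranslatedText"]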

7. The last step in the workflow is to build the notification logic and deliver the notifications. The Amazon Pinpoint Lambda function retrieves the customer’s profile from the Amazon DynamoDB table. It then parses each record for a given notification event to find out the delivery mode (email, voice, or SMS message). The function then builds the notification logic using Amazon Pinpoint. Amazon Pinpoint notifies each customer either by email, voice, or SMS.
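
For the delivery step, the following Python sketch sends an SMS notification with Amazon Pinpoint; the project ID and phone number are placeholders, and the solution builds similar requests for the email and voice channels.

import boto3

pinpoint = boto3.client("pinpoint")

def send_sms(application_id, phone_number, message):
    # application_id identifies the Amazon Pinpoint project; both arguments are placeholders.
    pinpoint.send_messages(
        ApplicationId=application_id,
        MessageRequest={
            "Addresses": {phone_number: {"ChannelType": "SMS"}},
            "MessageConfiguration": {
                "SMSMessage": {"Body": message, "MessageType": "TRANSACTIONAL"}
            },
        },
    )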

Code repository

The code for this solution is available on GitHub. Review the README file for detailed instructions on how to download and run the solution in your AWS account.

Conclusion

Organizations that operate on an international basis often struggle to build a multi-language notification system to communicate any business-related announcements or notifications to their customers in different languages. Communicating these announcements or notifications in a variety of formats such as email, voice, and SMS can be time-consuming. Our solution addresses these challenges using AWS services with fewer steps than traditional third-party options. This solution also features automatic scaling, built-in high availability, and a pay-for-use billing model to increase agility and optimize costs. These technologies not only decrease infrastructure management tasks like capacity provisioning and patching, but also provide a better customer experience.

Further reading:

Choosing the right solution for AWS Lambda external parameters

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/choosing-the-right-solution-for-aws-lambda-external-parameters/

This post is written by Thomas Moore, Solutions Architect, Serverless.

When using AWS Lambda to build serverless applications, customers often need to retrieve parameters from an external source at runtime. This allows you to share parameter values across multiple functions or microservices, providing a single source of truth for updates. A common example is retrieving database connection details from an external source and then using the retrieved hostname, user name, and password to connect to the database:

Lambda function retrieving database credentials from an external source

AWS provides a number of options to store parameter data, including AWS Systems Manager Parameter Store, AWS AppConfig, Amazon S3, and Lambda environment variables. This blog explores the different parameter data that you may need to store. I cover considerations for choosing the right parameter solution and how to retrieve and cache parameter data efficiently within the Lambda function execution environment.

Common use cases

Common parameter examples include:

  • Securely storing secret data, such as credentials or API keys.
  • Database connection details such as hostname, port, and credentials.
  • Schema data (for example, a structured JSON response).
  • TLS certificate for mTLS or JWT validation.
  • Email template.
  • Tenant configuration in a multitenant system.
  • Details of external AWS resources to communicate with such as an Amazon SQS queue URL, Amazon EventBridge event bus name, or AWS Step Functions ARN.

Key considerations

There are a number of key considerations when choosing the right solution for external parameter data.

  1. Cost – how much does it cost to store the data and retrieve it via an API call?
  2. Security – what encryption and fine-grained access control is required?
  3. Performance – what are the retrieval latency requirements?
  4. Data size – how much data is there to store and retrieve?
  5. Update frequency – how often does the parameter change and how does the function handle stale parameters?
  6. Access scope – do multiple functions or services access the parameter?

These considerations help to determine where to store the parameter data and how often to retrieve it.

For example, a 4 KB parameter that updates hourly and is used by hundreds of functions needs to be optimized for low retrieval costs and high performance. Choosing a solution that supports low-cost API GET requests at a high transactions-per-second (TPS) rate would be better than one optimized for storing large data.

AWS service options

There are a number of AWS services available to store external parameter data.

Amazon S3

S3 is an object storage service offering 99.999999999% (11 9s) of data durability and virtually unlimited scalability at low cost. Objects can be up to 5 TB in size in any format, making S3 a good solution to store larger parameter data.

Amazon DynamoDB

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed for single-digit millisecond performance at any scale. Due to the high performance of this service, it’s a great place to store parameters when low retrieval latency is important.

AWS Secrets Manager

AWS Secrets Manager makes it easier to rotate, manage, and retrieve secret data. This makes it the ideal place to store sensitive parameters such as passwords and API keys.
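
For example, a function could retrieve database credentials from Secrets Manager with a call like the following Python sketch; the secret name and the JSON keys inside the secret are assumptions.

import json
import boto3

secrets = boto3.client("secretsmanager")

# "prod/db/credentials" is a placeholder secret name.
secret = secrets.get_secret_value(SecretId="prod/db/credentials")
credentials = json.loads(secret["SecretString"])
hostname = credentials["host"]
password = credentials["password"]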

AWS Systems Manager Parameter Store

Parameter Store provides a centralized store to manage configuration data. This data can be plaintext or encrypted using AWS Key Management Service (KMS). Parameters can be tagged and organized into hierarchies for simpler management. Parameter Store is a good default choice for general-purpose parameters in AWS. The standard version (no additional charge) can store parameters up to 4 KB in size and the advanced version (additional charges apply) up to 8 KB.

For a code example using Parameter Store for Lambda parameters, see the Serverless Land pattern.

AWS AppConfig

AppConfig is a capability of AWS Systems Manager to create, manage, and quickly deploy application configurations. AppConfig allows you to validate changes during roll-outs and automatically roll back, if there is an error. AppConfig deployment strategies help to manage configuration changes safely.

AppConfig also provides a Lambda extension to retrieve and locally cache configuration data. This results in fewer API calls and reduced function duration, reducing costs.

AWS Lambda environment variables

You can store parameter data as Lambda environment variables as part of the function’s version-specific configuration. Lambda environment variables are stored during function creation or updates. You can access these variables directly from your code without needing to contact an external source. Environment variables are ideal for parameter values that don’t need updating regularly and help make function code reusable across different environments. However, unlike the other options, values cannot be accessed centrally by multiple functions or services.
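
Accessing environment variables requires no SDK call at all, as this short Python sketch shows; the variable names are examples.

import os

# DB_HOST and DB_PORT are example variable names defined on the function configuration.
DB_HOST = os.environ["DB_HOST"]
DB_PORT = int(os.environ.get("DB_PORT", "5432"))

def lambda_handler(event, context):
    # Values are read once when the execution environment initializes.
    return {"host": DB_HOST, "port": DB_PORT}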

Lambda execution lifecycle

It is worth understanding the Lambda execution lifecycle, which has a number of stages. This helps to decide when to handle parameter retrieval within your Lambda code, including cache management.

Lambda execution lifecycle

When a Lambda function is invoked for the first time, or when Lambda is scaling to handle additional requests, an execution environment is created. The first phase in the execution environment’s lifecycle is initialization (Init), during which the code outside the main handler function runs. This is known as a cold start.

The execution environment can then be re-used for subsequent invocations. This means that the Init phase does not need to run again and only the main handler function code runs. This is known as a warm start.

An execution environment can only run a single invocation at a time. Concurrent invocations require additional execution environments. When a new execution environment is required, this starts a new Init phase, which runs the cold start process.

Caching and updates

Retrieving the parameter during Init

As Lambda execution environments are re-used, you can improve the performance and reduce the cost of retrieving an external parameter by caching the value. Writing the value to memory or the Lambda /tmp file system allows it to be available during subsequent invokes in the same execution environment.

This approach reduces API calls, as they are not made during every invocation. However, this can cause an out-of-date parameter and potentially different values across concurrent execution environments.

The following Python example shows how to retrieve a Parameter Store value outside the Lambda handler function during the Init phase.

import boto3
ssm = boto3.client('ssm', region_name='eu-west-1')
parameter = ssm.get_parameter(Name='/my/parameter')
def lambda_handler(event, context):
    # My function code...

Retrieving the parameter on every invocation

Another option is to retrieve the parameter during every invocation by making the API call inside the handler code. This keeps the value up to date, but can lead to higher retrieval costs and longer function durations due to the added API call during every invocation.

The following Python example shows this approach:

import boto3
ssm = boto3.client('ssm', region_name='eu-west-1')
def lambda_handler(event, context):
    parameter = ssm.get_parameter(Name='/my/parameter')
    # My function code...

Using AWS AppConfig Lambda extension

AppConfig allows you to retrieve and cache values from the service using a Lambda extension. The extension retrieves the values and makes them available via a local HTTP server. The Lambda function then queries the local HTTP server for the value. The AppConfig extension refreshes the values at a configurable poll interval, which defaults to 45 seconds. This improves performance and reduces costs, as the function only needs to make a local HTTP call.

The following Python code example shows how to access the cached parameters.

import urllib.request
def lambda_handler(event, context):
    url = f'http://localhost:2772/applications/application_name/environments/environment_name/configurations/configuration_name'
    config = urllib.request.urlopen(url).read()
    # My function code...

For caching secret values using a Lambda extension local HTTP cache and AWS Secrets Manager, see the AWS Prescriptive Guidance documentation.

Using Lambda Powertools for Python or Java

Lambda Powertools for Python and Lambda Powertools for Java contain utilities to manage parameter caching. You can configure the cache interval, which defaults to 5 seconds. Supported parameter stores include Secrets Manager, AWS Systems Manager Parameter Store, AppConfig, and DynamoDB. You also have the option to bring your own provider. The following example shows the Powertools for Python parameters utility retrieving a single value from Systems Manager Parameter Store.

from aws_lambda_powertools.utilities import parameters
def handler(event, context):
    value = parameters.get_parameter("/my/parameter")
    # My function code…

Security

Parameter security is a key consideration. You should evaluate encryption at rest, in-transit, private network access, and fine-grained permissions for each external parameter solution based on the use case.

All services highlighted in this post support server-side encryption at rest, and you can choose to use AWS KMS to manage your own keys. When accessing parameters using the AWS SDK and CLI tools, connections are encrypted in transit using TLS by default. You can force most of these connections to use TLS 1.2.

To access parameters from inside an Amazon Virtual Private Cloud (Amazon VPC) without internet access, you can use AWS PrivateLink and create a VPC endpoint for each service. All the services mentioned in this post support AWS PrivateLink connections.

Use AWS Identity and Access Management (IAM) policies to manage which users or roles can access specific parameters.

General guidance

This blog explores a number of considerations to make when using an external source for Lambda parameters. The correct solution is use-case dependent. There are some general guidelines when selecting an AWS service.

  • For general-purpose low-cost parameters, use AWS Systems Manager Parameter Store.
  • For small parameters used by a single function, use Lambda environment variables.
  • For secret values that require automatic rotation, use AWS Secrets Manager.
  • When you need a managed cache, use the AWS AppConfig Lambda extension or Lambda Powertools for Python/Java.
  • For items larger than 400 KB, use Amazon S3.
  • When access frequency is high, and low latency is required, use Amazon DynamoDB.

Conclusion

External parameters provide a central source of truth across distributed systems, allowing for efficient updates and code reuse. This blog post highlights a number of considerations when using external parameters with Lambda to help you choose the most appropriate solution for your use case.

Consider how you cache and reuse parameters inside the Lambda execution environment. Doing this correctly can help you reduce costs and improve the performance of your Lambda functions.

There are a number of services to choose from to store parameter data. These include DynamoDB, S3, Parameter Store, Secrets Manager, AppConfig, and Lambda environment variables. Each comes with a number of advantages, depending on the use case. This blog guidance, along with the AWS documentation and Service Quotas, can help you select the most appropriate service for your workload.

For more serverless learning resources, visit Serverless Land.

Running cross-account workflows with AWS Step Functions and Amazon API Gateway

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/running-cross-account-workflows-with-aws-step-functions-and-amazon-api-gateway/

This post is written by Hardik Vasa, Senior Solutions Architect, and Pratik Jain, Cloud Infrastructure Architect.

AWS Step Functions allows you to build scalable and distributed applications using state machines. With the launch of Step Functions nested workflows, you can start a Step Functions workflow from another workflow. However, this requires both workflows to be in the same account. There are many use cases that require you to orchestrate workflows across different AWS accounts from one central AWS account.

This blog post covers a solution to invoke Step Functions workflows cross account using Amazon API Gateway. With this, you can perform cross-account orchestration for scheduling, ETL automation, resource deployments, security audits, and log aggregations all from a central account.

Overview

The following architecture shows a Step Functions workflow in account A invoking an API Gateway endpoint in account B, and passing the payload in the API request. The API then invokes another Step Functions workflow in account B asynchronously.  The resource policy on the API allows you to restrict access to a specific Step Functions workflow to prevent anonymous access.

Cross-account workflows

You can extend this architecture to run workflows across multiple Regions or accounts. This blog post shows running cross-account workflows with two AWS accounts.

To invoke an API Gateway endpoint, you can use Step Functions AWS SDK service integrations. This approach allows users to build solutions and integrate services within a workflow without writing code.

The example demonstrates how to use the cross-account capability using two AWS example accounts:

  • Step Functions state machine A: Account ID #111111111111
  • API Gateway API and Step Functions state machine B: Account ID #222222222222

Setting up

Start by creating state machine A in the account #111111111111. Next, create the state machine in target account #222222222222, followed by the API Gateway REST API integrated to the state machine in the target account.

Account A: #111111111111

In this account, create a state machine, which includes a state that invokes an API hosted in a different account.

Create an IAM role for Step Functions

  1. Sign in to the IAM console in account #111111111111, and then choose Roles from the left navigation pane.
  2. Choose Create role.
  3. For the Select trusted entity, under AWS service, select Step Functions from the list, and then choose Next.
  4. On the Add permissions page, choose Next.
  5. On the Review page, enter StepFunctionsAPIGatewayRole for Role name, and then choose Create role.
  6. Create inline policies to allow Step Functions to access the API actions of the services you need to control. Navigate to the role that you created and select Add Permissions and then Create inline policy.
  7. Use the Visual editor or the JSON tab to create policies for your role. Enter the following:
    Service: Execute-API
    Action: Invoke
    Resource: All Resources
  8. Choose Review policy.
  9. Enter APIExecutePolicy for name and choose Create Policy.

Creating a state machine in source account

  1. Navigate to the Step Functions console in account #111111111111 and choose Create state machine
  2. Select Design your workflow visually, then choose Standard, and then choose Next.
  3. On the design page, search for APIGateway:Invoke state, then drag and drop the block on the page:
    Step Functions designer console
  4. In the API Gateway Invoke section on the right panel, update the API Parameters with the following JSON policy:
     {
      "ApiEndpoint.$": "$.ApiUrl",
      "Method": "POST",
      "Stage": "dev",
      "Path": "/execution",
      "Headers": {},
      "RequestBody": {
       "input.$": "$.body",
       "stateMachineArn.$": "$.stateMachineArn"
      },
      "AuthType": "RESOURCE_POLICY"
    }

    These parameters indicate that the ApiEndpoint, payload (body) and stateMachineArn are dynamically assigned values based on input provided during workflow execution. You can also choose to assign these values statically, based on your use case.

  5. [Optional] You can also configure the API Gateway Invoke state to retry upon task failure by configuring the retries setting.
    Configuring State Machine
  6. Choose Next and then choose Next again. On the Specify state machine settings page:
    1. Enter a name for your state machine.
    2. Select Choose an existing role under Permissions and choose StepFunctionsAPIGatewayRole.
    3. Select Log Level ERROR.
  7. Choose Create State Machine.

After creating this state machine, copy the state machine ARN for later use.

Account B: #222222222222

In this account, create an API Gateway REST API that integrates with the target state machine and enables access to this state machine by means of a resource policy.

Creating a state machine in the target account

  1. Navigate to the Step Functions Console in account #222222222222 and choose Create State Machine.
  2. Under Choose authoring method select Design your workflow visually and the type as Standard.
  3. Choose Next.
  4. On the design page, search for Pass state. Drag and drop the state.
    State machine
  5. Choose Next.
  6. In the Review generated code page, choose Next and:
    1. Enter a name for the state machine.
    2. Select Create new role under the Permissions section.
    3. Select Log Level ERROR.
  7. Choose Create State Machine.

Once the state machine is created, copy the state machine ARN for later use.

Next, set up the API Gateway REST API, which acts as a gateway to accept requests from the state machine in account A. This integrates with the state machine you just created.

Create an IAM Role for API Gateway

Before creating the API Gateway API endpoint, you must give API Gateway permission to call Step Functions API actions:

  1. Sign in to the IAM console in account #222222222222 and choose Roles. Choose Create role.
  2. On the Select trusted entity page, under AWS service, select API Gateway from the list, and then choose Next.
  3. On the Add permissions page, choose Next.
  4. On the Name, review, and create page, enter APIGatewayToStepFunctions for Role name, and then choose Create role.
  5. Choose the name of your role and note the Role ARN:
    arn:aws:iam::222222222222:role/APIGatewayToStepFunctions
  6. Select the IAM role (APIGatewayToStepFunctions) you created.
  7. On the Permissions tab, choose Add permissions, and then choose Attach policies.
  8. Search for AWSStepFunctionsFullAccess, select the policy, and then choose Attach policy.

Creating the API Gateway API endpoint

After creating the IAM role, create a custom API Gateway API:

  1. Open the Amazon API Gateway console in account #222222222222.
  2. Choose Create API. Under REST API, choose Build.
  3. Enter StartExecutionAPI for the API name, and then choose Create API.
  4. On the Resources page of StartExecutionAPI, choose Actions, Create Resource.
  5. Enter execution for Resource Name, and then choose Create Resource.
  6. On the /execution Methods page, choose Actions, Create Method.
  7. From the list, choose POST, and then select the check mark.

Configure the integration for your API method

  1. On the /execution – POST – Setup page, for Integration Type, choose AWS Service. For AWS Region, choose a Region from the list. For Regions that currently support Step Functions, see Supported Regions.
  2. For AWS Service, choose Step Functions from the list.
  3. For HTTP Method, choose POST from the list. All Step Functions API actions use the HTTP POST method.
  4. For Action Type, choose Use action name.
  5. For Action, enter StartExecution.
  6. For Execution Role, enter the role ARN of the IAM role that you created earlier, as shown in the following example. The Integration Request configuration can be seen in the image below.
    arn:aws:iam::222222222222:role/APIGatewayToStepFunctions
    API Gateway integration request configuration
  7. Choose Save. The visual mapping between API Gateway and Step Functions is displayed on the /execution – POST – Method Execution page.
    API Gateway method configuration

After you configure your API, you can configure the resource policy to allow the invoke action from the cross-account Step Functions State Machine. For the resource policy to function in cross-account scenarios, you must also enable AWS IAM authorization on the API method.

Configure IAM authorization for your method

  1. On the /execution – POST method, navigate to the Method Request, and under the Authorization option, select AWS_IAM and save.
  2. In the left navigation pane, choose Resource Policy.
  3. Use this policy template to define and enter the resource policy for your API.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "states.amazonaws.com"
                },
                "Action": "execute-api:Invoke",
                "Resource": "execute-api:/*/*/*",
                "Condition": {
                    "StringEquals": {
                        "aws:SourceArn": [
                            "<SourceAccountStateMachineARN>"
                         ]
                    }
                }
            }
        ]
    }

    Note: You must replace <SourceAccountStateMachineARN> with the state machine ARN from account #111111111111 (account A).

  4. Choose Save.

Once the resource policy is configured, deploy the API to a stage.

Deploy the API

  1. In the left navigation pane, choose Resources, and then choose Actions.
  2. From the Actions drop-down menu, choose Deploy API.
  3. In the Deploy API dialog box, choose [New Stage] and enter dev for Stage name.
  4. Choose Deploy to deploy the API.

After deployment, capture the API ID, API Region, and the stage name. These are used as inputs during the execution phase.

Starting the workflow

To run the Step Functions workflow in account A, provide the following input:

{
   "ApiUrl": "<api_id>.execute-api.<region>.amazonaws.com",
   "stateMachineArn": "<stateMachineArn>",
   "body": "{\"someKey\":\"someValue\"}"
}

Start execution

Paste in the values of ApiUrl and stateMachineArn from account B into the preceding input. Make sure the ApiUrl is in the format shown.
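
If you prefer to start the execution programmatically, the following is a minimal sketch using the AWS SDK for Python (Boto3); the Region, state machine names, and placeholder values are assumptions that you must replace with your own.

import json
import boto3

# Step Functions client in account A (#111111111111)
sfn = boto3.client("stepfunctions", region_name="us-east-1")

# Placeholder values -- replace with your API ID, Region, and the ARN of state machine B
workflow_input = {
    "ApiUrl": "<api_id>.execute-api.<region>.amazonaws.com",
    "stateMachineArn": "arn:aws:states:<region>:222222222222:stateMachine:<StateMachineB>",
    "body": json.dumps({"someKey": "someValue"}),
}

# Start state machine A, which calls the API in account B
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:<region>:111111111111:stateMachine:<StateMachineA>",
    input=json.dumps(workflow_input),
)
print(response["executionArn"])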

AWS Serverless Application Model deployment

You can deploy the preceding solution architecture with the AWS Serverless Application Model (AWS SAM), which is an open-source framework for building serverless applications. During deployment, AWS SAM transforms and expands the syntax into AWS CloudFormation syntax, enabling you to build serverless applications faster.

Logging and monitoring

Logging and monitoring are vital for observability, performance measurement, and auditing. Step Functions supports logging to Amazon CloudWatch Logs and also automatically sends execution metrics to CloudWatch. You can learn more about monitoring Step Functions using CloudWatch.
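
As an illustration, the following Boto3 sketch turns on ERROR-level logging for an existing state machine; the log group ARN and state machine ARN are placeholders you must replace, and the log group must already exist.

import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARNs -- replace with your own log group and state machine
log_group_arn = "arn:aws:logs:<region>:111111111111:log-group:/stepfunctions/cross-account-workflow:*"
state_machine_arn = "arn:aws:states:<region>:111111111111:stateMachine:<StateMachineA>"

sfn.update_state_machine(
    stateMachineArn=state_machine_arn,
    loggingConfiguration={
        "level": "ERROR",              # log only error events
        "includeExecutionData": False, # omit input and output payloads from the logs
        "destinations": [
            {"cloudWatchLogsLogGroup": {"logGroupArn": log_group_arn}}
        ],
    },
)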

Cleaning up

To avoid incurring charges, delete all the resources that you have created in both accounts. This includes the Step Functions state machines and the API Gateway API.

Conclusion

This blog post provides a step-by-step guide on securely invoking a cross-account Step Functions workflow from a central account using API Gateway as front end. This pattern can be extended to scale workflow executions across different Regions and accounts.

Using a centralized account to orchestrate workflows across AWS accounts helps prevent duplicating work in each account.

To learn more about serverless and AWS Step Functions, visit the Step Functions Developer Guide.

For more serverless learning resources, visit Serverless Land.

Incident notification mechanism using Amazon Pinpoint two-way SMS

Post Syndicated from Pavlos Ioannou Katidis original https://aws.amazon.com/blogs/messaging-and-targeting/incident-notification-mechanism-using-amazon-pinpoint-two-way-sms/

Unexpected situations that require immediate attention can occur in any industry. Part of resolving these incidents is delivering notifications. For example, utility companies that have installed gas sensors need to immediately notify the available engineer if a leak occurs.

The goal of an incident management process is to restore a normal service operation as quickly as possible and to minimize the impact on business operations, thus ensuring that the best possible levels of service quality and availability are maintained. A key element of incident management is sending timely notifications to the assigned or available resource(s) who can rectify the issue.

An incident can take place at any time, and the resource(s) assigned to it might not have internet access; even if they receive the message, they might not be equipped to work on it. This creates five key requirements for an incident notification mechanism:

  1. Notify the resources via a communication channel that ensures message delivery even without internet access
  2. Enable assigned resources to respond to a request via a communication channel that doesn’t require internet access
  3. Send reminder(s) in case there is no response from the assigned resource(s)
  4. Escalate to another resource in case the first one doesn’t reply or declines the incident
  5. Store the incident details & status for reporting and data analysis

In this blog post, I share a solution for automating the delivery of incident notifications. This solution utilizes the Amazon Pinpoint SMS channel to contact the designated resources, who might not have access to the internet. Furthermore, the recipient of the SMS is able to reply with an acknowledgement. AWS Step Functions orchestrates the user journey using AWS Lambda functions to evaluate the recipients' response and trigger the next best action. You will use AWS CloudFormation to deploy this solution.

Use Cases

An incident notification mechanism can vary depending on the organization's requirements and third-party system integrations. The solution in this blog post covers all five requirements listed above, but it might need further modifications depending on your use case.

With minor modifications this solution can also be used in the following use cases:

  1. Medicine intake notification: Notify the patient via SMS that it is time to take their medicine. If the patient doesn't acknowledge the SMS by replying, the notification can be escalated to their assigned doctor
  2. Assignment submission: Notify the student that their assignment is due. If the student doesn't acknowledge the SMS by replying, the notification can be escalated to their teacher

High-level Architecture

The solution requires the country of your SMS recipients to support two-way SMS. To check which countries support two-way SMS, visit this page. If two-way SMS is supported, you need to request a dedicated originating identity. You can also use a toll-free number or 10DLC if your recipients are in the US.

Note: Sender ID doesn’t support two-way SMS.

A new incident is represented as an item in an Amazon DynamoDB table containing information such as the description, URL, and incident_id, as well as the contact numbers for two resources. A resource is someone who has been assigned to work on the incident. The second resource is for escalation purposes, in case the first one doesn't acknowledge or declines the incident notification.

The Amazon DynamoDB table covers three functions for this solution:

  1. A way to add new incidents using either the AWS console or programmatically
  2. As storage for variables that indicate the incident's status and can be used by the solution to determine the next action(s)
  3. As historical data storage for all incidents that have been created, for data analysis purposes

The solution utilizes Amazon DynamoDB Streams to invoke an AWS Lambda function every time a new incident is created. The AWS Lambda function triggers an AWS Step Functions state machine, which orchestrates three AWS Lambda functions:

  1. Send_First_SMS: Sends the first SMS (a sketch of this call follows the list)
  2. Reminder_SMS: Sends a reminder SMS if the resource does not acknowledge the first SMS
  3. Incident_State_Review: Assesses the status of the incident and either goes back to the first AWS Lambda function or finishes the Step Functions state machine execution
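
The sketch below illustrates, with Boto3, what a function like Send_First_SMS could do; the application ID, origination number, and message wording are assumptions for illustration, and the actual Lambda code in the repository may differ.

import boto3

pinpoint = boto3.client("pinpoint")

def send_incident_sms(application_id, origination_number, destination_number, incident):
    # Send a transactional SMS asking the resource to acknowledge or decline the incident
    response = pinpoint.send_messages(
        ApplicationId=application_id,
        MessageRequest={
            "Addresses": {destination_number: {"ChannelType": "SMS"}},
            "MessageConfiguration": {
                "SMSMessage": {
                    "Body": (
                        f"Incident {incident['incident_id']}: {incident['description']} "
                        "Reply yes to acknowledge or no to decline."
                    ),
                    "MessageType": "TRANSACTIONAL",
                    "OriginationNumber": origination_number,
                }
            },
        },
    )
    # Return the message ID so it can be stored together with the incident_id
    return response["MessageResponse"]["Result"][destination_number]["MessageId"]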

The AWS Step Functions state machine uses the Choice state, which evaluates the response of the previous AWS Lambda function and decides on the next state. This is a useful feature that can reduce custom code and AWS Lambda invocations, resulting in cost savings.

Additionally, the waiting between steps is managed by the AWS Step Functions state machine using the Wait state. This can be configured to wait for seconds, for days, or until a specific point in the future.

To be able to receive SMS, this solution uses Amazon Pinpoint's two-way SMS feature. When it receives an SMS, Amazon Pinpoint sends the payload to an Amazon SNS topic, which needs to be created separately. An AWS Lambda function that is subscribed to the Amazon SNS topic processes the SMS content and performs one or both of the following actions:

  1. Update the incident status in the DynamoDB table
  2. Create a new Step Function State machine execution

In this solution SMS recipients can reply by typing either yes or no. The SMS response is not case sensitive.

An inbound SMS payload contains the originationNumber, destinationNumber, messageKeyword, messageBody, inboundMessageId, and previousPublishedMessageId. Notably, there isn't a direct way to associate an inbound SMS with an incident. To overcome this challenge, this solution uses a second DynamoDB table, which stores the message_id and incident_id every time an SMS is sent to either of the two resources. This allows the solution to use the previousPublishedMessageId from the inbound SMS payload to fetch the respective incident_id from the second DynamoDB table.
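
To make this concrete, here is a minimal Boto3 sketch of how the Lambda function subscribed to the SNS topic could resolve the incident_id from an inbound SMS; the table name and the follow-up actions are assumptions for illustration.

import json
import boto3

dynamodb = boto3.resource("dynamodb")
# Hypothetical name for the second table that maps message IDs to incident IDs
lookup_table = dynamodb.Table("SMSLookupDynamoDB")

def lambda_handler(event, context):
    # Amazon Pinpoint publishes the inbound SMS payload to SNS as a JSON string
    sms = json.loads(event["Records"][0]["Sns"]["Message"])
    reply = sms["messageBody"].strip().lower()

    # Use the ID of the SMS sent earlier to look up the related incident
    item = lookup_table.get_item(
        Key={"message_id": sms["previousPublishedMessageId"]}
    )["Item"]
    incident_id = item["incident_id"]

    if reply == "yes":
        pass  # acknowledge: update the incident status in the incident table (not shown)
    elif reply == "no":
        pass  # decline: start a new state machine execution to escalate (not shown)

    return {"incident_id": incident_id, "reply": reply}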

The code in this solution uses AWS SDK for Python (Boto3).

Prerequisites

  1. An Amazon Pinpoint project with the SMS channel enabled – Guide on how to enable Amazon Pinpoint SMS channel
  2. Check if the country you want to send SMS to, supports two-way SMS – List with countries that support two-way SMS
  3. An originating identity that supports two-way SMS – Guide on how to request a phone number
  4. Increase your monthly SMS spending quota for Amazon Pinpoint – Guide on how to increase the monthly SMS spending quota

Deploy the solution

Step 1: Create an S3 bucket

  1. Navigate to the Amazon S3 console
  2. Select Create bucket
  3. Enter a unique name for Bucket name
  4. Select the AWS Region to be the same as the one of your Amazon Pinpoint project
  5. Scroll to the bottom of the page and select Create bucket
  6. Follow this link to download the GitHub repository. Once the repository is downloaded, unzip it and navigate to  \amazon-pinpoint-incident-notifications-mechanism-main\src
  7. Access the S3 bucket created above and upload the five .zip files

Step 2: Create a stack

  1. The application is deployed using an AWS CloudFormation template.
  2. Navigate to the AWS CloudFormation console select Create stack > With new resources (standard)
  3. Select Template is ready as Prerequisite – Prepare template and choose Upload a template file as Template source
  4. Select Choose file and from the GitHub repository downloaded in step 1.6 navigate to amazon-pinpoint-incident-notifications-mechanism-main\cfn upload CloudFormation_template.yaml and select Next
  5. Type Pinpoint-Incident-Notifications-Mechanism as Stack name, paste the S3 bucket name created in step 1.5 as the LambdaCodeS3BucketName, type the Amazon Pinpoint Originating Number in E.164 format as OriginatingIdenity, paste the Amazon Pinpoint project ID as PinpointProjectId and type 40 for WaitingBetweenSteps
  6. Select Next until you reach Step 4 Review, where you need to check the box I acknowledge that AWS CloudFormation might create IAM resources, and then select Create Stack
  7. The stack creation process takes approximately 2 minutes. Click on the refresh button to get the latest event regarding the deployment status. Once the stack has been deployed successfully you should see the last Event with Logical ID Pinpoint-Incident-Notifications-Mechanism and with Status CREATE_COMPLETE

Step 3: Configure two-way SMS SNS topic

  1. Navigate to the Amazon Pinpoint console > SMS and voice > Phone numbers. Select the originating identity that supports two-way SMS. Scroll to the bottom of the page, expand the Two-way SMS section, and check the box to enable it.

    For SNS topic, select Choose an existing SNS topic, then use the drop-down to choose the one that contains the name of the AWS CloudFormation stack from Step 2.4 as well as the name TwoWaySMSSNSTopic, and choose Save.

Step 4: Create a new incident

To create a new incident, navigate to Amazon DynamoDB console > Tables and select the table containing the name of the AWS CloudFormation stack from Step 2.4 as well as the name IncidentInfoDynamoDB. Select View items and then Create item.

On the Create item page choose JSON, copy and paste the JSON below into the text box and replace the values for the first_contact and second_contact with a valid mobile number that you have access to.

Note: If you don’t have two different mobile numbers, enter the same for both first_contact and second_contact fields. The mobile numbers must follow E.164 format +<country code><number>.

{
   "incident_id":{
      "S":"123"
   },
   "incident_stat":{
      "S":"not_acknowledged"
   },
   "double_escalation":{
      "S":"no"
   },
   "description":{
      "S":"Error 111, unit 1 malfunctioned. Urgent assistance is required."
   },
   "url":{
      "S":"https://example.com/incident/111/overview"
   },
   "first_contact":{
      "S":"+4479083---"
   },
   "second_contact":{
      "S":"+4479083---"
   }
}

Incident fields description:

  • incident_id: Needs to be unique
  • incident_stat: This is used from the application to store the incident status. When creating the incident, this value should always be not_acknowledged
  • double_escalation: This is used from the application as a flag for recipients who try to escalate an incident that is already escalated. When creating the incident, this value should always be no
  • description: You can type a description that best describes the incident. Be aware that depending on the number of characters, the number of SMS parts will increase. For more information on SMS character limits, visit this page
  • url: You can add a URL that resources can access to resolve the issue. If this field is not pertinent to your use case then type no url
  • first_contact: This should contain the mobile number in E.164 format for the first resource
  • second_contact: This should contain the mobile number in E.164 format for the second resource. The second resource will be contacted only if the first one does not acknowledge the SMS or declines the incident

Once the above is ready, you can select Create item. This starts the AWS Step Functions state machine execution, and you should receive an SMS. You can reply with yes to acknowledge the incident or with no to decline it. Depending on your response, the incident status in the DynamoDB table is updated, and if you reply no, the incident is escalated by sending an SMS to the second_contact.

Note: The SMS response is not case sensitive.
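
Alternatively, you can create the same item programmatically, as in the following Boto3 sketch; the table name is a placeholder for the table created by the CloudFormation stack, and the phone numbers must be replaced with valid numbers in E.164 format.

import boto3

dynamodb = boto3.resource("dynamodb")
# Placeholder -- use the actual IncidentInfoDynamoDB table name created by the stack
incident_table = dynamodb.Table("<stack-name>-IncidentInfoDynamoDB-table-name")

incident_table.put_item(
    Item={
        "incident_id": "124",                 # must be unique
        "incident_stat": "not_acknowledged",  # always not_acknowledged on creation
        "double_escalation": "no",            # always no on creation
        "description": "Error 112, unit 2 malfunctioned. Urgent assistance is required.",
        "url": "https://example.com/incident/112/overview",
        "first_contact": "+<country code><number>",
        "second_contact": "+<country code><number>",
    }
)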

Clean-up

To remove the solution:

  1. Delete the AWS CloudFormation stack by following the steps listed in this guide
  2. Delete the dedicated originating identity that you used to send the SMS by following the steps listed in this guide
  3. Delete the Amazon Pinpoint project by navigating to the Amazon Pinpoint console, selecting your Amazon Pinpoint project, and choosing Settings > General settings > Delete Project

Next Steps

This solution currently works only if your SMS recipients are in one country. If your use case requires sending SMS to multiple countries, you need to:

  • Check this page to ensure that these countries support two-way SMS
  • Follow the instructions in this page to obtain a number that supports two-way SMS for each country
  • Expand the solution to identify the country of the SMS recipient and to choose the correct number accordingly. To identify the country of the SMS recipient, you can use Amazon Pinpoint's phone number validate service via the Amazon Pinpoint API or SDKs. The phone number validate service returns a list of data points per mobile number, one of which is the country (see the sketch after this list)
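
As a reference, the phone number validate call could look like the following Boto3 sketch; the phone number is a placeholder.

import boto3

pinpoint = boto3.client("pinpoint")

# Validate a recipient's number; replace the placeholder with the number to check
response = pinpoint.phone_number_validate(
    NumberValidateRequest={"PhoneNumber": "+<country code><number>"}
)

# The response includes data points such as Country, CountryCodeIso2, and PhoneType
print(response["NumberValidateResponse"]["Country"])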

Incidents that are not acknowledged by any of the assigned resources have their status updated to unacknowledged, but they don't escalate further. Depending on your requirements, you can expand the solution to send an email using Amazon Pinpoint APIs or to perform an outbound call using Amazon Connect APIs.

Conclusion

In this blog post, I have demonstrated how your organization can use Amazon Pinpoint two-way SMS and Step Functions to automate incident notifications. Furthermore, the solution highlights the synergy of AWS services and how you can build a custom solution with little effort that meets your requirements.

About the Author

Pavlos Ioannou Katidis

Pavlos Ioannou Katidis is an Amazon Pinpoint and Amazon Simple Email Service Specialist Solutions Architect at AWS. He loves to dive deep into his customer’s technical issues and help them design communication solutions. In his spare time, he enjoys playing tennis, watching crime TV series, playing FPS PC games, and coding personal projects.

Composing AWS Step Functions to abstract polling of asynchronous services

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/composing-aws-step-functions-to-abstract-polling-of-asynchronous-services/

This post is written by Nicolas Jacob Baer, Senior Cloud Application Architect, Goeksel Sarikaya, Senior Cloud Application Architect, and Shukhrat Khodjaev, Engagement Manager, AWS ProServe.

AWS Step Functions workflows can use the three main integration patterns when calling AWS services. A common integration pattern is to call a service and wait for a response before proceeding to the next step.

While this works well with AWS services, there is no built-in support for third-party, on-premises, or custom-built services running on AWS. When a workflow integrates with such a service in an asynchronous fashion, it requires a primary step to invoke the service. There are additional steps to wait and check the result, and handle possible error scenarios, retries, and fallbacks.

Although this approach may work well for small workflows, it does not scale to multiple interactions in different steps of the workflow and becomes repetitive. This may result in a complex workflow, which makes it difficult to build, test, and modify. In addition, large workflows with many repeated steps are more difficult to troubleshoot and understand.

This post demonstrates how to decompose the custom service integration by nesting Step Functions workflows. One basic workflow is dedicated to handling the asynchronous communication, which offers modularity and can be reused as a building block. Another workflow handles the main process by invoking the nested workflow for service interaction, where all the repeated steps are now hidden in multiple executions of the nested workflow.

Overview

Consider a custom service that provides an asynchronous interface, where an action is initially triggered by an API call. After a few minutes, the result is available to be polled by the caller. The following diagram shows a basic workflow interacting with this custom service that encapsulates the service communication in a workflow:

Workflow diagram

  1. Call Custom Service API – calls a custom service, in this case through an API. Potentially, this could use a service integration or use AWS Lambda if there is custom code.
  2. Wait – waits for the service to prepare a result. This depends on the service that the workflow is interacting with, and could vary from seconds to days to months.
  3. Poll Result – attempts to poll the result from the custom service (a sketch of this step follows the list).
  4. Choice – repeats the polling if the result is not yet available, or moves on to the failed or success state if the result was retrieved. In addition, a timeout should be in place here in case the result is not available within the expected time range. Otherwise, this might lead to an infinite loop.
  5. Fail – fails the workflow if a timeout or a threshold for the number of retries with error conditions is reached.
  6. Transform Result – transforms the result or adds additional meta information to provide further information to the caller (for example, runtime or retries).
  7. Success – finishes the workflow successfully.
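
As a rough illustration of the Poll Result step, the following Lambda function sketch polls a hypothetical custom service endpoint over HTTP and returns a status that the Choice state can branch on; the URL and the response fields are assumptions.

import json
import urllib.request

# Hypothetical custom service endpoint; in practice this would come from the state input
RESULT_URL = "https://custom-service.example.com/jobs/{job_id}/result"

def lambda_handler(event, context):
    # Poll the custom service for the result of a previously started job
    url = RESULT_URL.format(job_id=event["jobId"])
    with urllib.request.urlopen(url) as response:
        body = json.loads(response.read())

    # Return a status the Choice state can branch on: PENDING, SUCCEEDED, or FAILED
    return {
        "jobId": event["jobId"],
        "status": body.get("status", "PENDING"),
        "result": body.get("result"),
    }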

If you build a larger workflow that interacts with this custom service in multiple steps in a workflow, you can reduce the complexity by using the Step Functions integration to call the nested workflow with a Wait state.

An illustration of this can be found in the following diagram, where the nested workflow is called three times sequentially. Likewise, you can build a more complex workflow that adds additional logic through more steps or interacts with a custom service in parallel. The polling logic is hidden in the nested workflow.

Nested workflow

Walkthrough

To get started with AWS Step Functions and Amazon API Gateway using the AWS Management Console:

  1. Go to AWS Step Functions in the AWS Management Console.
  2. Choose Run a sample project and choose Start a workflow within a workflow.
    Defining state machine
  3. Scroll down to the sample projects, which are defined using Amazon States Language (ASL).
    Definition
  4. Review the example definition, then choose Next.
  5. Choose Deploy resources. The deployment can take up to 10 minutes. After deploying the resources, you can edit the sample ASL code to define steps in the state machine.
    Deploy resources
  6. The deployment creates two state machines: NestingPatternMainStateMachine and NestingPatternAnotherStateMachine. NestingPatternMainStateMachine orchestrates the other nested state machines sequentially or in parallel.
    Two state machines
  7. Select a state machine, then choose Edit to edit the workflow. In the NestingPatternMainStateMachine, the first state triggers the nested workflow NestingPatternAnotherStateMachine. You can pass necessary parameters to the nested workflow by using Parameters and Input as shown in the example below with Parameter1 and Parameter2. Once the first nested workflow completes successfully, the second nested workflow is triggered. If the result of the first nested workflow is not successful, the NestingPatternMainStateMachine fails with the Fail state.
    Editing state machine
  8. Select nested workflow NestingPatternAnotherStateMachine, and then select Edit to add AWS Lambda functions to start a job and poll the state of the jobs. This can be any asynchronous job that needs to be polled to query its state. Based on expected job duration, the Wait state can be configured for 10-20 seconds. If the workflow is successful, the main workflow returns a successful result.
    Edited next state machine

Use cases and limitations

This approach allows encapsulation of workflows consisting of multiple sequential or parallel services. Therefore, it provides flexibility that can be used for different use cases. Services can be part of distributed applications, automated business processes, or big data and machine learning pipelines built on AWS services.

Each nested workflow is responsible for an individual step in the main workflow, providing flexibility and scalability. Hundreds of nested workflows can run and be monitored in parallel with the main workflow (see AWS Step Functions Service Quotas).

The approach described here is not applicable to custom service interactions faster than 1 second, because that is the minimum configurable value for a Wait state.

Nested workflow encapsulation

Similar to the principle of encapsulation in object-oriented programming, you can use a nested workflow for different interactions with a custom service. You can dynamically pass input parameters to the nested workflow during workflow execution and receive return values. This way, you can define a clear interface between the nested workflow and the parent workflow with different actions and integrations. Depending on the use-case, a custom service may offer a variety of different actions that must be integrated into workflows that run in Step Functions, but can all be combined into a single workflow.

Debugging and tracing

Additionally, debugging and tracing can be done through the Execution event history in the Step Functions console. In the Resource column, you can find a link to the executed nested state machine, which can be debugged in case of any error in the nested Step Functions workflow.

Execution event history

However, debugging can be challenging in case of multiple parallel nested workflows. In such cases, AWS X-Ray can be enabled to visualize the components of a state machine, identify performance bottlenecks, and troubleshoot requests that have led to an error.

To enable AWS X-Ray in AWS Step Functions:

  1. Open the Step Functions console and choose Edit state machine.
  2. Scroll down to Tracing settings, and Choose Enable X-Ray tracing.

Tracing

For detailed information regarding AWS X-Ray and AWS Step Functions please refer to the following documentation: https://docs.aws.amazon.com/step-functions/latest/dg/concepts-xray-tracing.html

Conclusion

This blog post describes how to compose a nested Step Functions workflow, which asynchronously manages a custom service using the polling mechanism.

To learn more about how to use AWS Step Functions workflows for serverless microservices orchestration, visit Serverless Land.

Building a serverless image catalog with AWS Step Functions Workflow Studio

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-a-serverless-image-catalog-with-aws-step-functions-workflow-studio/

This post is written by Pascal Vogel, Associate Solutions Architect, and Benjamin Meyer, Sr. Solutions Architect.

Workflow Studio is a low-code visual workflow designer for AWS Step Functions that enables the orchestration of serverless workflows through a guided interactive interface. With the integration of Step Functions and the AWS SDK, you can now access more than 200 AWS services and over 9,000 API actions in your state machines.

This walkthrough uses Workflow Studio to implement a serverless image cataloging pipeline. It includes content moderation, automated tagging, and parallel image processing. Workflow Studio allows you to set up API integrations to other AWS services quickly with drag and drop actions, without writing custom application code.

Solution overview

Photo sharing websites often allow users to publish user-generated content such as text, images, or videos. Manual content review and categorization can be challenging. This solution enables the automation of these tasks.

Workflow overview

In this workflow:

  1. An image stored in Amazon S3 is checked for inappropriate content using the Amazon Rekognition DetectModerationLabels API.
  2. Based on the result of (1), appropriate images are forwarded to image processing while inappropriate ones trigger an email notification.
  3. Appropriate images undergo two processing steps in parallel: the detection of objects and text in the image via Amazon Rekognition’s DetectLabels and DetectText APIs. The results of both processing steps are saved in an Amazon DynamoDB table.
  4. An inappropriate image triggers an email notification for manual content moderation via the Amazon Simple Notification Service (SNS).
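
The workflow calls Amazon Rekognition directly through the Step Functions SDK integration, so no application code is required; for reference, the following Boto3 sketch shows an equivalent DetectModerationLabels call, with the bucket and key as placeholders.

import boto3

rekognition = boto3.client("rekognition")

# Equivalent of the DetectModerationLabels task in the workflow; replace bucket and key
response = rekognition.detect_moderation_labels(
    Image={"S3Object": {"Bucket": "<S3-bucket-name>", "Name": "<Image-name>.jpeg"}}
)

# An empty ModerationLabels list means no inappropriate content was detected
if response["ModerationLabels"]:
    print("Inappropriate content detected:", response["ModerationLabels"])
else:
    print("Image is appropriate for further processing")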

Prerequisites

To follow this walkthrough, you need:

  1. An AWS account.
  2. An AWS user with AdministratorAccess (see the instructions on the AWS Identity and Access Management (IAM) console).
  3. AWS CLI using the instructions here.
  4. AWS Serverless Application Model (AWS SAM) CLI using the instructions here.

Initial project setup

Get started by cloning the project repository from GitHub:

git clone https://github.com/aws-samples/aws-step-functions-image-catalog-blog.git

The cloned repository contains two AWS SAM templates.

  1. The starter directory contains a template. It deploys AWS resources and permissions that you use later for building the image cataloging workflow.
  2. The solution directory contains a template that deploys the finished image cataloging pipeline. Use this template if you want to skip ahead to the finished solution.

Both templates deploy the following resources to your AWS account:

  • An Amazon S3 bucket that holds the image files for the catalog.
  • A DynamoDB table as the data store of the image catalog.
  • An SNS topic and subscription that allow you to send an email notification.
  • A Step Functions state machine that defines the processing steps in the cataloging pipeline.

To follow the walkthrough, deploy the AWS SAM template in the starter directory using the AWS SAM CLI:

cd aws-step-functions-image-catalog-blog/starter
sam build
sam deploy --guided

Configure the AWS SAM deployment as follows. Input your email address for the parameter ModeratorEmailAddress:

Configuring SAM deploy

During deployment, you receive an email asking you to confirm the subscription to notifications generated by the Step Functions workflow. In the email, choose Confirm subscription to receive these notifications.

Subscription message

Confirm successful resource creation by going to the AWS CloudFormation console. Open the serverless-image-catalog-starter stack and choose the Stack info tab:

CloudFormation stack

View the Outputs tab of the CloudFormation stack. You reference these items later in the walkthrough:

Outputs tab

Implementing the image cataloging pipeline

Accessing Step Functions Workflow Studio

To access Step Functions in Workflow Studio:

  1. Access the Step Functions console.
  2. In the list of State machines, select image-catalog-workflow-starter.
  3. Choose the Edit button.
  4. Choose Workflow Studio.

Workflow Studio

Workflow Studio consists of three main areas:

  1. The Canvas lets you modify the state machine graph via drag and drop.
  2. The States Browser lets you browse and search more than 9,000 API Actions from over 200 AWS services.
  3. The Inspector panel lets you configure the properties of state machine states and displays the Step Functions definition in the Amazon States Language (ASL).

For the purpose of this walkthrough, you can delete the Pass state present in the state machine graph. Right click on it and choose Delete state.

Auto-moderating content with Amazon Rekognition and the Choice State

Use Amazon Rekognition’s DetectModerationLabels API to detect inappropriate content in the images processed by the workflow:

  1. In the States browser, search for the DetectModerationLabels API action.
  2. Drag and drop the API action on the state machine graph on the canvas.

Drag and drop

In the Inspector panel, select the Configuration tab and add the following API Parameters:

{
  "Image": {
    "S3Object": {
      "Bucket.$": "$.bucket",
      "Name.$": "$.key"
    }
  }
}

Switch to the Output tab and check the box next to Add original input to output using ResultPath. This allows you to pass both the original input and the task’s output on to the next state on the state machine graph.

Input the following ResultPath:

$.moderationResult

Step Functions enables you to make decisions based on the output of previous task states via the choice state. Use the result of the DetectModerationLabels API action to decide how to proceed with the image:

  1. In the States browser, choose the Flow tab.
  2. Select a Choice state and place it after the DetectModerationLabels state on the graph.
  3. Select the added Choice state.
  4. In the Inspector panel, choose Rule #1 and select Edit.
  5. Choose Add conditions.
  6. For Variable, enter $.moderationResult.ModerationLabels[0].
  7. For Operator, choose is present.
  8. Choose Save conditions.
    Conditions for rule #1

If Amazon Rekognition detects inappropriate content, the workflow notifies content moderators to inspect the image manually:

  1. In the States browser, find the SNS Publish API Action.
  2. Drag the Action into the Rule #1 branch of the Choice state.
  3. For API Parameters, select the SNS topic that is visible in the Outputs of the serverless-image-catalog-starter stack in the CloudFormation console.

SNS topic in Workflow Studio

Speeding up image cataloging with the Parallel state

Appropriate images should be processed and included in the image catalog. In this example, processing includes the automated generation of tags based on objects and text identified in the image.

To accelerate this, instruct Step Functions to perform these tasks concurrently via a Parallel state:

  1. In the States browser, select the Flow tab.
  2. Drag and drop a Parallel state onto the Default branch of the previously added Choice state.
  3. Search for the Amazon Rekognition DetectLabels API action in the States browser.
  4. Drag and drop it inside the parallel state.
  5. Configure the following API parameters:
    {
      "Image": {
        "S3Object": {
          "Bucket.$": "$.bucket",
          "Name.$": "$.key"
        }
      }
    }
    
  6. Switch to the Output tab and check the box next to Add original input to output using ResultPath. Set the ResultPath to $.output.

Record the results of the Amazon Rekognition DetectLabels API Action to the DynamoDB database:

  1. Place a DynamoDB UpdateItem API Action inside the Parallel state below the Amazon Rekognition DetectLabels API action.
  2. Configure the following API Parameters to save the tags to the DynamoDB table. Input the name of the DynamoDB table visible in the Outputs of the serverless-image-catalog-starter stack in the CloudFormation console:
{
  "TableName": "<DynamoDB table name>",
  "Key": {
    "Id": {
      "S.$": "$.key"
    }
  },
  "UpdateExpression": "set detectedObjects=:o",
  "ExpressionAttributeValues": {
    ":o": {
      "S.$": "States.JsonToString($.output.Labels)"
    }
  }
}

This API parameter definition makes use of an intrinsic function to convert the list of objects identified by Amazon Rekognition from JSON to String.

Intrinsic functions

In addition to objects, you also want to identify text in images and store it in the database. To do so:

  1. Drag and drop an Amazon Rekognition DetectText API action into the Parallel state next to the DetectLabels Action.
  2. Configure the API Parameters and ResultPath identical to the DetectLabels API Action.
  3. Place another DynamoDB UpdateItem API Action inside the Parallel state below the Amazon Rekognition DetectText API Action. Set the following API Parameters and input the same DynamoDB table name as before.
{
  "TableName": "<DynamoDB table name>",
  "Key": {
    "Id": {
      "S.$": "$.key"
    }
  },
  "UpdateExpression": "set detectedText=:t",
  "ExpressionAttributeValues": {
    ":t": {
      "S.$": "States.JsonToString($.output.TextDetections)"
    }
  }
}

To save the state machine:

  1. Choose Apply and exit.
  2. Choose Save.
  3. Choose Save anyway.

Finishing up and testing the image cataloging workflow

To test the image cataloging workflow, upload an image to the S3 bucket created as part of the initial project setup. Find the name of the bucket in the Outputs of the serverless-image-catalog-starter stack in the CloudFormation console.

  1. Select the image-catalog-workflow-starter state machine in the Step Functions console.
  2. Choose Start execution.
  3. Paste the following test event (use your S3 bucket name):
    {
        "bucket": "<S3-bucket-name>",
        "key": "<Image-name>.jpeg"
    }
    
  4. Choose Start execution.

Once the execution has started, you can follow the state of the state machine live in the Graph inspector. For an appropriate image, the result will look as follows:

Graph inspector

Next, repeat the test process with an image that Amazon Rekognition classifies as inappropriate. Find out more about inappropriate content categories here. This produces the following result:

Graph inspector

You receive an email notifying you regarding the inappropriate image and its properties.

Cleaning up

To clean up the resources provisioned as part of the solution, run the following command in the aws-step-functions-image-catalog-blog/starter directory:

sam delete

Conclusion

This blog post demonstrates how to implement a serverless image cataloging pipeline using Step Functions Workflow Studio. By orchestrating AWS API actions and flow states via drag and drop, you can process user-generated images. This example checks images for appropriateness and generates tags based on their content without custom application code.

You can now expand and improve this workflow by triggering it automatically each time an image is uploaded to the Amazon S3 bucket or by adding a manual approval step for submitted content. To find out more about Workflow Studio, visit the AWS Step Functions Developer Guide.

For more serverless learning resources, visit Serverless Land.

How the Georgia Data Analytics Center built a cloud analytics solution from scratch with the AWS Data Lab

Post Syndicated from Kanti Chalasani original https://aws.amazon.com/blogs/big-data/how-the-georgia-data-analytics-center-built-a-cloud-analytics-solution-from-scratch-with-the-aws-data-lab/

This is a guest post by Kanti Chalasani, Division Director at Georgia Data Analytics Center (GDAC). GDAC is housed within the Georgia Office of Planning and Budget to facilitate governed data sharing between various state agencies and departments.

The Office of Planning and Budget (OPB) established the Georgia Data Analytics Center (GDAC) with the intent to provide data accountability and transparency in Georgia. GDAC strives to support the state’s government agencies, academic institutions, researchers, and taxpayers with their data needs. Georgia’s modern data analytics center will help to securely harvest, integrate, anonymize, and aggregate data.

In this post, we share how GDAC created an analytics platform from scratch using AWS services and how GDAC collaborated with the AWS Data Lab to accelerate this project from design to build in record time. The pre-planning sessions, technical immersions, pre-build sessions, and post-build sessions helped us focus on our objectives and tangible deliverables. We built a prototype with a modern data architecture and quickly ingested additional data into the data lake and the data warehouse. The purpose-built data and analytics services allowed us to quickly ingest additional data and deliver data analytics dashboards. It was extremely rewarding to officially release the GDAC public website within only 4 months.

A combination of clear direction from OPB executive stakeholders, input from the knowledgeable and driven AWS team, and the GDAC team’s drive and commitment to learning played a huge role in this success story. GDAC’s partner agencies helped tremendously through timely data delivery, data validation, and review.

We had a two-tiered engagement with the AWS Data Lab. In the first tier, we participated in a Design Lab to discuss our near-to-long-term requirements and create a best-fit architecture. We discussed the pros and cons of various services that can help us meet those requirements. We also had meaningful engagement with AWS subject matter experts from various AWS services to dive deeper into the best practices.

The Design Lab was followed by a Build Lab, where we took a smaller cross section of the bigger architecture and implemented a prototype in 4 days. During the Build Lab, we worked in GDAC AWS accounts, using GDAC data and GDAC resources. This not only helped us build the prototype, but also helped us gain hands-on experience in building it. This experience also helped us better maintain the product after we went live. We were able to continually build on this hands-on experience and share the knowledge with other agencies in Georgia.

Our Design and Build Lab experiences are detailed below.

Step 1: Design Lab

We wanted to stand up a platform that can meet the data and analytics needs for the Georgia Data Analytics Center (GDAC) and potentially serve as a gold standard for other government agencies in Georgia. Our objective with the AWS Data Design Lab was to come up with an architecture that meets initial data needs and provides ample scope for future expansion as our user base and data volume increase. We wanted each component of the architecture to scale independently, with tighter controls on data access. We also wanted to enable easy exploration of data with faster response times using Tableau data analytics, as well as to build data capital for Georgia. This would allow us to empower our policymakers to make data-driven decisions in a timely manner and allow state agencies to share data and definitions within and across agencies through data governance. We also stressed data security, classification, obfuscation, auditing, monitoring, logging, and compliance needs. We wanted to use purpose-built tools meant for specialized objectives.

Over the course of the 2-day Design Lab, we defined our overall architecture and picked a scaled-down version to explore. The following diagram illustrates the architecture of our prototype.

The architecture contains the following key components:

  • Amazon Simple Storage Service (Amazon S3) for raw data landing and curated data staging.
  • AWS Glue for extract, transform, and load (ETL) jobs to move data from the Amazon S3 landing zone to Amazon S3 curated zone in optimal format and layout. We used an AWS Glue crawler to update the AWS Glue Data Catalog.
  • AWS Step Functions for AWS Glue job orchestration.
  • Amazon Athena as a powerful tool for a quick and extensive SQL data analysis and to build a logical layer on the landing zone.
  • Amazon Redshift to create a federated data warehouse with conformed dimensions and star schemas for consumption by Tableau data analytics.

Step 2: Pre-Build Lab

We started with planning sessions to build foundational components of our infrastructure: AWS accounts, Amazon Elastic Compute Cloud (Amazon EC2) instances, an Amazon Redshift cluster, a virtual private cloud (VPC), route tables, security groups, encryption keys, access rules, internet gateways, a bastion host, and more. Additionally, we set up AWS Identity and Access Management (IAM) roles and policies, AWS Glue connections, dev endpoints, and notebooks. Files were ingested via secure FTP, or from a database to Amazon S3 using AWS Command Line Interface (AWS CLI). We crawled Amazon S3 via AWS Glue crawlers to build Data Catalog schemas and tables for quick SQL access in Athena.

The GDAC team participated in Immersion Days for training in AWS Glue, AWS Lake Formation, and Amazon Redshift in preparation for the Build Lab.

We defined the following as the success criteria for the Build Lab:

  • Create ETL pipelines from source (Amazon S3 raw) to target (Amazon Redshift). These ETL pipelines should create and load dimensions and facts in Amazon Redshift.
  • Have a mechanism to test the accuracy of the data loaded through our pipelines.
  • Set up Amazon Redshift in a private subnet of a VPC, with appropriate users and roles identified.
  • Connect from AWS Glue to Amazon S3 to Amazon Redshift without going over the internet.
  • Set up row-level filtering in Amazon Redshift based on user login.
  • Data pipelines orchestration using Step Functions.
  • Build and publish Tableau analytics with connections to our star schema in Amazon Redshift.
  • Automate the deployment using AWS CloudFormation.
  • Set up column-level security for the data in Amazon S3 using Lake Formation. This allows for differential access to data based on user roles to users using both Athena and Amazon Redshift Spectrum.

Step 3: Four-day Build Lab

Following a series of implementation sessions with our architect, we formed the GDAC data lake and organized downstream data pulls for the data warehouse with governed data access. Data was ingested in the raw data landing lake and then curated into a staging lake, where data was compressed and partitioned in Parquet format.

It was empowering for us to build PySpark extract, transform, and load (ETL) AWS Glue jobs with our meticulous AWS Data Lab architect. We built reusable AWS Glue jobs for data ingestion and curation using the code snippets provided. The days were rigorous and long, but we were thrilled to see our centralized data repository come to fruition so rapidly. Cataloging data and using Athena queries proved to be a fast and cost-effective way for data exploration and data wrangling.

The serverless orchestration with Step Functions allowed us to put AWS Glue jobs into a simple readable data workflow. We spent time designing for performance and partitioning data to minimize cost and increase efficiency.

Database access from Tableau and SQL Workbench/J was set up for my team. Our excitement only grew as we began building data analytics and dashboards using our dimensional data models.

Step 4: Post-Build Lab

During our post-Build Lab session, we closed several loose ends and built additional AWS Glue jobs for initial and historic loads and append vs. overwrite strategies. These strategies were picked based on the nature of the data in various tables. We returned for a second Build Lab to work on building data migration tasks from Oracle Database via VPC peering, file processing using AWS Glue DataBrew, and AWS CloudFormation for automated AWS Glue job generation. If you have a team of 4–8 builders looking for a fast and easy foundation for a complete data analytics system, I would highly recommend the AWS Data Lab.

Conclusion

All in all, with a very small team we were able to set up a sustainable framework on AWS infrastructure with elastic scaling to handle future capacity without compromising quality. With this framework in place, we are moving rapidly with new data feeds. This would not have been possible without the assistance of the AWS Data Lab team throughout the project lifecycle. With this quick win, we decided to move forward and build AWS Control Tower with multiple accounts in our landing zone. We brought in professionals to help set up infrastructure and data compliance guardrails and security policies. We are thrilled to continually improve our cloud infrastructure, services and data engineering processes. This strong initial foundation has paved the pathway to endless data projects in Georgia.


About the Author

Kanti Chalasani serves as the Division Director for the Georgia Data Analytics Center (GDAC) at the Office of Planning and Budget (OPB). Kanti is responsible for GDAC's data management, analytics, security, compliance, and governance activities. She strives to work with state agencies to improve data sharing, data literacy, and data quality through this modern data engineering platform. With over 26 years of IT management, hands-on data warehousing, and analytics experience, she strives for excellence.

Vishal Pathak is an AWS Data Lab Solutions Architect. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey with AWS, Vishal helped customers implement BI, data warehousing, and data lake projects in the US and Australia.

ETL orchestration using the Amazon Redshift Data API and AWS Step Functions with AWS SDK integration

Post Syndicated from Jason Pedreza original https://aws.amazon.com/blogs/big-data/etl-orchestration-using-the-amazon-redshift-data-api-and-aws-step-functions-with-aws-sdk-integration/

Extract, transform, and load (ETL) serverless orchestration architectures are becoming popular with many customers. These applications offer greater extensibility and simplicity, making it easier to maintain and simplify ETL pipelines. A primary benefit of this architecture is that we simplify an existing ETL pipeline with AWS Step Functions and directly call the Amazon Redshift Data API from the state machine. As a result, the complexity of the ETL pipeline is reduced.

As a data engineer or an application developer, you may want to interact with Amazon Redshift to load or query data with a simple API endpoint without having to manage persistent connections. The Amazon Redshift Data API allows you to interact with Amazon Redshift without having to configure JDBC or ODBC connections. This feature allows you to orchestrate serverless data processing workflows, design event-driven web applications, and run an ETL pipeline asynchronously to ingest and process data in Amazon Redshift, with the use of Step Functions to orchestrate the entire ETL or ELT workflow.

This post explains how to use Step Functions and the Amazon Redshift Data API to orchestrate the different steps in your ETL or ELT workflow and process data into an Amazon Redshift data warehouse.

AWS Lambda is typically used with Step Functions due to its flexible and scalable compute benefits. An ETL workflow has multiple steps, and the complexity may vary within each step. However, there is an alternative approach with AWS SDK service integrations, a feature of Step Functions. These integrations allow you to call over 200 AWS services' API actions directly from your state machine. This approach is optimal for steps with relatively low complexity compared to using Lambda, because you no longer have to maintain and test function code. Lambda functions have a maximum timeout of 15 minutes; if you need to wait for longer-running processes, Step Functions standard workflows allow a maximum runtime of 1 year.

You can replace steps that include a single process with a direct integration between Step Functions and AWS SDK service integrations without using Lambda. For example, if a step is only used to call a Lambda function that runs a SQL statement in Amazon Redshift, you may remove the Lambda function with a direct integration to the Amazon Redshift Data API’s SDK API action. You can also decouple Lambda functions with multiple actions into multiple steps. An implementation of this is available later in this post.
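
To make the pattern concrete, the following Boto3 sketch shows roughly what such a single-purpose step does with the Data API: it runs one of the stored procedures used later in this post and polls for completion, which is essentially what the state machine later does through the Data API's batch-execute-statement and describe-statement actions without any function code. The cluster, database, and user values are placeholders.

import time
import boto3

redshift_data = boto3.client("redshift-data")

# Run a SQL statement against the cluster; replace the placeholders with your values
statement = redshift_data.execute_statement(
    ClusterIdentifier="<cluster-identifier>",
    Database="<database-name>",
    DbUser="<db-user>",
    Sql="CALL sp_load_dim_customer_address();",
)

# The Data API is asynchronous, so poll describe_statement until the call finishes
while True:
    status = redshift_data.describe_statement(Id=statement["Id"])["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(5)

print(status)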

We created an example use case in the GitHub repo ETL Orchestration using Amazon Redshift Data API and AWS Step Functions that provides an AWS CloudFormation template for setup, SQL scripts, and a state machine definition. The state machine directly reads SQL scripts stored in your Amazon Simple Storage Service (Amazon S3) bucket, runs them in your Amazon Redshift cluster, and performs an ETL workflow. We don’t use Lambda in this use case.

Solution overview

In this scenario, we simplify an existing ETL pipeline that uses Lambda to call the Data API. AWS SDK service integrations with Step Functions allow you to directly call the Data API from the state machine, reducing the complexity in running the ETL pipeline.

The entire workflow performs the following steps:

  1. Set up the required database objects and generate a set of sample data to be processed.
  2. Run two dimension jobs that perform SCD1 and SCD2 dimension load, respectively.
  3. When both jobs have run successfully, the load job for the fact table runs.
  4. The state machine performs a validation to ensure the sales data was loaded successfully.

The following architecture diagram highlights the end-to-end solution:

We run the state machine via the Step Functions console, but you can run this solution in several ways.

You can deploy the solution with the provided CloudFormation template, which creates the following resources:

  • Database objects in the Amazon Redshift cluster:
    • Four stored procedures:
      • sp_setup_sales_data_pipeline() – Creates the tables and populates them with sample data
      • sp_load_dim_customer_address() – Runs the SCD1 process on customer_address records
      • sp_load_dim_item() – Runs the SCD2 process on item records
      • sp_load_fact_sales (p_run_date date) – Processes sales from all stores for a given day
    • Five Amazon Redshift tables:
      • customer
      • customer_address
      • date_dim
      • item
      • store_sales
  • The AWS Identity and Access Management (IAM) role StateMachineExecutionRole for Step Functions to allow the following permissions:
    • Federate to the Amazon Redshift cluster through the GetClusterCredentials permission, which avoids the need for password credentials
    • Run queries in the Amazon Redshift cluster through Data API calls
    • List and retrieve objects from Amazon S3
  • The Step Functions state machine RedshiftETLStepFunction, which contains the steps used to run the ETL workflow of the sample sales data pipeline

Prerequisites

As a prerequisite for deploying the solution, you need to set up an Amazon Redshift cluster and associate it with an IAM role. For more information, see Authorizing Amazon Redshift to access other AWS services on your behalf. If you don’t have a cluster provisioned in your AWS account, refer to Getting started with Amazon Redshift for instructions to set it up.

When the Amazon Redshift cluster is available, perform the following steps:

  1. Download and save the CloudFormation template to a local folder on your computer.
  2. Download and save the following SQL scripts to a local folder on your computer:
    1. sp_statements.sql – Contains the stored procedures including DDL and DML operations.
    2. validate_sql_statement.sql – Contains two validation queries you can run.
  3. Upload the SQL scripts to your S3 bucket. The bucket name is the designated S3 bucket specified in the ETLScriptS3Path input parameter.
  4. On the AWS CloudFormation console, choose Create stack with new resources and upload the template file you downloaded in the previous step (etl-orchestration-with-stepfunctions-and-redshift-data-api.yaml).
  5. Enter the required parameters and choose Next.
  6. Choose Next until you get to the Review page and select the acknowledgement check box.
  7. Choose Create stack.
  8. Wait until the stack deploys successfully.

When the stack is complete, you can view the outputs, as shown in the following screenshot:

Run the ETL orchestration

After you deploy the CloudFormation template, navigate to the stack detail page. On the Resources tab, choose the link for RedshiftETLStepFunction to be redirected to the Step Functions console.

The RedshiftETLStepFunction state machine runs automatically, as outlined in the following workflow (a Python sketch of the submit-and-poll pattern used in steps 1–4 follows the list):

  1. read_sp_statement and run_sp_deploy_redshift – Perform the following actions:
    1. Retrieves the sp_statements.sql from Amazon S3 to get the stored procedure.
    2. Passes the stored procedure to the batch-execute-statement API to run in the Amazon Redshift cluster.
    3. Sends back the identifier of the SQL statement to the state machine.
  2. wait_on_sp_deploy_redshift – Waits for at least 5 seconds.
  3. run_sp_deploy_redshift_status_check – Invokes the Data API’s describeStatement to get the status of the API call.
  4. is_run_sp_deploy_complete – Routes the next step of the ETL workflow depending on its status:
    1. FINISHED – Stored procedures are created in your Amazon Redshift cluster.
    2. FAILED – Go to the sales_data_pipeline_failure step and fail the ETL workflow.
    3. All other status – Go back to the wait_on_sp_deploy_redshift step to wait for the SQL statements to finish.
  5. setup_sales_data_pipeline – Performs the following steps:
    1. Initiates the setup stored procedure that was previously created in the Amazon Redshift cluster.
    2. Sends back the identifier of the SQL statement to the state machine.
  6. wait_on_setup_sales_data_pipeline – Waits for at least 5 seconds.
  7. setup_sales_data_pipeline_status_check – Invokes the Data API’s describeStatement to get the status of the API call.
  8. is_setup_sales_data_pipeline_complete – Routes the next step of the ETL workflow depending on its status:
    1. FINISHED – Created two dimension tables (customer_address and item) and one fact table (sales).
    2. FAILED – Go to the sales_data_pipeline_failure step and fail the ETL workflow.
    3. All other status – Go back to the wait_on_setup_sales_data_pipeline step to wait for the SQL statements to finish.
  9. run_sales_data_pipeline – LoadItemTable and LoadCustomerAddressTable are two parallel workflows that Step Functions runs at the same time. The workflows run the stored procedures that were previously created. The stored procedures load the data into the item and customer_address tables. All other steps in the parallel sessions follow the same concept as described previously. When both parallel workflows are complete, run_load_fact_sales runs.
  10. run_load_fact_sales – Inserts data into the store_sales table that was created in the initial stored procedure.
  11. Validation – When all the ETL steps are complete, the state machine reads a second SQL file from Amazon S3 (validate_sql_statement.sql) and runs the two SQL statements using the batch-execute-statement API.
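
The submit-and-poll pattern in steps 1–4 can be sketched outside the state machine with the AWS SDK for Python (boto3). This is only an illustration of the same Data API calls; the bucket, cluster, database, and user names are placeholders, and the state machine in the repository makes these calls directly through SDK service integrations rather than from Python.

import time
import boto3

s3 = boto3.client("s3")
rsd = boto3.client("redshift-data")

# Read the SQL script from Amazon S3, as read_sp_statement does.
sql_text = s3.get_object(
    Bucket="my-etl-script-bucket",  # placeholder bucket
    Key="sp_statements.sql",
)["Body"].read().decode("utf-8")

# Submit the statements with BatchExecuteStatement, as run_sp_deploy_redshift does.
# The naive split on ";" is for illustration only.
resp = rsd.batch_execute_statement(
    ClusterIdentifier="my-redshift-cluster",  # placeholder cluster
    Database="dev",
    DbUser="awsuser",
    Sqls=[stmt for stmt in sql_text.split(";") if stmt.strip()],
)
statement_id = resp["Id"]

# Wait and poll DescribeStatement until the batch finishes or fails,
# mirroring the wait, status check, and choice states.
while True:
    time.sleep(5)
    status = rsd.describe_statement(Id=statement_id)["Status"]
    if status == "FINISHED":
        break
    if status in ("FAILED", "ABORTED"):
        raise RuntimeError(f"SQL deployment ended with status {status}")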

The implementation of the ETL workflow is idempotent. If it fails, you can retry the job without any cleanup. For example, the workflow recreates the stg_store_sales staging table on each run, and it deletes the data for the particular refresh date from the target store_sales table before reloading it.

The following diagram illustrates the state machine workflow:

In this example, we use the task state resource arn:aws:states:::aws-sdk:redshiftdata:[apiAction] to call the corresponding Data API action. The following table summarizes the Data API actions and their corresponding AWS SDK integration API actions.

Amazon Redshift Data API action – AWS SDK integration API action
BatchExecuteStatement – batchExecuteStatement
ExecuteStatement – executeStatement
DescribeStatement – describeStatement
CancelStatement – cancelStatement
GetStatementResult – getStatementResult
DescribeTable – describeTable
ListDatabases – listDatabases
ListSchemas – listSchemas
ListStatements – listStatements
ListTables – listTables

To use AWS SDK integrations, you specify the service name and API call, and, optionally, a service integration pattern. The AWS SDK action is always camel case, and parameter names are Pascal case. For example, you can use the Step Functions action batchExecuteStatement to run multiple SQL statements in a batch as a part of a single transaction on the Data API. The SQL statements can be SELECT, DML, DDL, COPY, and UNLOAD.
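
As a hedged illustration of this naming convention (not the exact definition used in the repository), the following Python snippet builds a minimal Task state as a dictionary and prints it as Amazon States Language JSON. The resource ARN uses the camel case API action, while the parameters use Pascal case; the cluster, database, user, SQL statement, and next-state values are placeholders.

import json

# Minimal Task state that calls the Data API's ExecuteStatement action directly.
execute_statement_task = {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",  # camel case action
    "Parameters": {                                  # Pascal case parameter names
        "ClusterIdentifier": "my-redshift-cluster",  # placeholder
        "Database": "dev",
        "DbUser": "awsuser",
        "Sql": "CALL sp_load_dim_item();",
    },
    "Next": "wait_on_sp_load",                       # placeholder next state
}

print(json.dumps(execute_statement_task, indent=2))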

Validate the ETL orchestration

The entire ETL workflow takes approximately 1 minute to run. The following screenshot shows that the ETL workflow completed successfully.

When the entire sales data pipeline is complete, you may go through the entire execution event history, as shown in the following screenshot.

Schedule the ETL orchestration

After you validate the sales data pipeline, you may opt to run the data pipeline on a daily schedule. You can accomplish this with Amazon EventBridge.

  1. On the EventBridge console, create a rule to run the RedshiftETLStepFunction state machine daily.
  2. To invoke the RedshiftETLStepFunction state machine on a schedule, choose Schedule and define the appropriate frequency needed to run the sales data pipeline.
  3. Specify the target state machine as RedshiftETLStepFunction and choose Create.

You can confirm the schedule on the rule details page.
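
If you prefer to create the same schedule programmatically, the following boto3 sketch shows one way to do it. The rule name, schedule expression, state machine ARN, and IAM role ARN are placeholders; EventBridge needs a role it can assume to start the state machine.

import boto3

events = boto3.client("events")

# Create (or update) a rule that fires once a day.
events.put_rule(
    Name="daily-sales-data-pipeline",  # placeholder rule name
    ScheduleExpression="rate(1 day)",
    State="ENABLED",
)

# Point the rule at the state machine; the role lets EventBridge start executions.
events.put_targets(
    Rule="daily-sales-data-pipeline",
    Targets=[
        {
            "Id": "redshift-etl-step-function",
            "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:RedshiftETLStepFunction",  # placeholder
            "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeInvokeStepFunctionsRole",       # placeholder
        }
    ],
)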

Clean up

Clean up the resources created by the CloudFormation template to avoid unnecessary cost to your AWS account. You can delete the CloudFormation stack by selecting the stack on the AWS CloudFormation console and choosing Delete. This action deletes all the resources the stack provisioned. If you manually updated a template-provisioned resource, you may see issues during cleanup; delete those resources independently.

Limitations

The Data API and Step Functions AWS SDK integration offer a robust mechanism to build highly distributed ETL applications with minimal developer overhead. When you use them together, keep the service limits and considerations of both the Data API and Step Functions in mind.

Conclusion

In this post, we demonstrated how to build an ETL orchestration using the Amazon Redshift Data API and Step Functions with AWS SDK integration.

To learn more about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters and Using the Amazon Redshift Data API.


About the Authors

Jason Pedreza is an Analytics Specialist Solutions Architect at AWS with over 13 years of data warehousing experience. Prior to AWS, he built data warehouse solutions at Amazon.com. He specializes in Amazon Redshift and helps customers build scalable analytic solutions.

Bipin Pandey is a Data Architect at AWS. He loves to build data lake and analytics platforms for his customers. He is passionate about automating and simplifying customer problems with the use of cloud solutions.

David Zhang is an AWS Solutions Architect who helps customers design robust, scalable, and data-driven solutions across multiple industries. With a background in software development, David is an active leader and contributor to AWS open-source initiatives. He is passionate about solving real-world business problems and continuously strives to work from the customer’s perspective. Feel free to connect with him on LinkedIn.

Doing more with less: Moving from transactional to stateful batch processing

Post Syndicated from Tom Jin original https://aws.amazon.com/blogs/big-data/doing-more-with-less-moving-from-transactional-to-stateful-batch-processing/

Amazon processes hundreds of millions of financial transactions each day, including accounts receivable, accounts payable, royalties, amortizations, and remittances, from over a hundred different business entities. All of this data is sent to the eCommerce Financial Integration (eCFI) systems, where it is recorded in the subledger.

Ensuring complete financial reconciliation at this scale is critical to day-to-day accounting operations. With transaction volumes exhibiting double-digit percentage growth each year, we found that our legacy transactional-based financial reconciliation architecture proved too expensive to scale and lacked the right level of visibility for our operational needs.

In this post, we show you how we migrated to a batch processing system, built on AWS, that consumes time-bounded batches of events. This not only reduced costs by almost 90%, but also improved visibility into our end-to-end processing flow. The code used for this post is available on GitHub.

Legacy architecture

Our legacy architecture primarily utilized Amazon Elastic Compute Cloud (Amazon EC2) to group related financial events into stateful artifacts. In this context, a stateful artifact can be any persistent artifact, such as a database entry or an Amazon Simple Storage Service (Amazon S3) object.

We found this approach resulted in deficiencies in the following areas:

  • Cost – Individually storing hundreds of millions of financial events per day in Amazon S3 resulted in high I/O and Amazon EC2 compute resource costs.
  • Data completeness – Different events flowed through the system at different speeds. For instance, while a small stateful artifact for a single customer order could be recorded in a couple of seconds, the stateful artifact for a bulk shipment containing a million lines might require several hours to update fully. This made it difficult to know whether all the data had been processed for a given time range.
  • Complex retry mechanisms – Financial events were passed between legacy systems using individual network calls, wrapped in a backoff retry strategy. Still, network timeouts, throttling, or traffic spikes could result in some events erroring out. This required us to build a separate service to sideline, manage, and retry problematic events at a later date.
  • Scalability – Bottlenecks occurred when different events competed to update the same stateful artifact. This resulted in excessive retries or redundant updates, making it less cost-effective as the system grew.
  • Operational support – Using dedicated EC2 instances meant that we needed to take valuable development time to manage OS patching, handle host failures, and schedule deployments.

The following diagram illustrates our legacy architecture.

Transactional-based legacy architecture

Evolution is key

Our new architecture needed to address the deficiencies while preserving the core goal of our service: update stateful artifacts based on incoming financial events. In our case, a stateful artifact refers to a group of related financial transactions used for reconciliation. We considered the following as part of the evolution of our stack:

  • Stateless and stateful separation
  • Minimized end-to-end latency
  • Scalability

Stateless and stateful separation

In our transactional system, each ingested event results in an update to a stateful artifact. This became a problem when thousands of events came in all at once for the same stateful artifact.

However, by ingesting batches of data, we had the opportunity to create separate stateless and stateful processing components. The stateless component performs an initial reduce operation on the input batch to group together related events. This meant that the rest of our system could operate on these smaller stateless artifacts and perform fewer write operations (fewer operations means lower costs).

The stateful component would then join these stateless artifacts with existing stateful artifacts to produce an updated stateful artifact.

As an example, imagine an online retailer suddenly received thousands of purchases for a popular item. Instead of updating an item database entry thousands of times, we can first produce a single stateless artifact that summarizes the latest purchases. The item entry can now be updated one time with the stateless artifact, reducing the update bottleneck. The following diagram illustrates this process.

Batch visualization
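
To make the reduce step concrete, here is a minimal PySpark sketch of producing a stateless artifact from a batch of purchase events. The column names, paths, and aggregations are hypothetical; the real jobs apply business-specific logic.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("stateless-reduce").getOrCreate()

# Hypothetical input batch of purchase events.
events = spark.read.parquet("s3://example-bucket/input-batch/")

# Collapse thousands of events into one stateless artifact per item.
stateless = (
    events.groupBy("item_id")
    .agg(
        F.count("*").alias("purchase_count"),
        F.sum("quantity").alias("total_quantity"),
        F.max("event_time").alias("latest_event_time"),
    )
)

# The stateful component later joins these artifacts with existing stateful artifacts.
stateless.write.mode("overwrite").parquet("s3://example-bucket/stateless-artifacts/")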

Minimized end-to-end latency

Unlike traditional extract, transform, and load (ETL) jobs, we didn’t want to perform daily or even hourly extracts. Our accountants need to be able to access the updated stateful artifacts within minutes of data arriving in our system. For instance, if they had manually sent a correction line, they wanted to be able to check within the same hour that their adjustment had the intended effect on the targeted stateful artifact instead of waiting until the next day. As such, we focused on parallelizing the incoming batches of data as much as possible by breaking down the individual tasks of the stateful component into subcomponents. Each subcomponent could run independently of each other, which allowed us to process multiple batches in an assembly line format.

Scalability

Both the stateless and stateful components needed to respond to shifting traffic patterns and possible input batch backlogs. We also wanted to incorporate serverless compute to better respond to scale while reducing the overhead of maintaining an instance fleet.

This meant we couldn’t simply have a one-to-one mapping between the input batch and stateless artifact. Instead, we built flexibility into our service so the stateless component could automatically detect a backlog of input batches and group multiple input batches together in one job. Similar backlog management logic was applied to the stateful component. The following diagram illustrates this process.

Batch scalability

Current architecture

To meet our needs, we combined multiple AWS products:

  • AWS Step Functions – Orchestration of our stateless and stateful workflows
  • Amazon EMR – Apache Spark operations on our stateless and stateful artifacts
  • AWS Lambda – Stateful artifact indexing and orchestration backlog management
  • Amazon ElastiCache – Optimizing Amazon S3 request latency
  • Amazon S3 – Scalable storage of our stateless and stateful artifacts
  • Amazon DynamoDB – Stateless and stateful artifact index

The following diagram illustrates our current architecture.

Current architecture

The following diagram shows our stateless and stateful workflow.

Flowchart

The AWS CloudFormation template to render this architecture and corresponding Java code is available in the following GitHub repo.

Stateless workflow

We used an Apache Spark application on a long-running Amazon EMR cluster to simultaneously ingest input batch data and perform reduce operations to produce the stateless artifacts and a corresponding index file for the stateful processing to use.

We chose Amazon EMR for its proven highly available data-processing capability in a production setting and also its ability to horizontally scale when we see increased traffic loads. Most importantly, Amazon EMR had lower cost and better operational support when compared to a self-managed cluster.

Stateful workflow

Each stateful workflow performs operations to create or update millions of stateful artifacts using the stateless artifacts. Similar to the stateless workflows, all stateful artifacts are stored in Amazon S3 across a handful of Apache Spark part-files. This alone resulted in a huge cost reduction, because we significantly reduced the number of Amazon S3 writes (while using the same amount of overall storage). For instance, storing 10 million individual artifacts using the transactional legacy architecture would cost $50 in PUT requests alone, whereas 10 Apache Spark part-files would cost only $0.00005 in PUT requests (based on $0.005 per 1,000 requests).

However, we still needed a way to retrieve individual stateful artifacts, because any stateful artifact could be updated at any point in the future. To do this, we turned to DynamoDB, a fully managed and scalable key-value and document database. It’s ideal for our access pattern because we index the location of each stateful artifact within the stateful output file, using the artifact’s unique identifier as the primary key. For instance, if our artifact represented orders, we would use the order ID (which has high cardinality) as the partition key, and store the file location, byte offset, and byte length of each order as separate attributes. By passing the byte range in Amazon S3 GET requests, we can fetch individual stateful artifacts as if they were stored independently. We were less concerned about optimizing the number of Amazon S3 GET requests because GET requests are over 10 times cheaper than PUT requests.
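
A hedged boto3 sketch of this indexing pattern follows; the table name, bucket, key, and attribute names are hypothetical, not the ones used in the repository.

import boto3

dynamodb = boto3.client("dynamodb")
s3 = boto3.client("s3")

# Record where one order's stateful artifact lives inside a part-file.
dynamodb.put_item(
    TableName="stateful-artifact-index",  # hypothetical table
    Item={
        "order_id": {"S": "ORDER-12345"},
        "s3_key": {"S": "stateful/part-00007"},
        "byte_offset": {"N": "1048576"},
        "byte_length": {"N": "2048"},
    },
)

# Later, fetch only that artifact with an S3 byte-range GET.
item = dynamodb.get_item(
    TableName="stateful-artifact-index",
    Key={"order_id": {"S": "ORDER-12345"}},
)["Item"]
start = int(item["byte_offset"]["N"])
end = start + int(item["byte_length"]["N"]) - 1
artifact_bytes = s3.get_object(
    Bucket="stateful-artifact-bucket",  # hypothetical bucket
    Key=item["s3_key"]["S"],
    Range=f"bytes={start}-{end}",
)["Body"].read()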

Overall, this stateful logic was split across three serial subcomponents, which meant that three separate stateful workflows could be operating at any given time.

Pre-fetcher

The following diagram illustrates our pre-fetcher subcomponent.

Prefetcher architecture

The pre-fetcher subcomponent uses the stateless index file to retrieve pre-existing stateful artifacts that should be updated. These might be previous shipments for the same customer order, or past inventory movements for the same warehouse. For this, we turn once again to Amazon EMR to perform this high-throughput fetch operation.

Each fetch required a DynamoDB lookup and an Amazon S3 GET partial byte-range request. Due to the large number of external calls, fetches were highly parallelized using a thread pool contained within an Apache Spark flatMap operation. Pre-fetched stateful artifacts were consolidated into an output file that was later used as input to the stateful processing engine.
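
The parallel fetch could look roughly like the following sketch, which runs one thread pool per Spark partition. The post describes a thread pool inside an Apache Spark flatMap; this sketch expresses the same idea with mapPartitions for brevity, and the pool size and location tuple format are illustrative only.

from concurrent.futures import ThreadPoolExecutor
import boto3

def prefetch_partition(locations):
    # Create the S3 client on the executor, once per partition.
    s3 = boto3.client("s3")

    def fetch(location):
        # location: (bucket, key, byte_offset, byte_length), looked up from DynamoDB beforehand
        bucket, key, offset, length = location
        rng = f"bytes={offset}-{offset + length - 1}"
        return s3.get_object(Bucket=bucket, Key=key, Range=rng)["Body"].read()

    # Fetch all pre-existing stateful artifacts for this partition in parallel.
    with ThreadPoolExecutor(max_workers=32) as pool:  # illustrative pool size
        yield from pool.map(fetch, locations)

# Inside the Spark job, roughly: index_rdd.mapPartitions(prefetch_partition)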

Stateful processing engine

The following diagram illustrates the stateful processing engine.

Stateful processor architecture

The stateful processing engine subcomponent joins the pre-fetched stateful artifacts with the stateless artifacts to produce updated stateful artifacts after applying custom business logic. The updated stateful artifacts are written out across multiple Apache Spark part-files.

Because stateful artifacts could have been indexed at the same time that they were pre-fetched (also called in-flight updates), the stateful processor also joins recently processed Apache Spark part-files.

We again used Amazon EMR here to take advantage of the Apache Spark operations that are required to join the stateless and stateful artifacts.

State indexer

The following diagram illustrates the state indexer.

State Indexer architecture

This Lambda-based subcomponent records the location of each stateful artifact within the stateful part-file in DynamoDB. The state indexer also caches the stateful artifacts in an Amazon ElastiCache for Redis cluster to provide a performance boost in the Amazon S3 GET requests performed by the pre-fetcher.

However, even with a thread pool, a single Lambda function isn’t powerful enough to index millions of stateful artifacts within the 15-minute time limit. Instead, we employ a cluster of Lambda functions. The state indexer begins with a single coordinator Lambda function, which determines the number of worker functions that are needed. For instance, if 100 part-files are generated by the stateful processing engine, then the coordinator might assign five part-files for each of the 20 Lambda worker functions to work on. This method is highly scalable because we can dynamically assign more or fewer Lambda workers as required.

Each Lambda worker then performs the ElastiCache and DynamoDB writes for all the stateful artifacts within each assigned part-file in a multi-threaded manner. The coordinator function monitors the health of each Lambda worker and restarts workers as needed.
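
A simplified sketch of the coordinator’s fan-out logic follows; the worker function name, event shape, and batching factor are hypothetical, and the health monitoring and restart logic described above are omitted.

import json
import boto3

lambda_client = boto3.client("lambda")

def coordinator_handler(event, context):
    part_files = event["part_files"]  # e.g. 100 S3 keys from the stateful processing engine
    files_per_worker = 5              # illustrative batching factor

    groups = [
        part_files[i : i + files_per_worker]
        for i in range(0, len(part_files), files_per_worker)
    ]

    # Fan out one asynchronous worker invocation per group of part-files.
    for group in groups:
        lambda_client.invoke(
            FunctionName="state-indexer-worker",  # hypothetical worker function
            InvocationType="Event",
            Payload=json.dumps({"part_files": group}),
        )

    return {"workers_started": len(groups)}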

Distributed Lambda architecture

Orchestration

We used Step Functions to coordinate each of the stateless and stateful workflows, as shown in the following diagram.

Step Function Workflow

Every time a new workflow step ran, the step was recorded in a DynamoDB table via a Lambda function. This table not only maintained the order in which stateful batches should be run, but it also formed the basis of the backlog management system, which directed the stateless ingestion engine to group more or fewer input batches together depending on the backlog.

We chose Step Functions for its native integration with many AWS services (including triggering by an Amazon CloudWatch scheduled event rule and adding Amazon EMR steps) and its built-in support for backoff retries and complex state machine logic. For instance, we defined different backoff retry rates based on the type of error.

Conclusion

Our batch-based architecture helped us overcome the transactional processing limitations we originally set out to resolve:

  • Reduced cost – We have been able to scale to thousands of workflows and hundreds of millions of events per day using only three or four core nodes per EMR cluster. This reduced our Amazon EC2 usage by over 90% when compared with a similar transactional system. Additionally, writing out batches instead of individual transactions reduced the number of Amazon S3 PUT requests by over 99.8%.
  • Data completeness guarantees – Because each input batch is associated with a time interval, when a batch has finished processing, we know that all events in that time interval have been completed.
  • Simplified retry mechanisms – Batch processing means that failures occur at the batch level and can be retried directly through the workflow. Because there are far fewer batches than transactions, batch retries are much more manageable. For instance, in our service, a typical batch contains about two million entries. During a service outage, only a single batch needs to be retried, as opposed to two million individual entries in the legacy architecture.
  • High scalability – We’ve been impressed with how easy it is to scale our EMR clusters on the fly if we detect an increase in traffic. Using Amazon EMR instance fleets also helps us automatically choose the most cost-effective instances across different Availability Zones. We also like the performance achieved by our Lambda-based state indexer. This subcomponent not only dynamically scales with no human intervention, but has also been surprisingly cost-efficient. A large portion of our usage has fallen within the free tier.
  • Operational excellence – Replacing traditional hosts with serverless components such as Lambda allowed us to spend less time on compliance tickets and focus more on delivering features for our customers.

We are particularly excited about the investments we have made moving from a transactional-based system to a batch processing system, especially our shift from using Amazon EC2 to using serverless Lambda and big data Amazon EMR services. This experience demonstrates that even services originally built on AWS can still achieve cost reductions and improve performance by rethinking how AWS services are used.

Inspired by our progress, our team is moving to replace many other legacy services with serverless components. Likewise, we hope that other engineering teams can learn from our experience, continue to innovate, and do more with less.

Find the code used for this post in the following GitHub repository.

Special thanks to development team: Ryan Schwartz, Abhishek Sahay, Cecilia Cho, Godot Bian, Sam Lam, Jean-Christophe Libbrecht, and Nicholas Leong.


About the Authors


Tom Jin is a Senior Software Engineer for eCommerce Financial Integration (eCFI) at Amazon. His interests include building large-scale systems and applying machine learning to healthcare applications. He is based in Vancouver, Canada and is a fan of ocean conservation.

Karthik Odapally is a Senior Solutions Architect at AWS supporting our Gaming Customers. He loves presenting at external conferences like AWS Re:Invent, and helping customers learn about AWS. His passion outside of work is to bake cookies and bread for family and friends here in the PNW. In his spare time, he plays Legend of Zelda (Link’s Awakening) with his 4 yr old daughter.

Mocking service integrations with AWS Step Functions Local

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/mocking-service-integrations-with-aws-step-functions-local/

This post is written by Sam Dengler, Principal Specialist Solutions Architect, and Dhiraj Mahapatro, Senior Specialist Solutions Architect.

AWS Step Functions now supports over 200 AWS service integrations via AWS SDK integrations. Developers want to build and test control flow logic for workflows using branching logic, error handling, and retries. This allows for precise workflow execution with deterministic results. Additionally, developers use Step Functions’ input and output processing features to transform data as it enters and exits tasks.

Developers can test their state machines locally using Step Functions Local before deploying them to an AWS account. However, state machines that use service integrations like AWS Lambda, Amazon SQS, or Amazon SNS require Step Functions Local to perform calls to AWS service endpoints. Often, developers want to test the control and data flows of their state machine executions in isolation, without any dependency on service integration availability.

Today, AWS is releasing Mocked Service Integrations for Step Functions Local. This allows developers to define sample outputs from AWS service integrations. You can combine them into test case scenarios to validate workflow control and data flow definitions. You can find the code used in this post in the Step Functions examples GitHub repository.

Sales lead generation sample workflow

In this example, new sales leads are created in a customer relationship management system. This triggers the sample workflow execution using input data, which provides information about the contact.

Using the sales lead data, the workflow first validates the contact’s identity and address. If valid, it uses Step Functions’ AWS SDK integration for Amazon Comprehend to call the DetectSentiment API. It uses the sales lead’s comments as input for sentiment analysis.

If the comments have a positive sentiment, it adds the sales lead’s information to a DynamoDB table for follow-up, and publishes an event to Amazon EventBridge to notify subscribers.

If the sales lead data is invalid or a negative sentiment is detected, it publishes events to EventBridge for notification. No record is added to the Amazon DynamoDB table. The following Step Functions Workflow Studio diagram shows the control logic:

The full workflow definition is available in the code repository. Note the workflow task names in the diagram, such as DetectSentiment, which are important when defining the mocked responses.

Sentiment analysis test case

In this example, you test a scenario in which:

  1. The identity and address are successfully validated using a Lambda function.
  2. A positive sentiment is detected using the Comprehend.DetectSentiment API after three retries.
  3. A contact item is written to a DynamoDB table successfully.
  4. An event is published to an EventBridge event bus successfully.

The execution path for this test scenario is shown in the following diagram (the red and green numbers have been added). 0 represents the first execution; 1, 2, and 3 represent the max retry attempts (MaxAttempts), in case of an InternalServerException.

Mocked response configuration

To use service integration mocking, create a mock configuration file with sections specifying mock AWS service responses. These are grouped into test cases that can be activated when executing state machines locally. The following example provides code snippets and the full mock configuration is available in the code repository.

To mock a successful Lambda function invocation, define a mock response that conforms to the Lambda.Invoke API response elements. Associate it to the first request attempt:

"CheckIdentityLambdaMockedSuccess": {
  "0": {
    "Return": {
      "StatusCode": 200,
      "Payload": {
        "statusCode": 200,
        "body": "{\"approved\":true,\"message\":\"identity validation passed\"}"
      }
    }
  }
}

To mock the DetectSentiment retry behavior, define failure and successful mock responses that conform to the Comprehend.DetectSentiment API call. Associate the failure mocks to three request attempts, and associate the successful mock to the fourth attempt:

"DetectSentimentRetryOnErrorWithSuccess": {
  "0-2": {
    "Throw": {
      "Error": "InternalServerException",
      "Cause": "Server Exception while calling DetectSentiment API in Comprehend Service"
    }
  },
  "3": {
    "Return": {
      "Sentiment": "POSITIVE",
      "SentimentScore": {
        "Mixed": 0.00012647535,
        "Negative": 0.00008031699,
        "Neutral": 0.0051454515,
        "Positive": 0.9946478
      }
    }
  }
}

Note that Step Functions Local does not validate the structure of the mocked responses. Ensure that your mocked responses conform to actual responses before testing. To review the structure of service responses, either perform the actual service calls using Step Functions or view the documentation for those services.

Next, associate the mocked responses to a test case identifier:

"RetryOnServiceExceptionTest": {
  "Check Identity": "CheckIdentityLambdaMockedSuccess",
  "Check Address": "CheckAddressLambdaMockedSuccess",
  "DetectSentiment": "DetectSentimentRetryOnErrorWithSuccess",
  "Add to FollowUp": "AddToFollowUpSuccess",
  "CustomerAddedToFollowup": "CustomerAddedToFollowupSuccess"
}

With the test case and mock responses configured, you can use them for testing with Step Functions Local.

Test case execution using Step Functions Local

The Step Functions Developer Guide describes the steps used to set up Step Functions Local on your workstation and create a state machine.

After these steps are complete, you can run a workflow locally using the start-execution AWS CLI command. Activate the mocked responses by appending a pound sign and the test case identifier to the state machine ARN:

aws stepfunctions start-execution \
  --endpoint-url http://localhost:8083 \
  --state-machine-arn arn:aws:states:us-east-1:123456789012:stateMachine:LeadGenerationStateMachine#RetryOnServiceExceptionTest \
  --input file://events/sfn_valid_input.json

Test case validation

To validate that the workflow ran correctly in the test case, examine the state machine execution events using the StepFunctions.GetExecutionHistory API. This ensures that the correct states are used. There are a variety of validation tools available; this post uses the AWS CLI’s --query filtering feature, which uses JMESPath syntax.

In this test case, you validate the TaskFailed and TaskSucceeded events match the retry definition for the DetectSentiment task, which specifies three retries. Use the following AWS CLI command to get the execution history and filter on the execution events:

aws stepfunctions get-execution-history \
  --endpoint-url http://localhost:8083 \
  --execution-arn <ExecutionArn> \
  --query 'events[?(type==`TaskFailed` && contains(taskFailedEventDetails.cause, `Server Exception while calling DetectSentiment API in Comprehend Service`)) || (type==`TaskSucceeded` && taskSucceededEventDetails.resource==`comprehend:detectSentiment`)]'

The results include matching events:

{
  "timestamp": "2022-01-13T17:24:32.276000-05:00",
  "type": "TaskFailed",
  "id": 19,
  "previousEventId": 18,
  "taskFailedEventDetails": {
    "error": "InternalServerException",
    "cause": "Server Exception while calling DetectSentiment API in Comprehend Service"
  }
}

These results should be compared to the test acceptance criteria to verify the execution behavior. Test cases, acceptance criteria, and validation expressions vary by customer and use case. These techniques are flexible to accommodate various happy path and error scenarios. To explore additional sample test cases and examples, visit the example code repository.
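
As an alternative to the CLI query, you could make the same assertions in a Python test by pointing boto3 at the Step Functions Local endpoint. This is a hedged sketch; the execution ARN is a placeholder, and the region and dummy credentials only need to satisfy the local endpoint.

import boto3

sfn = boto3.client(
    "stepfunctions",
    endpoint_url="http://localhost:8083",
    region_name="us-east-1",
    aws_access_key_id="dummy",        # Step Functions Local does not validate these
    aws_secret_access_key="dummy",
)

history = sfn.get_execution_history(executionArn="<ExecutionArn>", maxResults=500)

failed = [
    e for e in history["events"]
    if e["type"] == "TaskFailed"
    and "DetectSentiment" in e["taskFailedEventDetails"].get("cause", "")
]
succeeded = [
    e for e in history["events"]
    if e["type"] == "TaskSucceeded"
    and e["taskSucceededEventDetails"].get("resource") == "comprehend:detectSentiment"
]

# Three mocked failures followed by one mocked success, matching the retry definition.
assert len(failed) == 3
assert len(succeeded) == 1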

Conclusion

This post introduces a new, robust way to test AWS Step Functions state machines in isolation. With mocking, developers get more control over the scenarios that a state machine can handle, allowing them to assert on multiple behaviors. Testing a state machine with mocks can also be part of the software release process. Asserting on behaviors like error handling, branching, parallel execution, and dynamic parallelism (the Map state) helps you test the entire state machine’s behavior. For any new behavior in the state machine, such as a new type of exception from a state, you can add a mock and test for it.

See the Step Functions Developer Guide for more information on service mocking with Step Functions Local. The sample application covers basic scenarios of testing a state machine. You can use a similar approach for complex scenarios including other Step Functions flows, like map and wait.

For more serverless learning resources, visit Serverless Land.

Using the circuit breaker pattern with AWS Step Functions and Amazon DynamoDB

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/using-the-circuit-breaker-pattern-with-aws-step-functions-and-amazon-dynamodb/

This post is written by Anitha Deenadayalan, Developer Specialist SA, DevAx

Modern applications use microservices as an architectural and organizational approach to software development, where the application comprises small independent services that communicate over well-defined APIs.

When multiple microservices collaborate to handle requests, one or more services may become unavailable or exhibit high latency. Microservices communicate through remote procedure calls, and transient errors in network connectivity can always occur, causing failures.

This can cause performance degradation across the entire application during synchronous execution, because cascading timeouts or failures result in a poor user experience. When complex applications use microservices, an outage in one microservice can lead to application failure. This post shows how to use the circuit breaker design pattern to help with graceful service degradation.

Introducing circuit breakers

Michael Nygard popularized the circuit breaker pattern in his book, Release It. This design pattern can prevent a caller service from retrying another callee service call that has previously caused repeated timeouts or failures. It can also detect when the callee service is functional again.

The fallacies of distributed computing are a set of assertions made by Peter Deutsch and others at Sun Microsystems. They state that programmers new to distributed applications invariably make false assumptions, such as that the network is reliable, that latency is zero, and that bandwidth is unlimited. These assumptions result in software applications written with minimal error handling for network errors.

During a network outage, applications may wait indefinitely for a reply and continually consume application resources. Failure to retry operations when the network becomes available can also lead to application degradation. If API calls to a database or an external service time out due to network issues, repeated calls with no circuit breaker can affect cost and performance.

The circuit breaker pattern

In the circuit breaker pattern, a circuit breaker object routes the calls from the caller to the callee. For example, in an ecommerce application, the order service can call the payment service to collect payments. When there are no failures, the circuit breaker routes all calls from the order service to the payment service:

Circuit breaker with no failures

If the payment service times out, the circuit breaker can detect the timeout and track the failure. If the timeouts exceed a specified threshold, the application opens the circuit:

Circuit breaker with payment service failure

Once the circuit is open, the circuit breaker object does not route the calls to the payment service. It returns an immediate failure when the order service calls the payment service:

Circuit breaker stops routing to payment service

The circuit breaker object periodically tries to see if the calls to the payment service are successful:

Circuit breaker retries payment service

When the call to payment service succeeds, the circuit is closed, and all further calls are routed to the payment service again:

Circuit breaker with working payment service again

Architecture overview

This example uses the AWS Step Functions, AWS Lambda, and Amazon DynamoDB to implement the circuit breaker pattern:

Circuit breaker architecture

The Step Functions workflow provides circuit breaker capabilities. When a service wants to call another service, it starts the workflow with the name of the callee service.

The workflow gets the circuit status from the CircuitStatus DynamoDB table, which stores the currently degraded services. If the CircuitStatus table contains a record for the called service, then the circuit is open. The Step Functions workflow returns an immediate failure and exits with a FAIL state.

If the CircuitStatus table does not contain an item for the called service, then the service is operational. The ExecuteLambda step in the state machine definition invokes the Lambda function passed in through a parameter value. If the call succeeds, the Step Functions workflow exits with a SUCCESS state.

The items in the DynamoDB table have the following attributes:

DynamoDB items list

If the service call fails or a timeout occurs, the application retries with exponential backoff for a defined number of times. If the service call fails after the retries, the workflow inserts a record in the CircuitStatus table for the service with the CircuitStatus as OPEN, and the workflow exits with a FAIL state. Subsequent calls to the same service return an immediate failure as long as the circuit is open.

The workflow inserts the item with an associated time-to-live (TTL) value so that the item expires at the defined TTL time and connection retries eventually resume. DynamoDB’s time to live (TTL) feature allows you to define a per-item timestamp to determine when an item is no longer needed. Shortly after the date and time of the specified timestamp, DynamoDB deletes the item from your table without consuming write throughput.

For example, if you set the TTL value to 60 seconds to check a service status after a minute, DynamoDB deletes the item from the table after 60 seconds. The workflow invokes the service to check for availability when a new call comes in after the item has expired.
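
The following boto3 sketch shows the idea of recording and checking an open circuit with a TTL. It is illustrative only; the table’s key attribute name is a placeholder, while ExpireTimeStamp matches the TTL attribute referenced later in this post.

import time
import boto3

dynamodb = boto3.client("dynamodb")
TABLE_NAME = "CircuitStatus"

def is_circuit_open(service_name: str) -> bool:
    # Any record for the service means the circuit is open.
    resp = dynamodb.get_item(
        TableName=TABLE_NAME,
        Key={"ServiceName": {"S": service_name}},  # placeholder key attribute
    )
    return "Item" in resp

def open_circuit(service_name: str, ttl_seconds: int = 60) -> None:
    # DynamoDB TTL expects an epoch timestamp in seconds.
    dynamodb.put_item(
        TableName=TABLE_NAME,
        Item={
            "ServiceName": {"S": service_name},
            "CircuitStatus": {"S": "OPEN"},
            "ExpireTimeStamp": {"N": str(int(time.time()) + ttl_seconds)},
        },
    )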

Circuit breaker Step Function

Prerequisites

For this walkthrough, you need the following:

  • An AWS account
  • The AWS Command Line Interface (AWS CLI) installed and configured
  • The AWS SAM CLI installed
  • .NET Core 3.1
  • Git, to clone the sample repository

Setting up the environment

Use the .NET Core 3.1 code in the GitHub repository and the AWS SAM template to create the AWS resources for this walkthrough. These include IAM roles, DynamoDB table, the Step Functions workflow, and Lambda functions.

  1. You need an AWS access key ID and secret access key to configure the AWS Command Line Interface (AWS CLI). To learn more about configuring the AWS CLI, follow these instructions.
  2. Clone the repo:
    git clone https://github.com/aws-samples/circuit-breaker-netcore-blog
  3. After cloning, this is the folder structure:

    Project file structure

Deploy using Serverless Application Model (AWS SAM)

The AWS Serverless Application Model (AWS SAM) CLI provides developers with a local tool for managing serverless applications on AWS.

  1. The sam build command processes your AWS SAM template file, application code, and applicable language-specific files and dependencies. The command copies build artifacts in the format and location expected for subsequent steps in your workflow. Run these commands to process the template file:
    cd circuit-breaker
    sam build
  2. After you build the application, deploy it using the sam deploy command. AWS SAM deploys the application to AWS and displays the output in the terminal.
    sam deploy --guided

    Output from sam deploy

  3. You can also view the output on the AWS CloudFormation console.

    Output in CloudFormation console

  4. The Step Functions workflow provides the circuit-breaker function. Refer to the circuitbreaker.asl.json file in the statemachine folder for the state machine definition in the Amazon States Language (ASL).

To deploy with the CDK, refer to the GitHub page.

Running the service through the circuit breaker

To provide circuit breaker capabilities to the Lambda microservice, you must send the name or function ARN of the Lambda function to the Step Functions workflow:

{
  "TargetLambda": "<Name or ARN of the Lambda function>"
}
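
You can start the workflow from code as well as from the console. The following boto3 sketch passes the same payload; the state machine ARN is a placeholder taken from the stack outputs.

import json
import boto3

sfn = boto3.client("stepfunctions")

sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:CircuitBreakerStateMachine",  # placeholder
    input=json.dumps({"TargetLambda": "<Name or ARN of the Lambda function>"}),
)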

Successful run

To simulate a successful run, use the provided HelloWorld Lambda function by passing the name or ARN of the function that the stack created. Your input appears as follows:

{
  "TargetLambda": "circuit-breaker-stack-HelloWorldFunction-pP1HNkJGugQz"
}

During the successful run, the Get Circuit Status step checks the circuit status against the DynamoDB table. Suppose that the circuit is CLOSED, which is indicated by zero records for that service in the DynamoDB table. In that case, the Execute Lambda step runs the Lambda function and exits the workflow successfully.

Step Function with closed circuit

Service timeout

To simulate a timeout, use the TestCircuitBreaker Lambda function by passing the name or ARN of the function that the stack created. Your input appears as:

{
  "TargetLambda": "circuit-breaker-stack-TestCircuitBreakerFunction-mKeyyJq4BjQ7"
}

Again, the Get Circuit Status step in the workflow checks the circuit status against the DynamoDB table. The circuit is CLOSED during the first pass, so the Execute Lambda step runs the Lambda function, which times out.

The workflow retries based on the retry count and the exponential backoff values, and finally returns a timeout error. It then runs the Update Circuit Status step, where a record is inserted in the DynamoDB table for that service, with a predefined time-to-live value specified by the TTL attribute ExpireTimeStamp.

Step Function with open circuit

Repeat timeout

As long as there is an item for the service in the DynamoDB table, the circuit breaker workflow returns an immediate failure to the calling service. When you re-execute the call to the Step Functions workflow for the TestCircuitBreaker Lambda function within 20 seconds, the circuit is still open. The workflow immediately fails, ensuring the stability of the overall application performance.

Step Function workflow immediately fails until retry

The item in the DynamoDB table expires after 20 seconds, and the workflow then calls the service again, retrying with exponential backoff. If the call succeeds, the workflow exits successfully.

Cleaning up

To avoid incurring additional charges, clean up all the created resources. Run the following command from a terminal window. This command deletes the created resources that are part of this example.

sam delete --stack-name circuit-breaker-stack --region <region name>

Conclusion

This post showed how to implement the circuit breaker pattern using Step Functions, Lambda, DynamoDB, and .NET Core 3.1. This pattern can help prevent system degradation during service failures or timeouts. Step Functions and the TTL feature of DynamoDB can make it easier to implement the circuit breaker capabilities.

To learn more about developing microservices on AWS, refer to the whitepaper on microservices. To learn more about serverless and AWS SAM, visit the Sessions with SAM series and find more resources at Serverless Land.