Tag Archives: AWS Step Functions

Orchestrating large-scale document processing with AWS Step Functions and Amazon Bedrock batch inference

Post Syndicated from Brian Zambrano original https://aws.amazon.com/blogs/compute/orchestrating-large-scale-document-processing-with-aws-step-functions-and-amazon-bedrock-batch-inference/

Organizations often have large volumes of documents containing valuable information that remains locked away and unsearchable. This solution addresses the need for a scalable, automated text extraction and knowledge base pipeline that transforms static document collections into intelligent, searchable repositories for generative AI applications.

Organizations can automate the extraction of both content and structured metadata to build comprehensive knowledge bases that power retrieval-augmented generation (RAG) solutions while significantly reducing manual processing costs and time-to-value. The architecture not only demonstrates the processing of 500 research papers automatically, but also scales to handle enterprise document volumes cost-effectively through the Amazon Bedrock batch inference pricing model.

Overview

Amazon Bedrock batch inference is a feature of Amazon Bedrock that offers a 50% discount on inference requests. Because Amazon Bedrock schedules and runs each batch job (which needs a minimum of 100 inference requests) as capacity becomes available, inference is not real-time. For use cases that can accommodate minutes to hours of latency, Amazon Bedrock batch inference is a good option.

This post demonstrates how to build an automated, serverless pipeline using AWS Step Functions, Amazon Textract, Amazon Bedrock batch inference, and Amazon Bedrock Knowledge Bases to extract text, create metadata, and load it into a knowledge base at scale. The example solution processes 500 research papers in PDF format from Amazon Science, extracts text using Amazon Textract, generates structured metadata with Amazon Bedrock batch inference and the Amazon Nova Pro model, and loads the final output, including the metadata used for Amazon Bedrock Knowledge Base filters, into an Amazon Bedrock Knowledge Base.

Architecture

This solution uses Step Functions with parallel Amazon Textract job processing through child workflows run by Distributed Map. You can use the concurrency controls offered by Distributed Map to process documents as quickly as possible within your Amazon Textract quotas. To increase processing speed, request a higher Amazon Textract quota and update the Distributed Map configuration to match. Amazon Bedrock batch inference handles concurrency, scaling, and throttling, so you can create the job without managing these complexities yourself.

In this example implementation, the solution processes research papers to extract metadata such as:

  • Code availability and repository locations
  • Dataset availability and access methods
  • Research methodology types
  • Reproducibility indicators
  • Other relevant research attributes

The high-level parts of this solution include:

  • Extracting text from PDF documents with Amazon Textract in parallel, through Step Functions Distributed Map.
  • Analyzing extracted text using Amazon Bedrock batch inference to extract structured metadata.
  • Loading extracted text and metadata into a searchable knowledge base using Amazon Bedrock Knowledge Bases with Amazon OpenSearch Serverless.

Figure 1. Complete architecture diagram

Prerequisites

The following prerequisites are necessary to complete this solution:

Running the solution

The complete solution uses AWS CDK to implement two AWS CloudFormation stacks:

  1. BedrockKnowledgeBaseStack: Creates the knowledge base infrastructure
  2. SFNBatchInferenceStack: Implements the main processing workflow

First, clone the GitHub repository into your local development environment and install the requirements:

git clone https://github.com/aws-samples/sample-step-functions-batch-inference.git

cd sample-step-functions-batch-inference

npm install

Next, deploy the solution using AWS CDK:

cdk deploy --all

After deploying the AWS CDK stacks, upload your data sources (PDF files) into the Amazon S3 input bucket that AWS CDK created. In this example, I uploaded 500 Amazon Science papers. The input bucket name is included in the AWS CDK outputs:

Outputs:

SFNBatchInference.BatchInputBucketName = sfnbatchinference-batchinputbucket11aaa222-nrjki8tewwww

Parallel text extraction

The process begins when you upload a manifest.json file to the input bucket. The manifest file lists the files for processing, which already exist in the input bucket. The filenames listed in manifest.json define what constitutes a single processing job run. To create another run, you would create a different manifest.json and upload it to the same S3 bucket.

[
  {
    "filename": "flexecontrol-flexible-and-efficient-multimodal-control-for-text-to-image-generation.pdf"
  },
  {
    "filename": "adaptive-global-local-context-fusion-for-multi-turn-spoken-language-understanding.pdf"
  }
]
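
A manifest in this format can be generated from a list of object keys instead of being written by hand. The following is a minimal sketch, assuming the PDFs are already in the input bucket. The `build_manifest` helper and its choice to skip non-PDF keys are illustrative assumptions and are not taken from the sample repository.

```python
import json


def build_manifest(keys):
    """Return manifest.json content for the PDF keys in `keys`.

    Hypothetical helper: the entry format follows the example above,
    one object with a "filename" field per document.
    """
    # Keep only PDFs and sort them so repeated runs give the same manifest.
    pdf_keys = sorted(k for k in keys if k.lower().endswith(".pdf"))
    return json.dumps([{"filename": k} for k in pdf_keys], indent=2)


# Example: the non-PDF key is dropped from the manifest.
print(build_manifest(["b-paper.pdf", "notes.txt", "a-paper.pdf"]))
```

Upload the PDFs first and the manifest last, because uploading manifest.json is what starts the run.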

The AWS CDK definition for the input bucket includes Amazon EventBridge notifications and creates a rule that triggers the Step Functions workflow whenever a manifest.json file is uploaded.

private createS3Buckets() {
    const batchBucket = new s3.Bucket(this, "BatchInputBucket", {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    })
    batchBucket.enableEventBridgeNotification()

    new cdk.CfnOutput(this, "BatchInputBucketName", {
      value: batchBucket.bucketName,
      description: "Name of input bucket to send PDF documents that Textract will read.",
    })

    const manifestFileCreatedRule = new eventBridge.Rule(this, "ManifestFileCreatedRule", {
      eventPattern: {
        source: ["aws.s3"],
        detailType: ["Object Created"],
        detail: {
          bucket: {
            name: [batchBucket.bucketName],
          },
          object: {
            key: ["manifest.json"],
          },
        },
      },
    })

    return { batchBucket, manifestFileCreatedRule }
  }

The first step in the Step Functions workflow is a Distributed Map run that performs the following actions for each PDF in the manifest file:

  1. Starts an Amazon Textract job, providing an Amazon Simple Notification Service (Amazon SNS) topic for completion notification.
  2. Writes the Step Functions task token to Amazon DynamoDB, pausing the individual child workflow.
  3. Processes the Amazon SNS message when the Amazon Textract job completes, triggering an AWS Lambda function.
  4. Uses a Lambda function to retrieve the task token from DynamoDB using the Amazon Textract JobId.
  5. Fetches the raw results from Amazon Textract, organizes the text for readability, and writes results to an S3 bucket.
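
The post does not show how the raw Amazon Textract results are organized in step 5. As a rough sketch of the idea, the following assumes the result blocks have already been fetched into a list of dictionaries and uses only the `BlockType`, `Page`, and `Text` fields. The `blocks_to_text` helper is hypothetical and is not the sample repository's implementation.

```python
from collections import defaultdict


def blocks_to_text(blocks):
    """Join Textract LINE blocks into plain text, grouped by page.

    Hypothetical helper: lines on a page are separated by newlines,
    pages by a blank line. WORD and PAGE blocks are ignored because
    LINE blocks already contain the full text of each line.
    """
    lines_by_page = defaultdict(list)
    for block in blocks:
        if block.get("BlockType") == "LINE" and block.get("Text"):
            lines_by_page[block.get("Page", 1)].append(block["Text"])
    pages = ["\n".join(lines_by_page[page]) for page in sorted(lines_by_page)]
    return "\n\n".join(pages)


sample_blocks = [
    {"BlockType": "PAGE", "Page": 1},
    {"BlockType": "LINE", "Page": 1, "Text": "Abstract"},
    {"BlockType": "WORD", "Page": 1, "Text": "Abstract"},
    {"BlockType": "LINE", "Page": 2, "Text": "1 Introduction"},
]
print(blocks_to_text(sample_blocks))
```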

First step in the Step Functions workflow

A key component of this architecture is the callback pattern that Amazon Textract supports using the NotificationChannel option, as shown in the preceding figure. The AWS CDK definition of the Step Functions state that starts the Amazon Textract job follows.

const startTextractStep = new tasks.CallAwsService(this, "StartTextractJob", {
  service: "textract",
  action: "startDocumentAnalysis",
  resultPath: "$.textractOutput",
  parameters: {
    DocumentLocation: {
      S3Object: {
        Bucket: sourceBucket.bucketName,
        Name: sfn.JsonPath.stringAt("$.filename"),
      },
    },
    FeatureTypes: ["LAYOUT"],
    NotificationChannel: {
      RoleArn: textractRoleArn,
      SnsTopicArn: snsTopicArn,
    },
  },
  iamResources: ["*"],
})

The Lambda function that handles task tokens extracts the Amazon Textract JobId from the Amazon SNS message, fetches the TaskToken from DynamoDB, and resumes the Step Functions Workflow by sending the TaskToken:

import json

import boto3
from aws_lambda_powertools.utilities.data_classes import SNSEvent, event_source

@event_source(data_class=SNSEvent)
def handle_textract_task_complete(event, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        sns_message = json.loads(record.sns.message)
        textract_job_id = sns_message["JobId"]

        # Get both task token and original file from DynamoDB
        ddb_item = _get_item_from_ddb(textract_job_id)

        # Send both the job ID and original file name in the response
        _send_task_success(
            ddb_item["TaskToken"],
            {
                "TextractJobId": textract_job_id,
                "OriginalFile": ddb_item["OriginalFile"],
            },
        )
        # Delete the task token from DynamoDB after use
        _delete_item_from_ddb(textract_job_id)

def _send_task_success(task_token: str, output: None | dict = None) -> None:
    """Sends task success to Step Functions with the provided output"""
    sfn = boto3.client("stepfunctions")
    sfn.send_task_success(taskToken=task_token, output=json.dumps(output or {}))

The Distributed Map runs up to 10 child workflows concurrently, controlled by the maxConcurrency setting. Although Step Functions supports running up to 10,000 parallel child workflow executions, the practical concurrency for this solution is constrained by Amazon Textract quotas. The startDocumentAnalysis API has a default quota of 10 requests per second (RPS), so you must consider this limit when scaling your document processing workloads and, for higher throughput, request a quota increase.

const distributedMap = new sfn.DistributedMap(this, "DistributedMap", {
  mapExecutionType: sfn.StateMachineType.STANDARD,
  maxConcurrency: 10,
  itemReader: new sfn.S3JsonItemReader({
    bucket: sourceBucket,
    key: "manifest.json",
  }),
  resultPath: "$.files",
})

Running Amazon Bedrock batch inference

When all of the Amazon Textract jobs finish, the Distributed Map state completes. The workflow then creates an Amazon Bedrock batch inference input file, launches the Amazon Bedrock batch inference job, and waits for it to complete:

  1. A Lambda function collects text results from Amazon S3 and creates an Amazon Bedrock batch inference input file with custom prompts.
  2. The workflow starts the Amazon Bedrock batch inference job by calling createModelInvocationJob and sending the batch inference input file as input.
  3. The workflow pauses and stores the task token in DynamoDB.
  4. An EventBridge rule matches completed Amazon Bedrock batch inference events and, upon job completion, triggers a Lambda function. The Lambda function retrieves the task token and resumes the workflow, as shown in the following figure.

Lambda function retrieves the task token and resumes the workflow

A batch inference input is a single JSONL (JSON Lines) file with one inference request per line, such as the following example (formatted across multiple lines here for readability). The prompt in each inference request instructs the large language model (LLM) to analyze the paper and extract metadata. Read the full prompt template in the GitHub repository.

{
  "recordId": "c1b8a3b2086141f963",
  "modelInput": {
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "text": "Analyze the following research paper transcript and extract metadata about code and dataset availability. Extract the following metadata from this research paper transcript:\n\n1. **has_code**: Does the paper mention or link to source code? (true/false) ...... Return only valid JSON matching the schema above. Do not include any text outside of the JSON structure."
          }
        ]
      }
    ],
    "inferenceConfig": { "maxTokens": 4096 }
  }
}
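
To illustrate how such an input file might be assembled, the following sketch builds one record per document and serializes the records as JSON Lines. The record layout follows the example above. The `build_record` and `to_jsonl` helpers, and deriving `recordId` from a hash of the filename, are assumptions made for illustration and are not taken from the sample repository.

```python
import hashlib
import json

# Minimum number of requests per batch job, as stated earlier in this post.
MIN_BATCH_RECORDS = 100


def build_record(filename, prompt_text, max_tokens=4096):
    """Build one batch inference record in the format shown above."""
    # Assumption: a stable ID derived from the filename, so that results
    # can be matched back to their source document later.
    record_id = hashlib.sha256(filename.encode("utf-8")).hexdigest()[:18]
    return {
        "recordId": record_id,
        "modelInput": {
            "messages": [{"role": "user", "content": [{"text": prompt_text}]}],
            "inferenceConfig": {"maxTokens": max_tokens},
        },
    }


def to_jsonl(records):
    """Serialize records as JSON Lines: one compact JSON object per line."""
    if len(records) < MIN_BATCH_RECORDS:
        raise ValueError(
            f"batch needs at least {MIN_BATCH_RECORDS} records, got {len(records)}"
        )
    return "\n".join(json.dumps(record) for record in records) + "\n"


records = [build_record(f"paper-{i}.pdf", "Analyze the following ...") for i in range(100)]
jsonl = to_jsonl(records)
print(jsonl.splitlines()[0][:60])
```

Checking the record count before submitting avoids starting a job that cannot meet the minimum batch size.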

Populating the Amazon Bedrock Knowledge Base

After the batch inference completes, the workflow does the following:

  1. Extracts inference results and creates metadata files based on the Amazon Bedrock inference results (example metadata shown in the following figure).
  2. Starts an Amazon Bedrock Knowledge Base ingestion job.
  3. Monitors the ingestion job status using Step Functions Wait and Choice states.
  4. Sends a completion notification through Amazon SNS.
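
The Wait and Choice loop in step 3 follows a common polling shape: check the status, stop on a terminal status, otherwise wait and check again. The following Python sketch mirrors that logic outside of Step Functions. The `wait_for_ingestion` helper, the 30-second interval, and the check limit are illustrative assumptions. In practice, `get_status` would wrap a call that returns the ingestion job's current status.

```python
import time

# Terminal ingestion job statuses; any other value means the job is still running.
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion(get_status, wait_seconds=30, max_checks=120, sleep=time.sleep):
    """Poll `get_status()` until it returns a terminal status.

    `get_status` is any callable that returns the current status string.
    `sleep` is injectable so that tests can run without real delays.
    """
    for _ in range(max_checks):
        status = get_status()  # comparable to the status-check task
        if status in TERMINAL_STATUSES:  # comparable to the Choice state
            return status
        sleep(wait_seconds)  # comparable to the Wait state
    raise TimeoutError(f"ingestion job still running after {max_checks} checks")


# Demo with canned statuses and a no-op sleep so that it runs instantly.
canned = iter(["STARTING", "IN_PROGRESS", "COMPLETE"])
print(wait_for_ingestion(lambda: next(canned), sleep=lambda seconds: None))
```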

Populating the Amazon Bedrock Knowledge Base

The following shows the example metadata format:

{
  "metadataAttributes": {
    "has_code": true,
    "has_dataset": false,
    "code_availability": "publicly_available",
    "dataset_availability": "not_available",
    "research_type": "methodology",
    "is_reproducible": true,
    "code_repository_url": "https://github.com/amazon-science/PIXELS"
  }
}
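
Amazon Bedrock Knowledge Bases reads metadata from a sidecar file stored next to each source document and named after it with a `.metadata.json` suffix. The following sketch shows one way to produce that file's name and content from extracted attributes. The helper names, and the decision to drop empty values (for example, a missing repository URL), are assumptions for illustration.

```python
import json


def metadata_key(document_key):
    """Sidecar object key: the source document key plus `.metadata.json`."""
    return f"{document_key}.metadata.json"


def build_metadata_document(attributes):
    """Wrap extracted attributes in the metadata format shown above.

    Assumption: None and empty-string values are dropped, so that
    attributes the model could not determine are simply left out.
    """
    cleaned = {
        key: value
        for key, value in attributes.items()
        if value is not None and value != ""
    }
    return json.dumps({"metadataAttributes": cleaned}, indent=2)


print(metadata_key("papers/example-paper.txt"))
print(build_metadata_document({"has_code": False, "code_repository_url": None}))
```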

Testing the knowledge base

After the workflow completes successfully, you can test the knowledge base to verify that the documents and metadata have been properly ingested and are searchable. There are two practical methods for testing an Amazon Bedrock Knowledge Base:

  1. Using the Console
  2. Using the AWS SDK to run a query

Testing through the Console

The Console provides an intuitive interface for testing your knowledge base queries with metadata filters:

  1. Navigate to the Amazon Bedrock console.
  2. In the left navigation pane, choose Knowledge Bases under the Build section.
  3. Choose the knowledge base created by the AWS CDK deployment (the name will be output by the AWS CDK stack).
  4. Choose the Test button in the upper right corner.
  5. In the test interface, choose your preferred foundation model (FM) (such as Amazon Nova Pro).
  6. Expand the Configurations column, then navigate to the Filters section.
  7. Configure filters based on the extracted metadata, as shown in the following figure.

Configure filters based on the extracted metadata

Enter a natural language query related to your documents, for example: “Recent research on retrieval augmented generation?”

The console displays the generated response along with source attributions showing which documents were retrieved and used to formulate the answer, filtered by your specified metadata attributes, as shown in the following figure.

A chat example

Testing via API

For programmatic testing and integration into applications, use the AWS SDK with metadata filtering. The following is a Python example using boto3:

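import boto3

# Placeholder: replace with the knowledge base ID from the AWS CDK stack outputs
knowledge_base_id = "YOUR_KNOWLEDGE_BASE_ID"
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime")
model_arn = "arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-pro-v1:0"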

# Query for papers with publicly available code
response = bedrock_agent_runtime.retrieve_and_generate(
    input={'text': "What recent research has been done on RAG?"},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': knowledge_base_id,
            'modelArn': model_arn,
            'retrievalConfiguration': {
                'vectorSearchConfiguration': {
                    'numberOfResults': 5,
                    'filter': {"equals": {"key": "has_code", "value": True}},
                }
            },
        },
    },
)

# Display results
print(f"Response: {response['output']['text']}\n")
print("Source Documents:")

for citation in response.get('citations', []):
    for reference in citation.get('retrievedReferences', []):
        metadata = reference.get('metadata', {})
        print(f" Document: {reference['location']['s3Location']['uri']}\n")
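
The example filters on a single attribute. To require several attributes at once, the individual `equals` conditions can be wrapped in an `andAll` filter. The following is a small sketch of a helper that builds either shape. The `build_filter` name is hypothetical, and the attribute keys are the ones from the example metadata earlier in this post.

```python
def build_filter(**attributes):
    """Build a knowledge base metadata filter from attribute/value pairs.

    One attribute yields a plain `equals` filter. Two or more are combined
    with `andAll`, so a document must match every condition.
    """
    conditions = [
        {"equals": {"key": key, "value": value}}
        for key, value in attributes.items()
    ]
    if not conditions:
        raise ValueError("at least one attribute is required")
    if len(conditions) == 1:
        return conditions[0]
    return {"andAll": conditions}


# Papers that have code and are classified as methodology research.
print(build_filter(has_code=True, research_type="methodology"))
```

The returned dictionary can be passed as the `filter` value in `vectorSearchConfiguration`.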

The following is the test script output:

Response: Recent research on Retrieval-Augmented Generation (RAG) has focused on enhancing the system's ability to dynamically retrieve and utilize relevant information from a Vector Database (VDB) to improve decision-making and performance. Key innovations include:

1. **Dynamic Retrieval and Utilization**: The system is designed to query the VDB for contextually relevant past experiences, which significantly improves decision quality and accelerates performance by leveraging a growing repository of relevant experiences.

2. **Teacher-Student Instructional Tuning**: A novel mechanism where a Teacher agent refines a Student agent's core policy through direct interaction. The Teacher generates a modified SYSTEM prompt based on the Student's actions, creating a meta-learning loop that enhances the Student's reasoning policy over time.

Conclusion

This solution demonstrates how to combine multiple AWS AI and serverless services to build a scalable document processing pipeline. Organizations can use AWS Step Functions for orchestration, Amazon Textract for document processing, Amazon Bedrock batch inference for intelligent content analysis, and Amazon Bedrock Knowledge Bases for searchable storage. In turn, they can automate the extraction of insights from large document collections while optimizing costs.

Following this solution, you can build a solid foundation for production-scale document processing pipelines that remain flexible enough to adapt to your specific requirements while maintaining reliability, scalability, and operational excellence. Follow this link to learn more about serverless architectures.

Accelerate workflow development with enhanced local testing in AWS Step Functions

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/accelerate-workflow-development-with-enhanced-local-testing-in-aws-step-functions/

Today, I’m excited to announce enhanced local testing capabilities for AWS Step Functions through the TestState API, our testing API.

These enhancements are available through the API, so you can build automated test suites that validate your workflow definitions locally on your development machines, test error handling patterns, data transformations, and mock service integrations using your preferred testing frameworks. This launch introduces an API-based approach for local unit testing, providing programmatic access to comprehensive testing capabilities without deploying to Amazon Web Services (AWS).

There are three key capabilities introduced in this enhanced TestState API:

  • Mocking support – Mock state outputs and errors without invoking downstream services, enabling true unit testing of state machine logic. TestState validates mocked responses against AWS API models with three validation modes: STRICT (this is the default and validates all required fields), PRESENT (validates field types and names), and NONE (no validation), providing high-fidelity testing.

  • Support for all state types – All state types, including advanced states such as Map states (inline and distributed), Parallel states, activity-based Task states, .sync service integration patterns, and .waitForTaskToken service integration patterns, can now be tested. This means you can use TestState API across your entire workflow definition and write unit tests to verify control flow logic, including state transitions, error handling, and data transformations.

  • Testing individual states – Test specific states within a full state machine definition using the new stateName parameter. You can provide the complete state machine definition one time and test each state individually by name. You can control execution context to test specific retry attempts, Map iteration positions, and error scenarios.

Getting started with enhanced TestState
Let me walk you through these new capabilities in enhanced TestState.

Scenario 1: Mock successful results

The first capability is mocking support, which you can use to test your workflow logic without invoking actual AWS services or even external HTTP requests. You can either mock service responses for fast unit testing or test with actual AWS services for integration testing. When using mocked responses, you don’t need AWS Identity and Access Management (IAM) permissions.

Here’s how to mock a successful AWS Lambda function response:

aws stepfunctions test-state --region us-east-1 \
--definition '{
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": {"FunctionName": "process-order"},
  "End": true
}' \
--mock '{"result":"{\"orderId\":\"12345\",\"status\":\"processed\"}"}' \
--inspection-level DEBUG

This command tests a Lambda invocation state without actually calling the function. TestState validates your mock response against the Lambda service API model so your test data matches what the real service would return.

The response shows the successful execution with detailed inspection data (when using DEBUG inspection level):

{
    "output": "{\"orderId\":\"12345\",\"status\":\"processed\"}",
    "inspectionData": {
        "input": "{}",
        "afterInputPath": "{}",
        "afterParameters": "{\"FunctionName\":\"process-order\"}",
        "result": "{\"orderId\":\"12345\",\"status\":\"processed\"}",
        "afterResultSelector": "{\"orderId\":\"12345\",\"status\":\"processed\"}",
        "afterResultPath": "{\"orderId\":\"12345\",\"status\":\"processed\"}"
    },
    "status": "SUCCEEDED"
}

When you specify a mock response, TestState validates it against the AWS service’s API model so your mocked data conforms to the expected schema, maintaining high-fidelity testing without requiring actual AWS service calls.

Scenario 2: Mock error conditions
You can also mock error conditions to test your error handling logic:

aws stepfunctions test-state --region us-east-1 \
--definition '{
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": {"FunctionName": "process-order"},
  "End": true
}' \
--mock '{"errorOutput":{"error":"Lambda.ServiceException","cause":"Function failed"}}' \
--inspection-level DEBUG

This simulates a Lambda service exception so you can verify how your state machine handles failures without triggering actual errors in your AWS environment.

The response shows the failed execution with error details:

{
    "error": "Lambda.ServiceException",
    "cause": "Function failed",
    "inspectionData": {
        "input": "{}",
        "afterInputPath": "{}",
        "afterParameters": "{\"FunctionName\":\"process-order\"}"
    },
    "status": "FAILED"
}
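
In both mock shapes above, note that `result` is itself a JSON string nested inside the `--mock` JSON, which is easy to get wrong when quoting by hand. The following sketch builds the argument list for the same `aws stepfunctions test-state` command in Python and lets `json.dumps` handle the nesting. The `build_test_state_args` helper is hypothetical, and it uses only the flags shown in the commands above.

```python
import json


def build_test_state_args(definition, mock_result=None, mock_error=None,
                          state_input=None, region="us-east-1"):
    """Build the argument list for `aws stepfunctions test-state` with a mock.

    Pass exactly one of `mock_result` (any JSON-serializable value) or
    `mock_error` (a dict with "error" and "cause").
    """
    if (mock_result is None) == (mock_error is None):
        raise ValueError("provide exactly one of mock_result or mock_error")
    if mock_result is not None:
        # The mocked result is a JSON *string* embedded in the mock JSON.
        mock = {"result": json.dumps(mock_result)}
    else:
        mock = {"errorOutput": mock_error}
    args = [
        "aws", "stepfunctions", "test-state",
        "--region", region,
        "--definition", json.dumps(definition),
        "--mock", json.dumps(mock),
        "--inspection-level", "DEBUG",
    ]
    if state_input is not None:
        args += ["--input", json.dumps(state_input)]
    return args


definition = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "Parameters": {"FunctionName": "process-order"},
    "End": True,
}
print(build_test_state_args(definition, mock_result={"orderId": "12345", "status": "processed"}))
```

The list can be passed to `subprocess.run` without a shell, which avoids shell quoting problems entirely.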

Scenario 3: Test Map states
The second capability adds support for previously unsupported state types. Here’s how to test a Distributed Map state:

aws stepfunctions test-state --region us-east-1 \
--definition '{
  "Type": "Map",
  "ItemProcessor": {
    "ProcessorConfig": {"Mode": "DISTRIBUTED", "ExecutionType": "STANDARD"},
    "StartAt": "ProcessItem",
    "States": {
      "ProcessItem": {
        "Type": "Task", 
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {"FunctionName": "process-item"},
        "End": true
      }
    }
  },
  "End": true
}' \
--input '[{"itemId":1},{"itemId":2}]' \
--mock '{"result":"[{\"itemId\":1,\"status\":\"processed\"},{\"itemId\":2,\"status\":\"processed\"}]"}' \
--inspection-level DEBUG

The mock result represents the complete output from processing multiple items. In this case, the mocked array must match the expected Map state output format.

The response shows successful processing of the array input:

{
    "output": "[{\"itemId\":1,\"status\":\"processed\"},{\"itemId\":2,\"status\":\"processed\"}]",
    "inspectionData": {
        "input": "[{\"itemId\":1},{\"itemId\":2}]",
        "afterInputPath": "[{\"itemId\":1},{\"itemId\":2}]",
        "afterResultSelector": "[{\"itemId\":1,\"status\":\"processed\"},{\"itemId\":2,\"status\":\"processed\"}]",
        "afterResultPath": "[{\"itemId\":1,\"status\":\"processed\"},{\"itemId\":2,\"status\":\"processed\"}]"
    },
    "status": "SUCCEEDED"
}

Scenario 4: Test Parallel states
Similarly, you can test Parallel states that execute multiple branches concurrently:

aws stepfunctions test-state --region us-east-1 \
--definition '{
  "Type": "Parallel",
  "Branches": [
    {"StartAt": "Branch1", "States": {"Branch1": {"Type": "Pass", "End": true}}},
    {"StartAt": "Branch2", "States": {"Branch2": {"Type": "Pass", "End": true}}}
  ],
  "End": true
}' \
--mock '{"result":"[{\"branch1\":\"data1\"},{\"branch2\":\"data2\"}]"}' \
--inspection-level DEBUG

The mock result must be an array with one element per branch. By using TestState, your mock data structure matches what a real Parallel state execution would produce.

The response shows the parallel execution results:

{
    "output": "[{\"branch1\":\"data1\"},{\"branch2\":\"data2\"}]",
    "inspectionData": {
        "input": "{}",
        "afterResultSelector": "[{\"branch1\":\"data1\"},{\"branch2\":\"data2\"}]",
        "afterResultPath": "[{\"branch1\":\"data1\"},{\"branch2\":\"data2\"}]"
    },
    "status": "SUCCEEDED"
}

Scenario 5: Test individual states within complete workflows
You can test specific states within a full state machine definition using the stateName parameter. Here’s an example testing a single state, though you would typically provide your complete workflow definition and specify which state to test:

aws stepfunctions test-state --region us-east-1 \
--definition '{
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": {"FunctionName": "validate-order"},
  "End": true
}' \
--input '{"orderId":"12345","amount":99.99}' \
--mock '{"result":"{\"orderId\":\"12345\",\"validated\":true}"}' \
--inspection-level DEBUG

This tests a Lambda invocation state with specific input data, showing how TestState processes the input and transforms it through the state execution.

The response shows detailed input processing and validation:

{
    "output": "{\"orderId\":\"12345\",\"validated\":true}",
    "inspectionData": {
        "input": "{\"orderId\":\"12345\",\"amount\":99.99}",
        "afterInputPath": "{\"orderId\":\"12345\",\"amount\":99.99}",
        "afterParameters": "{\"FunctionName\":\"validate-order\"}",
        "result": "{\"orderId\":\"12345\",\"validated\":true}",
        "afterResultSelector": "{\"orderId\":\"12345\",\"validated\":true}",
        "afterResultPath": "{\"orderId\":\"12345\",\"validated\":true}"
    },
    "status": "SUCCEEDED"
}
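
In the responses shown in these scenarios, `output` and each `inspectionData` field are JSON-encoded strings, so a test usually needs to decode them before asserting on individual values. The following is a minimal sketch of a decoding helper for use in a test suite. The `decode_test_state_response` name is hypothetical, and the helper assumes only the response fields shown in the examples above.

```python
import json


def _maybe_json(value):
    """Decode a JSON string; return anything else unchanged."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return value
    return value


def decode_test_state_response(raw):
    """Parse a test-state response and decode its nested JSON strings."""
    response = json.loads(raw)
    decoded = {"status": response["status"]}
    if "output" in response:
        decoded["output"] = _maybe_json(response["output"])
    for field in ("error", "cause"):
        if field in response:
            decoded[field] = response[field]
    decoded["inspectionData"] = {
        key: _maybe_json(value)
        for key, value in response.get("inspectionData", {}).items()
    }
    return decoded


raw_response = json.dumps({
    "output": "{\"orderId\":\"12345\",\"validated\":true}",
    "inspectionData": {"input": "{\"orderId\":\"12345\",\"amount\":99.99}"},
    "status": "SUCCEEDED",
})
result = decode_test_state_response(raw_response)
print(result["output"]["validated"], result["inspectionData"]["input"]["amount"])
```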

These enhancements bring the familiar local development experience to Step Functions workflows, helping me to get instant feedback on changes before deploying to my AWS account. I can write automated test suites to validate all Step Functions features with the same reliability as cloud execution, providing confidence that my workflows will work as expected when deployed.

Things to know
Here are key points to note:

  • Availability – Enhanced TestState capabilities are available in all AWS Regions where Step Functions is supported.
  • Pricing – TestState API calls are included with AWS Step Functions at no additional charge.
  • Framework compatibility – TestState works with any testing framework that can make HTTP requests, including Jest, pytest, JUnit, and others. You can write test suites that validate your workflows automatically in your continuous integration and continuous delivery (CI/CD) pipeline before deployment.
  • Feature support – Enhanced TestState supports all Step Functions features including Distributed Map, Parallel states, error handling, and JSONata expressions.
  • Documentation – For detailed options for different configurations, refer to the TestState documentation and API reference for the updated request and response model.

Get started today with enhanced local testing by integrating TestState into your development workflow.

Happy building!
Donnie

Handle unpredictable processing times with operational consistency when integrating asynchronous AWS services with an AWS Step Functions state machine

Post Syndicated from Philip Whiteside original https://aws.amazon.com/blogs/compute/handle-unpredictable-processing-times-with-operational-consistency-when-integrating-asynchronous-aws-services-with-an-aws-step-functions-state-machine/

Integrating asynchronous AWS services with an AWS Step Functions state machine presents a challenge when building serverless applications on Amazon Web Services (AWS). Services such as Amazon Translate, Amazon Macie, and Amazon Bedrock Data Automation (BDA) excel at handling long-running operations, which can take more than 10 minutes to complete, because they run them asynchronously. Upon job submission, asynchronous services return an immediate 200 OK response indicating that the request has succeeded (see the API response syntax of StartTextTranslationJob in Amazon Translate, CreateClassificationJob in Macie, and InvokeDataAutomationAsync in BDA), rather than waiting for the actual task to complete and return results.

In this post, we explore using AWS Step Functions state machines with asynchronous AWS services, look at some scenarios where the processing time can be unpredictable, explain when traditional solutions such as polling (periodically checking job status) fall short, and demonstrate how to implement a generalized callback pattern that turns asynchronous operations into a more manageable synchronous flow. We cover the related architecture, technical implementation, and best practices, and we provide a real-world example that uses the AWS Cloud Development Kit (AWS CDK). Services used in this generalized callback pattern include Amazon DynamoDB, Amazon EventBridge, and AWS Step Functions.

Understanding the issue this solution addresses

Asynchronous operations are designed to handle long-running operations without blocking resources, a design followed by many AWS services. However, these services create challenges in Step Functions workflows by returning immediate 200 OK responses rather than confirming task completion. This breaks the Step Functions execution model, which expects each step to be complete before advancing. Developers often attempt to address this issue through polling loops that repeatedly check the status of operations. This approach works for containerized applications and Amazon Elastic Compute Cloud (Amazon EC2), where compute resources are already provisioned. It becomes problematic in serverless architectures because AWS Lambda functions have a 15-minute execution limit, making them unsuitable for long-running polls.

Step Functions supports the Run a Job (.sync) pattern, which calls a service and has Step Functions wait for the job to complete. However, this pattern is available only for selected optimized integrations, such as AWS Glue. Amazon Translate, Macie, and other services are not optimized integrations. If your operation is not listed as working with .sync, it can benefit from the generalized callback pattern covered in this post.

For these non-optimized integrations, one option is polling (periodically checking job status). However, polling can add latency to the response because poll times are unlikely to align with job completion, as shown in the following figure.

Timeline diagram showing alternating 'Job' blocks and 'Delay' blocks, with 'Poll' markers indicated at regular intervals along the time axis. The diagram illustrates a sequential process of job execution and delay periods.

Figure 1: A job processing and delay timeline diagram
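
The delay in the figure can be estimated with simple arithmetic. The following sketch assumes polls happen at a fixed interval and that the first poll occurs one interval after the job starts. Both are simplifying assumptions, and `polling_delay` is a hypothetical helper.

```python
import math


def polling_delay(job_seconds, poll_interval_seconds):
    """Seconds between job completion and the first poll that observes it.

    Assumes polls at t = interval, 2 * interval, ... after the job starts.
    """
    # Number of polls needed until one lands at or after job completion.
    polls_needed = max(math.ceil(job_seconds / poll_interval_seconds), 1)
    return polls_needed * poll_interval_seconds - job_seconds


# A job that takes 610 seconds, polled every 60 seconds, is detected at 660 seconds.
print(polling_delay(610, 60))
```

With these numbers, the workflow idles for 50 seconds after the job has finished. A shorter interval reduces the delay but adds more status checks, which is the trade-off the callback pattern avoids.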

The Step Functions generalized callback pattern can solve this latency issue by pausing execution for up to one year while waiting for task completion (this does not incur additional cost). When such an asynchronous operation finishes, a callback mechanism resumes the workflow where it left off. This generalized callback pattern transforms asynchronous operations into synchronous ones, and it maintains cost efficiency and operational agility.

Scenarios

To help us see where this generalized callback pattern could be applied, let’s look at a few scenarios. Each of these scenarios makes use of AWS Step Functions state machines to run the applications’ workflows.

Scenario 1: Document translation with personally identifiable information compliance

Organizations must manage personally identifiable information (PII) when translating documents because PII can be duplicated across language outputs. For example, when translating a document containing “Jane Doe,” that name appears in both the original and translated versions, creating multiple instances of sensitive data that need compliance measures. Amazon Translate batch translation has a default concurrency of 10, meaning that translations could take more than 10 minutes or be queued for longer periods. Additionally, the Amazon Translate batch translation operation is asynchronous, holding the translation request in a queue until completed. The generalized callback pattern in this post makes sure that Step Functions state machine workflows resume appropriately to apply consistent PII handling across all outputs. In this scenario the design makes use of tagging Amazon Simple Storage Service (Amazon S3) files as containing PII or not, which in turn associates S3 lifecycle policies for specific retention periods to those S3 objects.

Workflow diagram showing five connected steps: 1) Start, 2) StartTextTranslationJob, 3) Wait for Translate result, 4) Tag all files, 5) ending with End state.

Figure 2: A text translation workflow diagram

Scenario 2: Using concurrent execution to pause the state machine until processes have completed

Continuing from scenario 1, Amazon Macie and Amazon Translate can run in parallel (each approximately 10 minutes) rather than sequentially (approximately 20 minutes) for a better user experience. Like the Amazon Translate batch translation operation, the Macie create classification operation is asynchronous. Step Functions state machines enable concurrent execution of both service requests. The generalized callback pattern enables the state machine to pause each parallel workflow and resume only when the asynchronous services have completed their jobs. Without this pattern, both services would immediately return 200 OK responses, causing the workflow to continue prematurely before translations or classification results are available. If the classification results are not available later in the workflow, then the appropriate PII tags are not applied, so the matching lifecycle retention policy is not applied either, and the workflow fails to follow PII handling practices.

Figure 3: A parallel classification and translation workflow diagram

Scenario 3: Intelligent document processing

Organizations that use Amazon Bedrock Data Automation (BDA) for intelligent document processing must take Regional concurrency limits into consideration. The BDA “Max number of concurrent jobs” limit is 25 jobs in the us-east-1 and us-west-2 Regions and only five jobs in other supported Regions, so large document batches could be queued for extended periods, resulting in long processing wait times for the user. BDA handles these requests asynchronously because a request can take many minutes to complete. The generalized callback pattern makes sure that workflows resume as soon as a job finishes rather than waiting an arbitrary time to check whether the job has completed. For example, the generalized callback pattern for BDA can be used to enhance the solution outlined in the blog post, Scalable intelligent document processing using Amazon Bedrock Data Automation.

Figure 4: A data automation workflow diagram

Solution architecture

The following architecture diagram shows the generalized callback pattern (the blue section on the right side) integrated with your existing application (the grey section on the left side).

Figure 5: The Step Functions generalized callback architecture

Key components of this post’s solution architecture

This generalized callback pattern architecture consists of four essential components working together. Each component plays a specific role while maintaining cost efficiency and operational reliability. The following components form the foundation of this pattern:

  • Step Functions task: Implements the “Wait for Callback” task state generating unique task tokens for workflow resumption.
  • EventBridge rule: Monitors asynchronous service completion events and is customizable for different service patterns. AWS services make use of an event bus to route service event notifications to other services, such as job completions.
  • DynamoDB: Provides persistent storage correlating job IDs with task tokens for quick lookup.
  • Step Functions state machine: Manages the resume process and makes sure of proper cleanup of stored tokens.

Solution process

This generalized callback pattern operates through a coordinated sequence of four key steps, each building on the previous one. The preceding architecture diagram breaks these key steps down in more detail.

  1. Start the asynchronous operation for which you want to wait for completion. The asynchronous service responds with success (200 OK) and the state machine continues. Initiating an Amazon Translate batch translation operation is one example of such an asynchronous operation.
  2. Trigger the generalized callback pattern with the “Wait for Callback” capability. Pair the task token with the jobId in DynamoDB using the unique jobId as the primary key. Example:
    {
        "id":    "translationJobId",
        "token": "stepFunctionTaskToken"
    }
  3. Monitor for completion: When the asynchronous service completes the requested job, such as translation of documents, an event is created in EventBridge that contains the jobId and status. Example:
    {
        "jobId":  "translationJobId",
        "status": "complete"
    }
    
  4. Resume workflow: The EventBridge rule triggers the workflow to resume, which looks up the task token using the jobId, resumes the paused Step Functions execution, and cleans up the database entry.

Not every service creates events for every action, so validate that your service operation generates the expected events. For example, Macie does not create events when no findings are discovered. In these cases, implement more event generation mechanisms through Amazon CloudWatch Logs subscriptions that trigger Lambda functions to create custom events.
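The four steps above can be sketched in plain Python, with an in-memory dictionary standing in for the DynamoDB table and a callback standing in for the Step Functions SendTaskSuccess call. The names used here (TokenStore, register, on_job_completed) are hypothetical and are not taken from the sample CDK pattern; the sketch only illustrates the store, look up, resume, and clean up sequence.

```python
from typing import Callable, Dict, Optional


class TokenStore:
    """In-memory stand-in for the table that maps job IDs to task tokens."""

    def __init__(self) -> None:
        self._items: Dict[str, str] = {}

    def register(self, job_id: str, task_token: str) -> None:
        # Mirrors a conditional write: refuse to overwrite an existing job ID.
        if job_id in self._items:
            raise KeyError(f"job {job_id} is already registered")
        self._items[job_id] = task_token

    def on_job_completed(self, job_id: str, resume: Callable[[str], None]) -> bool:
        """Look up the token for a completed job, resume the workflow, then clean up."""
        token: Optional[str] = self._items.get(job_id)
        if token is None:
            return False  # Unknown or already-resumed job: nothing to do.
        resume(token)            # Stand-in for SendTaskSuccess.
        del self._items[job_id]  # Remove the entry so the token is not reused.
        return True


resumed = []
store = TokenStore()
store.register("translation-job-1", "task-token-abc")
first = store.on_job_completed("translation-job-1", resumed.append)
second = store.on_job_completed("translation-job-1", resumed.append)
```

The second completion event for the same job is ignored because the entry was already deleted, which is the behavior the cleanup step is meant to guarantee.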

Technical implementation of the solution

For rapid deployment of this post’s solution, AWS CDK users can use this sample CDK pattern with all key components. Alternatively, you can implement the individual components yourself by using the following steps, with each component customizable to your requirements.

Some of the JSON-based snippets below are written in Amazon States Language (ASL), the language that defines an AWS Step Functions state machine. State machines can be built in the AWS Console using the drag-and-drop visual builder, or directly in ASL. The visual builder generates ASL, and you can toggle to view or edit the workflow code.

Use a Step Functions task that supports “WaitForCallback” to store task token in DynamoDB

Use a Step Functions task that supports “WaitForCallback” to store the task token in DynamoDB alongside the job ID from the asynchronous service.

AWS services generate a unique ID for that service which refers to that job/request/action. DynamoDB holds the mappings between job IDs and task tokens, supporting multiple state machines paused in parallel with concurrent execution. To prevent clashes when different asynchronous services generate overlapping IDs (for example, if Service A and Service B both generate ID “12345”), use separate DynamoDB tables for each service to maintain ID uniqueness. The sample AWS CDK pattern demonstrates this approach by providing dedicated DynamoDB tables and Step Functions state machines for each service integration. This ID-token structure allows for quick lookups for workflow resumption and cleanup.

The following ASL accomplishes this by using a DynamoDB PutItem task:

"DynamoDB PutItem": {
    "Type": "Task",
    "Resource": "arn:aws:states:::dynamodb:putItem",
    "Parameters": {
        "TableName": "resumeTokenSessionTable",
        "Item": {
            "id":    { "S.$": "$.JobId" },
            "token": { "S.$": "$$.Task.Token" },
            "ttl":   { "N.$": "$.ttl" }
        },
        "ConditionExpression": "attribute_not_exists(id)"
    },
    "Next": "XXXX"
}

In this example, the Item object stores three values: the job ID ($.JobId), the task token ($$.Task.Token), and a TTL value ($.ttl). The ttl field configures Time to Live for automatic cleanup based on your service’s expected completion time. It is stored as a Number (N) holding an epoch timestamp in seconds, because DynamoDB Time to Live only acts on Number attributes. Since each entry stores only three small values, data usage per entry is minimal. The primary consideration is the number of concurrent operations, as each active asynchronous job requires one DynamoDB entry until completion or TTL expiration.

The DynamoDB table uses “id” as the primary key and includes a “token” attribute. These fields are essential for the “WaitForCallback” pattern: the “id” (job ID) allows the resume workflow to look up the correct entry, while the “token” (Step Functions task token) is what is sent back to Step Functions to resume the paused workflow. The following JSON shows an example of these values:

{
    "id":    { "S": "xxxxxxxx-yyyy-zzzz-aaaa-bbbbbbbbbbbb" },
    "token": { "S": "11111111-2222-3333-4444-555555555555" },
    "ttl":   { "N": "1480550400" }
}

When your asynchronous service completes its work, the resume workflow retrieves the task token using the job ID, then calls Step Functions with that token to resume execution from where it paused.

The task token acts as a unique identifier for resuming execution at the exact pause point. To prevent overwriting an existing record when a duplicate id is used, you can specify a “ConditionExpression”. This ASL shows just the ConditionExpression.

"ConditionExpression": "attribute_not_exists(id)"
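For readers who write the token from code rather than from an ASL task, the same item and condition can be expressed as a request dictionary. This is a minimal sketch, not the sample pattern's implementation: the helper name build_put_item_request and its ttl_seconds and now parameters are hypothetical, and the table name is simply the one used in the snippets above.

```python
import time
from typing import Any, Dict, Optional


def build_put_item_request(
    job_id: str,
    task_token: str,
    ttl_seconds: int,
    table_name: str = "resumeTokenSessionTable",
    now: Optional[int] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for a DynamoDB PutItem call that stores a task token."""
    issued_at = int(time.time()) if now is None else now
    return {
        "TableName": table_name,
        "Item": {
            "id": {"S": job_id},
            "token": {"S": task_token},
            # Expiry as epoch seconds; DynamoDB TTL expects a Number attribute.
            "ttl": {"N": str(issued_at + ttl_seconds)},
        },
        # Fail the write instead of overwriting an existing entry for this job ID.
        "ConditionExpression": "attribute_not_exists(id)",
    }


request = build_put_item_request("job-123", "token-abc", ttl_seconds=3600, now=1480546800)
```

With boto3, a dictionary like this could be passed to the DynamoDB client as put_item(**request). Choosing ttl_seconds comfortably above the service's expected completion time keeps entries from expiring before the job finishes.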

Create an EventBridge rule to monitor event patterns from your asynchronous service

EventBridge integration forms the heart of the event-driven resumption mechanism. You can create EventBridge rules to monitor specific event patterns from asynchronous AWS services. Most AWS services automatically publish completion events to the default EventBridge event bus at no cost, and you can use the EventBridge rule wizard to identify the correct event patterns. For services that do not publish events for every outcome (such as Macie, which creates no events when no findings are discovered), implement shims by using Amazon CloudWatch Logs subscriptions to trigger Lambda functions that generate custom events. This JSON shows the EventBridge rule pattern definition.

"EventPattern": {
    "source": [
        "aws.translate"
    ],
    "detail-type": [
        "Translate TextTranslationJob State Change"
    ],
    "detail": {
        "jobStatus": [
            "COMPLETED"
        ]
    }
}
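To make the effect of this pattern concrete, the following sketch checks sample events against it in Python. The matcher is a deliberately simplified illustration (exact-value lists and nested objects only) and does not reproduce the full EventBridge matching rules; the sample events and the jobId field value are made up for the example.

```python
from typing import Any, Dict

PATTERN = {
    "source": ["aws.translate"],
    "detail-type": ["Translate TextTranslationJob State Change"],
    "detail": {"jobStatus": ["COMPLETED"]},
}


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified matcher: lists mean 'any of these values', dicts recurse into nested fields."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


completed = {
    "source": "aws.translate",
    "detail-type": "Translate TextTranslationJob State Change",
    "detail": {"jobId": "job-123", "jobStatus": "COMPLETED"},
}
failed = {**completed, "detail": {"jobId": "job-123", "jobStatus": "FAILED"}}

completed_matches = matches(PATTERN, completed)
failed_matches = matches(PATTERN, failed)
```

Because the pattern only lists COMPLETED, a failed job would not resume the workflow through this rule; handling failures would need a separate rule or an additional status value.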

Resume the workflow

At this point, you know the operation has completed, so you can safely resume the workflow. Using the job ID, call the DynamoDB GetItem operation to receive the task token. This ASL shows the task definition to get the task token for a given job ID retrieved from the event notification.

"getResumeToken": {
    "Next": "sendTaskSuccess",
    "Type": "Task",
    "ResultPath": "$.getResumeToken",
    "Resource": "arn:aws:states:::dynamodb:getItem",
    "Parameters": {
        "Key": {
            "id": { "S.$": "$.id" }
        },
        "TableName": "resumeTokenSessionTable"
    }
}

Use the task token to resume the workflow and then delete the DynamoDB entry for cleanup. This ASL shows the task definition that uses the task token to resume the state machine at the point where it was paused.

"sendTaskSuccess": {
    "Next": "deleteResumeToken",
    "Type": "Task",
    "ResultPath": "$.sendTaskSuccess",
    "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess",
    "Parameters": {
        "TaskToken.$": "$.getResumeToken.Item.token.S",
        "Output": {
            "status": "resume"
        }
    }
}

This ASL shows the task definition that cleans up the DynamoDB table by removing the used task token.

"deleteResumeToken": {
    "End": true,
    "Type": "Task",
    "Resource": "arn:aws:states:::dynamodb:deleteItem",
    "Parameters": {
        "Key": {
            "id": { "S.$": "$.id" }
        },
        "TableName": "resumeTokenSessionTable"
    }
}

This completes the technical implementation of our solution. With all components in place—the WaitForCallback task, EventBridge rules, workflow resumption logic, and DynamoDB storage—you now have a fully functional generalized callback pattern implementation that eliminates polling and efficiently manages asynchronous operations.

Now that we’ve established how to implement the generalized callback pattern technically, let’s explore the best practices and important considerations that will help you optimize and secure your implementation.

Best practices and considerations

When implementing the generalized callback pattern in AWS Step Functions, it’s essential to understand and apply best practices that optimize costs, enhance security, and ensure efficient operation. This section outlines key considerations and recommendations for implementing the pattern effectively, focusing on cost optimization strategies and security measures that help maintain a robust and secure serverless workflow. By following these guidelines, you can maximize the benefits of the generalized callback pattern while minimizing potential risks and unnecessary expenses.

Optimize costs by using this post’s generalized callback pattern

Managing costs for long-running asynchronous operations can present challenges. Traditional polling accumulates unnecessary expenses through repeated state transitions and execution time, but this post’s generalized callback pattern is an event-driven approach that significantly reduces operational costs.

Eliminate polling costs and minimize execution time

The generalized callback pattern reduces costs by eliminating polling transitions and pausing execution during wait periods. For standard workflows billed at $0.000025 per state transition, using just two transitions instead of continuous polling achieves approximately an 87% cost reduction. A 15-minute translation job polling every minute would need 15 transitions as opposed to two with the generalized callback pattern. For express workflows billed at $0.000001 per request and $0.00001667 per GB-second, the pattern delivers significant savings through reduced request count and minimal execution time. Traditional polling keeps workflows active during the entire operation, accumulating execution time charges. By contrast, the generalized callback pattern eliminates execution time charges during the wait period. In the translation job example mentioned previously in this paragraph, this could reduce the execution time from more than 15 minutes to just the seconds needed to start jobs and complete processes.
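The standard workflow figure can be checked with a few lines of arithmetic. The sketch below uses only the numbers quoted in the paragraph above (the per-transition price, 15 polling transitions, and two callback transitions); the variable names are illustrative.

```python
STANDARD_PRICE_PER_TRANSITION = 0.000025  # USD, the standard workflow rate quoted above


def transition_cost(transitions: int) -> float:
    """Cost in USD of a given number of standard workflow state transitions."""
    return transitions * STANDARD_PRICE_PER_TRANSITION


polling_transitions = 15   # one poll per minute for a 15-minute job
callback_transitions = 2   # the two transitions assumed for the callback pattern

polling_cost = transition_cost(polling_transitions)
callback_cost = transition_cost(callback_transitions)
reduction_pct = round(100 * (1 - callback_cost / polling_cost), 1)
```

The reduction works out to roughly 86.7%, in line with the approximately 87% figure. A real polling loop usually spends more than one transition per poll (for example a wait, a status check, and a choice), so the saving in practice may be larger.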

Increase resource efficiency

The callback pattern increases resource efficiency by removing constant polling, resulting in substantial reduction in CloudWatch logging and associated monitoring costs. This creates a more cost-effective solution with a reduced AWS resource footprint.

Further cost-optimize the callback pattern

Enhance cost efficiency through DynamoDB optimizations. Choose on-demand mode for unpredictable workloads or provisioned mode with auto scaling for consistent patterns, configure auto scaling settings based on usage, and implement TTL to automatically remove expired items without consuming write capacity.

Security considerations for the callback pattern

The callback pattern involves storing task tokens, processing events, and managing workflow resumption across multiple AWS services. Implementing proper access controls is essential to protect the integrity of your workflows and prevent unauthorized access or manipulation of the pattern’s components.

This section outlines the security considerations for the callback pattern, focusing on access controls for data storage and event processing.

Data storage security

Enable DynamoDB encryption at rest by using AWS owned or customer managed AWS Key Management Service (AWS KMS) keys. Implement identity-based policies by defining the Step Functions AWS Identity and Access Management (IAM) role actions (such as PutItem, GetItem, and DeleteItem) and resource-based policies that specify which IAM principals can access the table. Together, these help ensure that only authorized state machines access token storage and that operations are limited to minimum permissions. Also, configure TTL to automatically remove expired tokens so that they are not accidentally reused, which can cause errors when resuming the relevant AWS Step Functions workflows.

Event processing security

Scope EventBridge rules precisely to match only specific necessary events. For Amazon Translate job completion, rules should explicitly match only translation job completion events, thus preventing unauthorized triggers. IAM roles should follow least-privilege principles so that only specific actions can cause workflows to resume.

Conclusion

The callback pattern presented in this post provides a solution for managing long-running asynchronous operations in serverless architectures. You can use the Step Functions “Wait for Callback” task state with EventBridge and DynamoDB to transform asynchronous services into synchronous workflows without the overhead of polling. This pattern reduces costs, improves efficiency through event-driven architecture, and maintains security through proper access controls. You can use the provided CDK implementation to implement this pattern and adapt it to your specific needs while following recommended security and cost optimization practices. 


About the authors

Maria John is a Senior Solutions Architect at Amazon Web Services, helping customers build solutions on AWS.

Philip Whiteside is a Senior Solutions Architect at Amazon Web Services. Philip is passionate about overcoming barriers by utilizing technology.

Orchestrating big data processing with AWS Step Functions Distributed Map

Post Syndicated from Biswanath Mukherjee original https://aws.amazon.com/blogs/compute/orchestrating-big-data-processing-with-aws-step-functions-distributed-map/

Developers seek to process and enrich semi-structured big data datasets with durably orchestrated network-based workflows. For example, during quarterly earnings season, finance organizations run thousands of market simulations simultaneously to provide timely insights for scenario planning or risk management. These workloads require coordination between raw datasets and on-premises servers to provide the latest market information.

AWS Step Functions is a visual workflow service capable of orchestrating over 14,000 API actions from over 220 AWS services to build distributed applications. Now, Step Functions Distributed Map streamlines big data dataset transformation by processing Amazon Athena data manifest and Parquet files directly. Using its Distributed Map feature, you can process large-scale datasets by running concurrent iterations across data entries in parallel. In Distributed mode, the Map state processes the items in the dataset in iterations called child workflow executions. You can specify the number of child workflow executions that can run in parallel. Each child workflow execution has its own execution history, separate from that of the parent workflow. By default, Step Functions runs 10,000 child workflow executions in parallel.

Distributed Map can process Amazon Athena data manifest and Parquet files directly, eliminating the need for custom pre-processing. You also now have visibility into your Distributed Map usage with new Amazon CloudWatch metrics: Approximate Open Map Runs Count, Open Map Run Limit, and Approximate Map Runs Backlog Size.

In this post, you’ll learn how to use AWS Step Functions Distributed Map to process Athena data manifest and Parquet files through a step-by-step demonstration.

This post is part of a series of posts about AWS Step Functions Distributed Map.

Use case: IoT sensor data processing

You’ll build a sample application that demonstrates processing IoT sensor data in Parquet format using Step Functions Distributed Map. These Parquet data files and a manifest file containing the list of the data files are exported from Athena. The data includes temperature, humidity, and battery level readings from different devices. The following table shows a sample of the sensor data:

Example IoT sensor data

Your objective is to use the Athena data manifest file to get the list of Parquet files, iterate over the data in those files to detect anomalies, and stream the processed data through Amazon Kinesis Data Firehose to an Amazon S3 bucket for further analytics using Athena queries. A reading is treated as an anomaly when it meets any of the following criteria:

  • Low battery conditions: less than 20%
  • Humidity anomalies: more than 95% or less than 5%
  • Temperature spikes: more than 35°C or less than -10°C
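The thresholds above translate directly into a small check per reading. The following is a minimal sketch rather than the Lambda function from the sample repository: the field names follow the Athena table columns used later in this post, while the function name and the anomaly labels are hypothetical.

```python
from typing import Dict, List


def detect_anomalies(reading: Dict[str, float]) -> List[str]:
    """Return the anomaly labels that apply to a single sensor reading."""
    anomalies: List[str] = []
    if reading["batterylevel"] < 20:
        anomalies.append("low_battery")
    if reading["humidity"] > 95 or reading["humidity"] < 5:
        anomalies.append("humidity_anomaly")
    if reading["temperature"] > 35 or reading["temperature"] < -10:
        anomalies.append("temperature_spike")
    return anomalies


normal = detect_anomalies({"temperature": 22.5, "humidity": 40.0, "batterylevel": 80.0})
faulty = detect_anomalies({"temperature": 41.0, "humidity": 97.0, "batterylevel": 12.0})
```

Returning a list rather than a single flag lets one reading carry several anomalies at once, which fits an anomaly count and an anomalies array in the results.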

The following diagram represents the AWS Step Functions state machine:

Parquet files processing workflow

  1. The state machine runs an Athena query that generates Parquet data files and an Athena manifest file (CSV). The manifest file contains the list of Parquet data files.
  2. Distributed Map processes these Parquet data files in parallel using child workflow executions. You can control the number of child workflow executions that can run in parallel using the MaxConcurrency parameter. See Step Functions service quotas to learn more about concurrency limits.
  3. Each child workflow execution invokes an AWS Lambda function to process the respective Parquet file. The Lambda function processes individual sensor readings, detects anomalies according to the preceding criteria, and returns a processed sensor data summary response.
  4. The child workflow sends the summary response record to an Amazon Kinesis Data Firehose stream, which stores the results in a specified Amazon S3 results bucket.

The following Athena StartQueryExecution state runs an UNLOAD query to generate data files in Parquet format and a manifest file in CSV format. The data files are stored in the S3 location specified in the UNLOAD query, and the manifest file is stored in the S3 bucket configured for the Athena workgroup.

{
  "QueryLanguage": "JSONata",
  "States": {
    "Athena StartQueryExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Arguments": {
        "QueryString": "UNLOAD (WRITE_YOUR_SELECT_QUERY_HERE) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'PARQUET')",
        "WorkGroup": "primary"
      },
      "Output": {
        "ManifestObjectKey": "{% $join([$states.result.QueryExecution.ResultConfiguration.OutputLocation, '-manifest.csv']) %}"
      },
      "Next": "Next State"
    },
    ...
  }
}

The following ItemReader is configured to use a manifest type of “ATHENA_DATA” with “PARQUET” data input.

{
  "QueryLanguage": "JSONata",
  "States": {
    ...
    "Map": {
      ...
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "ManifestType": "ATHENA_DATA",
          "InputType": "PARQUET"
        },
        "Arguments": {
          "Bucket": "{% $split($substringAfter($states.input.ManifestObjectKey, 's3://'), '/')[0] %}",
          "Key": "{% $substringAfter($substringAfter($states.input.ManifestObjectKey, 's3://'), '/') %}"
        }
      },
      ...
    }
  }
}

Additional supported InputType options are CSV and JSONL. All objects referenced in a single manifest file must have the same InputType format. You specify the Amazon S3 bucket and key of the Athena manifest CSV file under Arguments.
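The two JSONata expressions in the ItemReader arguments split the manifest's S3 URI into a bucket name and an object key. The same split is shown here in Python purely to illustrate what the expressions compute; the function name and the sample URI are hypothetical.

```python
from typing import Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key/parts' into (bucket, key)."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri}")
    # Everything up to the first '/' is the bucket; the remainder is the key.
    bucket, _, key = uri[len(prefix):].partition("/")
    return bucket, key


bucket, key = split_s3_uri("s3://amzn-s3-demo-bucket/results/query-id-manifest.csv")
```

Here the bucket corresponds to the first path segment after s3:// and the key to everything after the first slash, matching the Bucket and Key expressions respectively.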

The context object contains information in a JSON structure about your state machine and execution. Your workflows can reference the context object in a JSONata expression with $states.context.

Within a Map state, the Context object includes the following data:

"Map": {
   "Item": {
      "Index" : Number,
      "Key"   : "String", // Only valid for JSON objects
      "Value" : "String",
      "Source": "String"
   }
}

For each Map state iteration, Index contains the index number for the array item that is being currently processed, Key is available only when iterating over JSON objects, Value contains the array item being processed, and Source contains one of the following:

  • For state input, the value will be STATE_DATA.
  • For Amazon S3 LIST_OBJECTS_V2 with Transformation=NONE, the value will show the S3 URI for the bucket. For example: S3://amzn-s3-demo-bucket.
  • For all the other input types, the value will be the Amazon S3 URI. For example: S3://amzn-s3-demo-bucket/object-key.

Using this newly introduced Source field in the context object, you can connect the child executions with the source object.

Prerequisites

Set up the state machine and sample data

Run the following steps to deploy the Step Functions state machine.

  1. Clone the GitHub repository in a new folder and navigate to the project root folder.
    git clone https://github.com/aws-samples/sample-stepfunctions-athena-manifest-parquet-file-processor.git
    cd sample-stepfunctions-athena-manifest-parquet-file-processor

  2. Run the following command to install required Python dependencies for the Lambda function.
    python3 -m venv .venv
    source .venv/bin/activate
    python3 -m pip install -r requirements.txt

  3. Build the application.
    sam build

  4. Deploy the application.
    sam deploy --guided

  5. Enter the following details:
    • Stack name: The CloudFormation stack name (for example, sfn-parquet-file-processor)
    • AWS Region: A supported AWS Region (for example, us-east-1)
    • Keep the remaining options at their default values.

    Note the outputs from the AWS SAM deploy. You will use them in the subsequent steps.

  6. Run the following command to generate sample data in CSV format and upload it to an S3 bucket. Replace <IoTDataBucketName> with the value from the sam deploy output.
    python3 scripts/generate_sample_data.py <IoTDataBucketName>

Create the Athena database and tables

Before you can run queries, you must set up an Athena database and table for your data.

  1. From the Amazon Athena console, navigate to Workgroups and select the workgroup named “primary”. Select Edit from Actions. In the query result configuration section, select the options as follows:
    1. Management of query results – select customer managed
    2. Location of query results – enter s3://<IoTDataBucketName>. Replace <IoTDataBucketName> with the value from sam deploy output.
    3. Choose Save to save the changes to the workgroup
  2. Select the Query editor tab and run the following command to create the database.
    CREATE DATABASE `iotsensordata`;

  3. Create an Athena table in database iotsensordata that references the S3 bucket containing the raw sensor data. In this case it will be <IoTDataBucketName>. Replace <IoTDataBucketName> with the value from sam deploy output.
    CREATE EXTERNAL TABLE IF NOT EXISTS `iotsensordata`.`iotsensordata` 
    (`deviceid` string, 
    `timestamp` string,
    `temperature` double,
    `humidity` double,
    `batterylevel` double,
    `latitude` double,
    `longitude` double
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES ('field.delim' = ',')
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<IoTDataBucketName>/daily-data/'
    TBLPROPERTIES (
     'classification' = 'csv',
     'skip.header.line.count' = '1'
    );

  4. Create an Athena table in database iotsensordata that references the S3 bucket containing the analytics results streamed from Kinesis Data Firehose. Replace <IoTAnalyticsResultsBucket> with the value from the sam deploy output, and replace <year> with the current year (for example, 2025).
    CREATE EXTERNAL TABLE IF NOT EXISTS iotsensordata.iotsensordataanalytics (deviceid string, analysisDate string, readingTimestamp string, readingsCount int, metrics struct< temperature: double, humidity: double, batterylevel: double, latitude: double, longitude: double >, anomalies array <string>, anomalyCount int, healthStatus string, timestamp string )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES ( 'ignore.malformed.json' = 'FALSE', 'dots.in.keys' = 'FALSE', 'case.insensitive' = 'TRUE'
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<IoTAnalyticsResultsBucket>/<year>/'
    TBLPROPERTIES ('classification' = 'json', 'typeOfData'='file');

Start your state machine

Now that you have data ready and Athena set up for queries, start your state machine to retrieve and process the data.

  1. Run the following command to start an execution of the Step Functions state machine. Replace <StateMachineArn> and <IoTDataBucketName> with the values from the sam deploy output.
    aws stepfunctions start-execution \
      --state-machine-arn <StateMachineArn> \
      --input '{ "IoTDataBucketName": "<IoTDataBucketName>"}'

    The Step Functions state machine has an Athena StartQueryExecution state that runs an UNLOAD query to generate the sensor data files in Parquet format and a manifest file in CSV format. The manifest will have 5 rows referencing the 5 Parquet files. The state machine will process these 5 Parquet files in one Map Run.

  2. Run the following command to get the details of the execution. Replace <executionArn> with the executionArn value returned by the previous command.
    aws stepfunctions describe-execution --execution-arn <executionArn>

  3. After you see the status SUCCEEDED, run the following query from the Athena query editor to check the processed output that Kinesis Data Firehose streamed to the S3 bucket referenced by the Athena table created in step 4 of the preceding section.
    SELECT * FROM iotsensordata.iotsensordataanalytics WHERE anomalycount = 1;

If any of the sensor data exceeds the thresholds, the healthstatus attribute will be set to “anomalies_detected”. The workflow produced a summary table of metadata which you can now query for reporting.

Output from Athena Query Editor

Review workflow performance

Using the following observability metrics, you can review key performance behavior of your data processing workflow.
The AWS/States namespace includes the following new metrics for all Step Functions Map Runs.

  • OpenMapRunLimit: This is the maximum number of open Map Runs allowed in the AWS account. The default value is 1,000 runs and is a hard limit. For more information, see Quotas related to accounts.
  • ApproximateOpenMapRunCount: This metric tracks the approximate number of Map Runs currently in progress within an account. Configuring an alarm on this metric using the Maximum statistic with a threshold of 900 or higher can help you take proactive action before reaching the OpenMapRunLimit of 1,000. This metric enables operational teams to implement preventive measures, such as staggering new executions or optimizing workflow concurrency, to maintain system stability and prevent backlog accumulation.
  • ApproximateMapRunBacklogSize: This metric shows up when the ApproximateOpenMapRunCount has reached 1,000 and there are backlogged Map Runs waiting to be executed. Backlogged Map Runs wait at the MapRunStarted event until the total number of open Map Runs is less than the quota.

The following graph shows an example of these new metrics. Use the Maximum statistic to visualize them. The ApproximateMapRunBacklogSize metric appears after an account starts getting throttled on the OpenMapRunLimit. OpenMapRunLimit (orange line) is the account hard limit of 1,000, shown as a static line. ApproximateOpenMapRunCount (violet line) is the current number of open Map Runs. ApproximateMapRunBacklogSize (green line) indicates the Map Runs waiting in the backlog to be processed. While ApproximateOpenMapRunCount is lower than 1,000 (the OpenMapRunLimit), there are no Map Runs in the backlog. When the count reaches the limit, the backlog starts to build up. As active runs complete, the backlog drains and new runs begin execution.
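The alarm suggested for ApproximateOpenMapRunCount can be described as a set of CloudWatch alarm settings. The sketch below only builds those settings as a dictionary: the namespace, metric name, statistic, and threshold of 900 come from the text above, while the alarm name, period, and evaluation periods are assumptions chosen for illustration, and any dimensions the metric may require are not covered here.

```python
from typing import Any, Dict


def build_open_map_run_alarm(threshold: int = 900, limit: int = 1000) -> Dict[str, Any]:
    """Build keyword arguments for a CloudWatch alarm on ApproximateOpenMapRunCount."""
    if not 0 < threshold <= limit:
        raise ValueError("threshold must be positive and no greater than the limit")
    return {
        "AlarmName": "open-map-runs-near-limit",  # hypothetical name
        "Namespace": "AWS/States",
        "MetricName": "ApproximateOpenMapRunCount",
        "Statistic": "Maximum",
        "Period": 60,            # assumed evaluation window, in seconds
        "EvaluationPeriods": 1,  # assumed; raise it to tolerate short spikes
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    }


alarm = build_open_map_run_alarm()
```

With boto3, a dictionary like this could be passed to the CloudWatch client as put_metric_alarm(**alarm), typically together with an alarm action such as a notification topic.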

Graphed metrics from Amazon CloudWatch

Clean up

To avoid costs, remove all resources created for this post once you’re done. From the Athena query editor, run the following commands:

DROP TABLE `iotsensordata`.`iotsensordata`;
DROP TABLE `iotsensordata`.`iotsensordataanalytics`;
DROP DATABASE `iotsensordata`;

Run the following commands from the AWS CLI, after replacing the <placeholder> variables, to delete the resources you deployed for this post’s solution:

aws s3 rm s3://<IoTDataBucketName> --recursive
aws s3 rm s3://<IoTAnalyticsResultsBucketName> --recursive
sam delete

Conclusion

With this update, Distributed Map now supports additional data inputs, so you can orchestrate large-scale analytics and ETL workflows. You can now process Amazon Athena data manifest and Parquet files directly, eliminating the need for custom pre-processing. You also now have visibility into your Distributed Map usage with the following metrics: ApproximateOpenMapRunCount, OpenMapRunLimit, and ApproximateMapRunBacklogSize.

New input sources for Distributed Map are available in all commercial AWS Regions where AWS Step Functions is available. For a complete list of AWS Regions where Step Functions is available, see the AWS Region Table. The improved observability of your Distributed Map usage with new metrics is available in all AWS Regions. To get started, you can use the Distributed Map mode today in the AWS Step Functions console. To learn more, visit the Step Functions developer guide.

For more serverless learning resources, visit Serverless Land.

Optimizing nested JSON array processing using AWS Step Functions Distributed Map

Post Syndicated from Biswanath Mukherjee original https://aws.amazon.com/blogs/compute/optimizing-nested-json-array-processing-using-aws-step-functions-distributed-map/

When you’re working with large datasets, you’ve likely encountered the challenge of processing complex JSON structures in your automated workflows. You need to preprocess arrays within nested JSON objects before you can run parallel processing on them. Extracting data used to require custom code and extra processing steps, delaying you from building your core application logic.

With AWS Step Functions Distributed Map, you can process large datasets with concurrent iterations of workflow steps across data entries. Using the enhanced ItemsPointer feature of Distributed Maps, you can extract array data directly from JSON objects stored in Amazon S3. Alternatively, for JSON object as state input, you can use Items (JSONata) or ItemsPath (JSONPath). With this enhancement you can point directly to arrays nested within JSON structures, eliminating the need for custom preprocessing of your data. With ItemsPointer, Items, and ItemsPath you can select the nested array data and simplify your workflows.

In this post, we explore how to optimize processing array data embedded within complex JSON structures using AWS Step Functions Distributed Map. You’ll learn how to use ItemsPointer to reduce the complexity of your state machine definitions, create more flexible workflow designs, and streamline your data processing pipelines—all without writing additional transformation code or AWS Lambda functions.

This post is part of a series of posts about AWS Step Functions Distributed Map.

Use case: e-commerce product data enrichment

In this e-commerce use case example, you’ll build a sample application that demonstrates processing of product inventory data for an e-commerce application using AWS Step Functions Distributed Map. The application receives a JSON file from an upstream application containing an array of product information. The Step Functions workflow reads the JSON file containing product data from an S3 bucket and iterates over the array to enrich each product data in the array.

The following diagram presents the AWS Step Functions state machine.

JSON array processing workflow

The JSON array is processed using the following workflow:

  1. The state machine reads the product-updates.json file from an input S3 bucket. The file contains a JSON array of products.
  2. The Distributed Map state in the state machine selects the JSON array node using ItemsPointer and iterates over the JSON array.
  3. For each of the items within the array, the state machine invokes a Lambda function for data enrichment. The Lambda function adds product stock and price information to the product data.
  4. The state machine saves the updated product data in an Amazon DynamoDB table.
  5. Finally, the state machine uploads the execution metadata into an output S3 bucket. See limits related to state machine executions and task executions.
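As a rough illustration of step 3, the enrichment Lambda function could look like the following Python sketch. The handler body, the event shape (one array item per invocation), and the random placeholder values are assumptions for illustration; the sample repository's actual function may differ.

```python
import random


def lambda_handler(event, context):
    """Return a copy of the product item with stock and price filled in."""
    # Assumption: the Map state passes one array item as the event.
    product = dict(event)
    # Placeholder lookups: a real function would query a pricing or
    # inventory source instead of generating random values.
    product.setdefault("price", round(random.uniform(10, 200), 2))
    product.setdefault("stock", random.randint(0, 500))
    return product


item = {"productId": "PROD-002", "name": "Smart Watch", "category": "Electronics"}
print(sorted(lambda_handler(item, None)))
# ['category', 'name', 'price', 'productId', 'stock']
```

Using setdefault keeps any price or stock value that the upstream file already provides and only fills in the missing ones.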

MaxConcurrency can be configured to specify the number of child workflow executions in a Distributed Map that can run in parallel. If not specified, then Step Functions doesn’t limit concurrency and runs 10,000 parallel child workflow executions.

You can read a JSON file from an S3 bucket using ItemReader and its sub-fields. If the JSON file contains a nested object structure, you can select the specific node that holds your dataset with ItemsPointer. For example, consider the following input JSON file:

{
  "version": "2024.1",
  "timestamp": "2025-09-26T10:49:36.646197",
  "productUpdates": {
    "items": [
      {
        "productId": "PROD-001",
        "name": "Wireless Headphones",
        "price": 79.99,
        "stock": 150,
        "category": "Electronics"
      },
      {
        "productId": "PROD-002",
        "name": "Smart Watch",
        "category": "Electronics"
      },
      …
    ]
  }
}

The following JSONata-based workflow configuration extracts a nested list of products from productUpdates/items:

"ItemReader": {
   "Resource": "arn:aws:states:::s3:getObject",
   "ReaderConfig": {
      "InputType": "JSON",
      "ItemsPointer": "/productUpdates/items"
   },
   "Arguments": {
      "Bucket": "amzn-s3-demo-bucket",
      "Key": "updates/product-updates.json"
   }
}

For a JSONPath-based workflow, note that Arguments is replaced with Parameters:

"ItemReader": {
   "Resource": "arn:aws:states:::s3:getObject",
   "ReaderConfig": {
      "InputType": "JSON",
      "ItemsPointer": "/productUpdates/items"
   },
   "Parameters": {
      "Bucket": "amzn-s3-demo-bucket",
      "Key": "updates/product-updates.json"
   }
}

The ItemReader field is not needed when your dataset is JSON data from a previous step. ItemsPointer applies only when the input JSON object is read from an S3 bucket. If you are using JSON as state input to a Distributed Map, you can use the ItemsPath (for JSONPath) or Items (for JSONata) field to specify the location in the input that points to the JSON array or object used for iterations.
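To see what a pointer such as /productUpdates/items selects, the following Python sketch resolves it against a trimmed copy of the sample file. It assumes ItemsPointer follows JSON Pointer (RFC 6901) syntax, as the leading-slash form suggests; resolve_pointer is a hypothetical helper written for this illustration, not part of any AWS SDK.

```python
import json


def resolve_pointer(document, pointer):
    """Follow an RFC 6901 JSON Pointer such as "/productUpdates/items"."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    node = document
    for token in pointer[1:].split("/"):
        # Unescape per RFC 6901: "~1" means "/", then "~0" means "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(node, list):
            node = node[int(token)]
        else:
            node = node[token]
    return node


doc = json.loads("""
{
  "version": "2024.1",
  "productUpdates": {
    "items": [
      {"productId": "PROD-001", "name": "Wireless Headphones"},
      {"productId": "PROD-002", "name": "Smart Watch"}
    ]
  }
}
""")

items = resolve_pointer(doc, "/productUpdates/items")
print([item["productId"] for item in items])  # ['PROD-001', 'PROD-002']
```

Each element of the selected array becomes one item for the Distributed Map to iterate over.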

Prerequisite

To use Step Functions Distributed Map, verify you have:

Set up and run the workflow

Run the following steps to deploy the Step Functions state machine.

  1. Clone the GitHub repository in a new folder and navigate to the project folder.
    git clone https://github.com/aws-samples/sample-stepfunctions-json-array-processor.git
    cd sample-stepfunctions-json-array-processor

  2. Run the following commands to deploy the application.
    sam deploy --guided

    Enter the following details:

    • Stack name: Stack name for CloudFormation (for example, stepfunctions-json-array-processor)
    • AWS Region: A supported AWS Region (for example, us-east-1)
    • Accept all other default values.

    The outputs from the sam deploy will be used in the subsequent steps.

  3. Run the following command to generate the product-updates.json file, which contains a nested JSON array of sample products, and upload it to the input S3 bucket. Replace InputBucketName with the value from the sam deploy output.
    python3 scripts/generate_sample_data.py <InputBucketName>

  4. Run the following command to start execution of the Step Functions workflow. Replace the StateMachineArn with the value from sam deploy output.
    aws stepfunctions start-execution \
      --state-machine-arn <StateMachineArn> \
      --input '{}'

    The state machine reads the input product-updates.json file and invokes a Lambda function to update the database for every product in the array after adding price and stock information. The execution metadata is also uploaded into the results bucket.

Monitor and verify results

Run the following steps to monitor and verify the test results.

  1. Run the following command to get the details of the execution. Replace executionArn with the execution ARN returned by the start-execution command.
    aws stepfunctions describe-execution --execution-arn <executionArn>

    Wait until the status shows SUCCEEDED.

  2. Run the following command to validate the processed output in the DynamoDB table. Replace ProductCatalogTableName with the value from the sam deploy output.
    aws dynamodb scan --table-name <ProductCatalogTableName>

  3. Check that the DynamoDB table contains the enriched product data including price and stock attributes. Example output:
    {
        "Items": [
            {
                "ProductId": {
                    "S": "PROD-005"
                },
                "lastUpdated": {
                    "S": "2025-10-07T20:33:34.507Z"
                },
                "stock": {
                    "N": "129"
                },
                "price": {
                    "N": "139.25"
                }
            },
            {
                "ProductId": {
                    "S": "PROD-003"
                },
                "lastUpdated": {
                    "S": "2025-10-07T20:33:34.576Z"
                },
                "stock": {
                    "N": "471"
                },
                "price": {
                    "N": "40.92"
                }
            },
            …
        ],
        "Count": 5,
        "ScannedCount": 5,
        "ConsumedCapacity": null
    }
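The scan output uses the DynamoDB attribute-value format, where every value is wrapped in a type tag such as S or N. If you want to inspect the results as plain values, a small converter along these lines can help. This is a sketch that covers only a handful of type tags; unmarshal and unmarshal_item are hypothetical helper names.

```python
from decimal import Decimal


def unmarshal(attribute):
    """Convert one DynamoDB attribute value ({"S": ...}, {"N": ...}) to Python."""
    [(type_tag, value)] = attribute.items()
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # DynamoDB numbers arrive as strings
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(v) for v in value]
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    raise ValueError(f"unsupported attribute type: {type_tag}")


def unmarshal_item(item):
    """Convert a whole item from the scan output."""
    return {name: unmarshal(attr) for name, attr in item.items()}


row = {
    "ProductId": {"S": "PROD-005"},
    "lastUpdated": {"S": "2025-10-07T20:33:34.507Z"},
    "stock": {"N": "129"},
    "price": {"N": "139.25"},
}
print(unmarshal_item(row)["price"])  # 139.25
```

Decimal is used for numbers so that prices such as 139.25 are not subject to binary floating-point rounding.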

Clean up

To avoid costs, remove all resources you’ve created while following along with this post.

Run the following command after replacing the <placeholder> variable to delete the resources you deployed for this post’s solution:

aws s3 rm s3://<InputBucketName> --recursive
aws s3 rm s3://<ResultBucketName> --recursive
sam delete

Conclusion

In this post, you learned how to use Step Functions Distributed Map to extract array data natively from JSON objects stored in an S3 bucket. By removing custom data extraction code, you can simplify the processing of your large-scale parallel workloads. With ItemsPointer, you can extract array data from JSON files stored in an S3 bucket, and with Items (JSONata) or ItemsPath (JSONPath), you can extract arrays from complex JSON state input, adding flexibility to your workflow designs.

New input sources for Distributed Map are available in all commercial AWS Regions where AWS Step Functions is available. For a complete list of AWS Regions where Step Functions is available, see the AWS Region Table. To get started, you can use the Distributed Map mode today in the AWS Step Functions console. To learn more, visit the Step Functions developer guide.

For more serverless learning resources, visit Serverless Land.

Processing Amazon S3 objects at scale with AWS Step Functions Distributed Map S3 prefix

Post Syndicated from Biswanath Mukherjee original https://aws.amazon.com/blogs/compute/processing-amazon-s3-objects-at-scale-with-aws-step-functions-distributed-map-s3-prefix/

If you’re building large scale enterprise applications, you’ve likely faced the complexities of processing large volumes of data files. Whether you’re analyzing your application logs, processing customer data files, or transforming machine learning datasets, you know the complexity involved in orchestrating workflows. You’ve probably written nested workflows and additional custom code to process objects from Amazon Simple Storage Service (Amazon S3) buckets.

With AWS Step Functions Distributed Map, you can process large scale datasets by running concurrent iterations of workflow steps across data entries in parallel, achieving massive scale with simplified management.

With the new prefix-based iteration feature and LOAD_AND_FLATTEN transformation parameter for Distributed Map, your workflows can now iterate over S3 objects under a specified prefix using S3ListObjectsV2 to process their contents in a single Map state, avoiding nested workflows and reducing operational complexity.

In this post, you’ll learn how to process Amazon S3 objects at scale with the new AWS Step Functions Distributed Map S3 prefix and transformation capabilities.

Use case: Application log processing and summarization

You’ll build a sample Step Functions state machine that demonstrates processing of all the log files from the given S3 prefix using a Distributed Map. You’ll analyze all the log files to build an hourly summary of the INFO, WARNING, and ERROR messages they contain. The following diagram presents the AWS Step Functions state machine:

Log files processing workflow

  1. The state machine iterates over all the log files from the specified S3 prefix using S3 ListObjectsV2 and processes them using AWS Step Functions Distributed Map.
  2. For each log file entry, the state machine puts an hourly ErrorCount metric into Amazon CloudWatch.
  3. The state machine then stores the hourly metric counts in an Amazon DynamoDB table.
  4. The state machine then invokes an AWS Lambda function to perform metrics aggregation.
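The hourly counting at the heart of this workflow can be sketched in a few lines of Python. The log line layout used here (date, time, level, message separated by spaces) is an assumption, since the format of the sample log files is not shown; hourly_counts is a hypothetical helper name.

```python
from collections import Counter, defaultdict

LEVELS = ("INFO", "WARNING", "ERROR")


def hourly_counts(lines):
    """Count log levels per hour for lines such as
    '2025-10-08 13:05:22 ERROR Payment gateway timeout' (assumed format)."""
    counts = defaultdict(Counter)
    for line in lines:
        parts = line.split(maxsplit=3)
        if len(parts) < 3 or parts[2] not in LEVELS:
            continue  # skip lines that do not match the assumed layout
        date, time, level = parts[:3]
        hour_key = f"{date} {time[:2]}"  # for example '2025-10-08 13'
        counts[hour_key][level] += 1
    return {hour: dict(c) for hour, c in counts.items()}


sample = [
    "2025-10-08 13:05:22 ERROR Payment gateway timeout",
    "2025-10-08 13:17:40 WARNING Slow response",
    "2025-10-08 14:01:03 INFO Service started",
    "not a log line",
]
print(hourly_counts(sample))
# {'2025-10-08 13': {'ERROR': 1, 'WARNING': 1}, '2025-10-08 14': {'INFO': 1}}
```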

The following is an example of the parameters in an ItemReader configured to iterate over the content of S3 objects using S3 ListObjectsV2.

{
  "QueryLanguage": "JSONata",
  "States": {
    ...
    "Map": {
        ...
        "ItemReader": {
            "Resource": "arn:aws:states:::s3:listObjectsV2",
            "ReaderConfig": {
                // InputType is required if Transformation is LOAD_AND_FLATTEN. Use one of the given values
                "InputType": "CSV | JSON | JSONL | PARQUET",
                // Transformation is OPTIONAL and defaults to NONE if not present
                "Transformation": "NONE | LOAD_AND_FLATTEN" 
            },
            "Arguments": {
                "Bucket": "amzn-s3-demo-bucket1",
                "Prefix": "{% $states.input.PrefixKey %}"
            }
        },
        ...
    }
  }
}

With the LOAD_AND_FLATTEN option, your state machine will do the following:

  1. Read the actual content of each object listed by the Amazon S3 ListObjectsV2 call.
  2. Parse the content based on InputType (CSV, JSON, JSONL, Parquet).
  3. Create items from the file contents (rows/records) rather than metadata.
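The difference between listing objects and flattening their contents can be sketched with in-memory data. In this Python illustration, a dictionary stands in for the S3 listing and object bodies; the function names and the item shapes are assumptions, and the metadata fields shown are only a subset.

```python
import json


def list_only(objects):
    """Transformation NONE: one item per object, metadata only."""
    return [
        {"Key": key, "Size": len(body.encode())}
        for key, body in sorted(objects.items())
    ]


def load_and_flatten(objects):
    """LOAD_AND_FLATTEN with JSONL input: one item per record across all objects."""
    for key in sorted(objects):
        for line in objects[key].splitlines():
            if line.strip():
                yield {"source": key, "record": json.loads(line)}


objects = {
    "logs/a.jsonl": '{"id": 1}\n{"id": 2}\n',
    "logs/b.jsonl": '{"id": 3}\n',
}

print(len(list_only(objects)))  # 2
print([item["record"]["id"] for item in load_and_flatten(objects)])  # [1, 2, 3]
```

Two objects yield two items without the transformation, but three items once their records are loaded and flattened.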

We recommend including a trailing slash on your prefix. For example, if you select data with a prefix of folder1, your state machine will process both folder1/myData.csv and folder10/myData.csv. Using folder1/ will strictly process only one folder. All of the objects listed by prefix need to be in the same data format. For example, if you are selecting InputType as JSONL, your S3 prefix should contain only JSONL files and not a mix of other types.
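The reason for the trailing slash is that an S3 prefix is a plain string prefix, not a directory path. A quick Python check with hypothetical keys shows the difference:

```python
keys = ["folder1/myData.csv", "folder10/myData.csv", "folder2/myData.csv"]


def keys_under(prefix, keys):
    """Return the keys that start with the prefix, as a prefix listing would."""
    return [key for key in keys if key.startswith(prefix)]


print(keys_under("folder1", keys))   # ['folder1/myData.csv', 'folder10/myData.csv']
print(keys_under("folder1/", keys))  # ['folder1/myData.csv']
```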

The context object is an internal JSON structure that is available during an execution. The context object contains information about your state machine and execution. Your workflows can reference the context object in a JSONata expression with $states.context.

Within a Map state, the context object includes the following data:

"Map": {
   "Item": {
      "Index" : Number,
      "Key"   : "String", // Only valid for JSON objects
      "Value" : "String",
      "Source": "String"
   }
}

For each Map iteration, the Index contains the index number for the array item that is being currently processed.

A Key is only available when iterating over JSON objects. Value contains the array item being processed. For example, for the following input JSON object, Names will be assigned to Key and ["Bob", "Cat"] will be assigned to Value.

{
  "Names": ["Bob", "Cat"]
}

Source contains one of the following:

  • For state input: STATE_DATA
  • For Amazon S3 LIST_OBJECTS_V2 with Transformation=NONE, the value will show the S3 URI for the bucket. For example: S3://amzn-s3-demo-bucket1
  • For all the other input types, the value will be the Amazon S3 URI. For example: S3://amzn-s3-demo-bucket1/object-key

Using LOAD_AND_FLATTEN and the Source field, you can connect child executions to their sources.
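The following Python sketch mimics how the Map.Item fields could be populated for each iteration. It is an illustration of the field semantics described above, not Step Functions behavior verified in detail; in particular, whether Index is populated when iterating over a JSON object is an assumption, and map_item_contexts is a hypothetical helper.

```python
def map_item_contexts(data, source="STATE_DATA"):
    """Build illustrative Map.Item context entries, one per iteration."""
    if isinstance(data, dict):
        # Iterating over a JSON object: each key/value pair is one item.
        return [
            {"Index": i, "Key": key, "Value": value, "Source": source}
            for i, (key, value) in enumerate(data.items())
        ]
    # Iterating over a JSON array: no Key is available.
    return [
        {"Index": i, "Value": value, "Source": source}
        for i, value in enumerate(data)
    ]


print(map_item_contexts({"Names": ["Bob", "Cat"]}))
# [{'Index': 0, 'Key': 'Names', 'Value': ['Bob', 'Cat'], 'Source': 'STATE_DATA'}]
```

For items read from Amazon S3, you would pass the bucket or object URI as the source instead of STATE_DATA.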

Prerequisites

Set up and run the workflow

Run the following steps to deploy and test the Step Functions state machine.

  1. Clone the GitHub repository in a new folder and navigate to the project folder.
    git clone https://github.com/aws-samples/sample-stepfunctions-s3-prefix-processor.git
    cd sample-stepfunctions-s3-prefix-processor

  2. Run the following commands to deploy the application.
    sam deploy --guided

  3. Enter the following details:
    • Stack name: Stack name for CloudFormation (for example, stepfunctions-s3-prefix-processor)
    • AWS Region: A supported AWS Region (for example, us-east-1)
    • Accept all other default values.

    The outputs from the AWS SAM deploy will be used in the subsequent steps.

  4. Run the following command to generate sample log files.
    python3 scripts/generate_logs.py

  5. Run the following to upload the log files to the S3 bucket with the /logs/daily prefix. Replace amzn-s3-demo-bucket1 with the value from the sam deploy output.
    aws s3 sync logs/ s3://amzn-s3-demo-bucket1/logs/ --exclude '*' --include '*.log'

  6. Run the following command to execute the Step Functions workflow. Replace the StateMachineArn with the value from the sam deploy output.
    aws stepfunctions start-execution \
      --state-machine-arn <StateMachineArn> \
      --input '{}'

    The Step Functions state machine iterates over all the log files with the S3 prefix /logs/daily and processes them in parallel. The workflow updates the metrics in CloudWatch, stores the hourly metric counts in DynamoDB, then invokes an AWS Lambda function to aggregate the metrics.

Monitor and verify results

Run the following steps to monitor and verify the test results.

  1. Run the following command to get the details of the execution. Replace executionArn with the execution ARN returned by the start-execution command.
    aws stepfunctions describe-execution --execution-arn <executionArn>

  2. When the status shows SUCCEEDED, run the following commands to check the processed output from the LogAnalyticsSummaryTableName DynamoDB table. Replace the value LogAnalyticsSummaryTableName with the value from the sam deploy output.
    aws dynamodb scan --table-name <LogAnalyticsSummaryTableName>

  3. Check that hourly ERROR, WARN, and INFO log statistics are saved in the DynamoDB table. The following is a sample output:
    {
        "Items": [
            {
                "ProcessingTime": {
                    "S": "2025-10-07T23:45:10.790Z"
                },
                "WarningCount": {
                    "N": "2"
                },
                "HourOfDay": {
                    "S": "13"
                },
                "TotalRecords": {
                    "N": "5"
                },
                "ErrorCount": {
                    "N": "3"
                },
                "InfoCount": {
                    "N": "0"
                },
                "HourKey": {
                    "S": "2025-10-08 13"
                }
            },
            {
                "ProcessingTime": {
                    "S": "2025-10-07T23:45:07.456Z"
                },
                "WarningCount": {
                    "N": "3"
                },
                "HourOfDay": {
                    "S": "09"
                },
                "TotalRecords": {
                    "N": "6"
                },
                "ErrorCount": {
                    "N": "2"
                },
                "InfoCount": {
                    "N": "1"
                },
                "HourKey": {
                    "S": "2025-10-08 09"
                }
            },
            …
        ],
        "Count": 24,
        "ScannedCount": 24,
        "ConsumedCapacity": null
    }

  4. Run the following command to check the output of the Step Functions state machine execution.
    aws stepfunctions describe-execution --execution-arn <executionArn> --query 'output' --output text

    The following is a sample output:

    {
      "Summary": {
        "date": "2025-10-08",
        "totalErrors": 50,
        "totalWarnings": 41,
        "totalRecords": 133,
        "hourlyBreakdown": {
          "00": {
            "errors": 1,
            "warnings": 3,
            "records": 5
          },
          "01": {
            "errors": 1,
            "warnings": 1,
            "records": 5
          },
          "02": {
            "errors": 2,
            "warnings": 3,
            "records": 5
          },
          "03": {
            "errors": 3,
            "warnings": 2,
            "records": 7
          },
    …
    …
        "generatedAt": "2025-10-08T05:19:05.603889"
      }
    }

    The output of the Step Functions state machine shows the daily summary insights of the log files created by the Lambda function.
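The aggregation step can be approximated with a short Python function that folds the hourly rows into a daily summary of the same shape. This is a sketch under the assumption that the rows have already been converted from DynamoDB attribute values to plain integers and strings; it is not the sample repository's Lambda code, and summarize is a hypothetical name.

```python
def summarize(date, hourly_rows):
    """Fold hourly rows into a daily summary keyed by hour of day."""
    breakdown = {}
    for row in sorted(hourly_rows, key=lambda r: r["HourOfDay"]):
        breakdown[row["HourOfDay"]] = {
            "errors": row["ErrorCount"],
            "warnings": row["WarningCount"],
            "records": row["TotalRecords"],
        }
    return {
        "Summary": {
            "date": date,
            "totalErrors": sum(h["errors"] for h in breakdown.values()),
            "totalWarnings": sum(h["warnings"] for h in breakdown.values()),
            "totalRecords": sum(h["records"] for h in breakdown.values()),
            "hourlyBreakdown": breakdown,
        }
    }


rows = [
    {"HourOfDay": "13", "ErrorCount": 3, "WarningCount": 2, "TotalRecords": 5},
    {"HourOfDay": "09", "ErrorCount": 2, "WarningCount": 3, "TotalRecords": 6},
]
summary = summarize("2025-10-08", rows)["Summary"]
print(summary["totalErrors"], summary["totalWarnings"], summary["totalRecords"])
# 5 5 11
```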

Clean up

To avoid costs, remove all resources created for this post once you’re done. Run the following command after replacing amzn-s3-demo-bucket1 with your own bucket name to delete the resources you deployed for this post’s solution:

aws s3 rm s3://amzn-s3-demo-bucket1 --recursive
sam delete
rm -rf logs/

Conclusion

In this post, you learned how AWS Step Functions Distributed Map can use prefix-based iteration with LOAD_AND_FLATTEN transformation to read and process multiple data objects from Amazon S3 buckets directly. You no longer need one step to process object metadata and another to load the data objects. Loading and flattening in one step is particularly valuable for data processing pipelines, batch operations, and event-driven architectures where objects are continually added to S3 locations. By eliminating the need to maintain object manifests, you can build more resilient, dynamic data processing workflows with less code and fewer moving parts.

New input sources for Distributed Map are available in all commercial AWS Regions where AWS Step Functions is available. To get started, you can use the Distributed Map mode today in the AWS Step Functions console. To learn more, visit the Step Functions developer guide.

For more serverless learning resources, visit Serverless Land.

Breaking down monolith workflows: Modularizing AWS Step Functions workflows

Post Syndicated from Sahithi Ginjupalli original https://aws.amazon.com/blogs/compute/breaking-down-monolith-workflows-modularizing-aws-step-functions-workflows/

You can use AWS Step Functions to orchestrate complex business problems. However, as workflows grow and evolve, you can find yourself grappling with monolithic state machines that become increasingly difficult to maintain and update. In this post, we show you strategies for decomposing large Step Functions workflows into modular, maintainable components. We dive deep into architectural patterns like parent-child workflows, domain-based separation, and shared utilities that can help you break down complexity while maintaining business functionality. By implementing these decoupling techniques, you can achieve faster deployments, better error isolation, and reduced operational overhead – all while keeping your workflows scalable and efficient. Whether you’re dealing with payment processing, data transformation, or complex business logic, these patterns will help you build more resilient and manageable Step Functions applications.

The Complexity of Single-State Machine Architectures

While monolithic workflows can be suitable for simple, linear processes with limited states and clear dependencies, they become problematic when handling complex business logic across multiple domains. If your workflow involves more than 15-20 states, crosses multiple business domains, or requires frequent updates from different teams, it’s a strong indicator that you should consider a decomposed approach instead of a monolithic one. However, monolithic workflows remain a valid choice for scenarios with straightforward business logic, single-team ownership, infrequent changes, and workflows with fewer than 15 states, especially when rapid development and simplified debugging are priorities.

Let’s examine a real-world example of such a monolithic workflow and understand the challenges it presents for development teams, operational efficiency, and business agility. Below is an example of a state machine that is a mix of payment processes, inventory management, and notification mechanisms for an e-commerce implementation:

Figure 1: A state machine that is a mix of payment processes, inventory management, and notification mechanisms for an e-commerce implementation

State Explosion

Modifying a single state in a monolithic workflow triggers a cascade of changes across multiple interconnected states due to their tight coupling and dependencies. For example, adding a new payment method would require modifications across various states including validation, processing, and error handling, creating a ripple effect throughout the workflow. This interdependency makes even simple changes complex and risky, as altering one component can have unintended consequences on other parts of the workflow.

Looking at the provided architecture, we can see a clear example of state explosion where a single workflow handles multiple business processes including order validation, payment processing, inventory management, shipping calculations, and customer notifications. This creates a complex web of dependent states that becomes increasingly difficult to manage. The result is a “spider web” of states that becomes progressively harder to understand, debug, and maintain.

Version Management

Version management in monolithic workflows requires deploying the entire workflow even for minor changes to individual components, making it difficult to isolate and update specific business logic. The provided architecture demonstrates the version management challenge clearly. For instance, if the shipping calculation logic needs an urgent fix, the entire workflow must be redeployed, requiring comprehensive testing of all components, including unmodified ones, to ensure nothing breaks in the process.

Resource Limitations

While not immediately visible in the architecture diagram, monolithic workflows face operational constraints as they grow in complexity, particularly around state transitions, maximum event payload size, and execution history size. Refer to the Service quotas documentation to understand these limits. These constraints become critical bottlenecks as workflows handle increased transaction volumes. In the monolithic state machine, long-running operations like payment processing and shipping calculations, combined with multiple state transitions, could approach these limits, especially in high-volume scenarios.

Additionally, there are generic design challenges such as error handling. A monolithic approach leads to redundant try-catch blocks and retry configurations across different operations. This creates challenges in implementing distinct error strategies for different business scenarios and makes it difficult to maintain proper rollback mechanisms when failures occur in middle states.

These challenges collectively highlight the need for a more modular approach to Step Functions workflow design.

Transforming Complex Workflows Through Decomposition

When transforming complex monolithic workflows into more manageable components, you can employ several decomposition strategies to achieve better modularity and maintainability. These strategies include the parent-child pattern, which creates a hierarchical structure of workflows, domain separation that breaks down workflows based on business capabilities, shared utilities that serve as reusable components for common operations, and specialized error workflows for centralized error handling. Each of these strategies can be implemented either individually or in combination, depending on the specific requirements and complexity of the application, allowing organizations to create more efficient, scalable, and maintainable Step Functions workflows while ensuring proper separation of concerns and reduced operational overhead.

Parent-Child Pattern

The parent-child pattern represents a hierarchical approach to workflow organization where a main (parent) workflow orchestrates multiple sub-workflows (children). Parent workflows manage the overall business process and coordination, while child workflows handle specific, short-lived operations. This pattern is particularly effective when you need to balance orchestration complexity against execution speed.

Figure 2: Decoupled parent state machine

The current monolithic workflow can be restructured by creating a parent workflow focused on order orchestration while delegating specific operations to Express child workflows. The parent workflow would maintain the high-level flow from order validation through completion, while time-sensitive operations like ValidateOrder, ProcessPayment, and ProcessShipping could become Express child workflows.

Note that Express workflows differ from Standard workflows in ways that affect a re-architecture. For example, you cannot have an Express parent workflow invoke a Standard child workflow. Also, Express workflows do not support the Job-run (.sync) or Callback (.waitForTaskToken) service integration patterns. Refer to the documentation on the differences between Standard and Express workflows to choose the right architecture pattern.

For instance, the payment processing section could be transformed into a child Express workflow that handles all payment-related states (ProcessPayment, ProcessStandardPayment, ValidatePaymentPart) as a single unit. An Express workflow suits sections like payment processing because their executions complete within 5 minutes, the maximum duration of an Express workflow. This would also reduce the overall cost of the workflow. Similarly, the shipping calculation logic involving CalculateExpressShipping and CalculateStandardShipping could be consolidated into another Express child workflow, leading to reduced costs and easier updates to shipping logic.

Domain Separation

Domain separation involves breaking down workflows based on distinct business capabilities or functional areas. Each domain-specific workflow becomes responsible for a complete business function, operating independently while communicating through well-defined interfaces. This approach aligns closely with microservice architecture principles and domain-driven design. The architecture shows clear boundaries between different business domains that can be separated into independent workflows. Three primary domains emerge:

  1. Payment Domain: Encompassing ValidateOrder, ProcessPayment, ValidatePaymentPart, and related error handlers
  2. Inventory Domain: Including CheckInventory, UpdateInventory, and associated states
  3. Shipping Domain: Containing ProcessShipping, shipping calculations, and shipping status updates

Each domain would become its own workflow, with clear input/output contracts. This separation would allow specialized teams to maintain and deploy updates to their domain workflows independently, and to implement domain-specific retry policies, error handling, and business rules without affecting other domains. For example, the payment team could enhance payment processing logic without impacting shipping or inventory operations.

Figure 3: Child state machine that handles payment workflow

Figure 4: Child state machine that handles shipping workflow

Shared Utilities

Shared utility workflows serve as reusable components for common operations that appear across multiple business processes. These workflows encapsulate standard functionality like notification handling, data validation, logging, or audit trail creation. By centralizing these common operations, organizations can ensure consistency and reduce duplication across their Step Functions applications.

The current architecture repeats several common patterns that could be extracted into shared utility workflows. Most notably, the notification handling logic (SendEmail, SendSMSNotification, SendPushNotification) appears as a cluster of states at the end of the workflow. This could be consolidated into a single “NotificationManager” utility workflow that handles all types of notifications. These utility workflows could then be called from any other workflow in the system, ensuring consistent behavior and reducing code duplication.

Figure 5: Reusable notification workflow

Error Workflows

Error workflows represent a specialized form of decomposition focused on centralizing error handling and recovery logic. Instead of embedding complex error handling in each business workflow, organizations can create dedicated workflows that manage different types of failures, retries, and compensation actions. This approach provides a consistent and maintainable way to handle errors across the application.
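As a rough illustration of centralizing error handling, the following Python sketch takes a workflow definition and adds a `Catch` clause to every Task state so that failures are routed to a single handler state, which in turn starts a dedicated error workflow. The error workflow ARN, the `HandleError` and `WorkflowFailed` state names, the Lambda function name, and the sample business workflow are all hypothetical.

```python
import copy

# Hypothetical ARN of a dedicated error-handling workflow.
ERROR_WORKFLOW_ARN = "arn:aws:states:us-east-1:111122223333:stateMachine:ErrorWorkflow"


def add_central_error_handling(definition, error_workflow_arn):
    """Return a copy of definition whose Task states route failures to one handler."""
    updated = copy.deepcopy(definition)
    for state in updated["States"].values():
        if state.get("Type") == "Task":
            state.setdefault("Catch", []).append(
                {
                    "ErrorEquals": ["States.ALL"],
                    # Keep the original input and attach the error details.
                    "ResultPath": "$.error",
                    "Next": "HandleError",
                }
            )
    # Added after the loop so the handler does not catch its own failures.
    updated["States"]["HandleError"] = {
        "Type": "Task",
        "Resource": "arn:aws:states:::states:startExecution.sync:2",
        "Parameters": {"StateMachineArn": error_workflow_arn, "Input.$": "$"},
        "Next": "WorkflowFailed",
    }
    updated["States"]["WorkflowFailed"] = {
        "Type": "Fail",
        "Error": "OrderProcessingFailed",
    }
    return updated


# A deliberately tiny business workflow used only to exercise the helper.
business_workflow = {
    "StartAt": "ProcessPayment",
    "States": {
        "ProcessPayment": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "ProcessPaymentFunction"},
            "Next": "Done",
        },
        "Done": {"Type": "Succeed"},
    },
}
with_error_handling = add_central_error_handling(business_workflow, ERROR_WORKFLOW_ARN)
```

The business workflow keeps only a one-line routing rule per state, while retries, compensation, and alerting live in the error workflow and can evolve independently.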

Each of these decomposition strategies can be used individually or in combination, depending on the specific needs of your application. For instance, applying the domain separation pattern here also streamlined the error workflows. The key is to choose the combination that provides the right balance of maintainability, scalability, and operational efficiency for your use case. However, implementing all four strategies in a phased approach would provide the most comprehensive solution for long-term maintainability and scalability.

Results

The comparison between monolithic and decoupled approaches reveals notable differences in performance and operational metrics. To keep the test environments comparable, we ran both the monolithic and the decomposed state machines in the us-east-1 AWS Region, using the workflows described earlier to achieve the same end goal. Pricing comparison is based on a monolithic workflow processing 11 state transitions per workflow across 30,000 monthly requests. For the decomposed approach, calculations assume Express child workflows configured with 64MB memory and 100ms execution duration, while the parent workflow handles 8 state transitions per workflow for the same volume of 30,000 monthly requests. AWS Lambda task states in both approaches complete within 2 seconds of execution time.

When decoupled, each workflow can be redesigned to use either the Express or the Standard type, depending on its execution time and integration pattern requirements. In this use case, several workflows, such as PaymentProcessing and CalculateExpressShipping, were reworked as Express workflows. While the monolithic approach shows a slightly faster execution duration of 11.5 seconds compared to 13 seconds in the decomposed approach, the monthly pricing significantly favors the decoupled architecture at $6.37 USD versus $11.90 USD for the monolithic approach. The decoupled approach is also easier to debug, with better error isolation and domain-specific debugging, whereas the monolithic approach is harder to debug because of heavy payloads and the difficulty of tracking failures in middle states. Additionally, the decoupled architecture benefits from smaller payload sizes through distributed data handling, whereas the monolithic workflow carries larger payloads across its 18 state transitions.

 

| Metric | Monolithic Approach | Decoupled Approach |
| --- | --- | --- |
| Execution Duration | 11.5 seconds for complete workflow execution | 13 seconds with distributed processing across parent-child workflows |
| Monthly Pricing | $11.90 USD (476,000 billable state transitions) | $6.37 USD (Combined cost of Standard parent workflow and Express child workflows) |
| Debug Effort | High – Complex debugging due to heavy payloads and difficulty in tracking failures in middle states. Cannot effectively use ResultSelector when final notification state needs all details. | Lower – Easier to debug with isolated domains and smaller payloads. Better error isolation and domain-specific debugging. |
| Payload Size | Larger payload size throughout workflow execution as all data needs to be carried across 18 state transitions | Smaller payload size due to domain separation and distributed data handling across parent-child workflows |
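The Standard workflow portion of these prices can be checked with a few lines of arithmetic. The sketch below assumes the published us-east-1 Standard Workflows rate of $0.025 per 1,000 state transitions and a monthly free tier of 4,000 transitions; neither figure is stated in this post, so treat both as assumptions. The split of the decoupled total into parent and child costs is likewise an inference rather than a number from the comparison.

```python
from decimal import Decimal

# Assumed us-east-1 Standard Workflows pricing; verify against current pricing.
PRICE_PER_TRANSITION = Decimal("0.000025")  # $0.025 per 1,000 state transitions
FREE_TRANSITIONS_PER_MONTH = 4_000          # assumed monthly free tier


def standard_workflow_cost(billable_transitions):
    """Monthly cost in USD for a number of billable state transitions."""
    return (billable_transitions * PRICE_PER_TRANSITION).quantize(Decimal("0.01"))


# Monolithic workflow: billable transitions taken from the comparison table.
monolithic_cost = standard_workflow_cost(476_000)

# Decoupled parent workflow: 8 transitions per execution, 30,000 executions.
parent_billable = 8 * 30_000 - FREE_TRANSITIONS_PER_MONTH
parent_cost = standard_workflow_cost(parent_billable)

# Whatever remains of the reported $6.37 would come from the Express children.
express_children_cost = Decimal("6.37") - parent_cost

print(monolithic_cost, parent_cost, express_children_cost)
```

Under these assumptions the Standard parent accounts for most of the decoupled cost, and the Express child workflows add well under a dollar per month at this volume.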

Conclusion

The need for decomposing Step Functions workflows becomes evident when you face challenges with monolithic workflows such as state explosion, version management complexities, and resource limitations. These challenges result in reduced operational efficiency, increased debugging complexity, and higher maintenance overhead as demonstrated by the comparison results showing differences in execution duration, pricing, debugging effort, and payload management. Organizations should evaluate workflow decomposition when they observe workflows exceeding 15-20 states, multiple team involvement in workflow maintenance, frequent independent updates across different business domains, complex error handling requirements, and the need for reusable components across workflows. The implementation of decomposition strategies through parent-child pattern for hierarchical workflow organization, domain separation for business capability isolation, shared utilities for common operations, and dedicated error workflows for centralized error handling has shown tangible benefits in terms of reduced costs, better error isolation, and more efficient payload management.

While implementing decomposition strategies, organizations must be cautious to avoid over-decomposition of workflows, maintaining tight coupling between workflows, and ignoring core design principles of loose coupling and single responsibility. This strategic approach to workflow decomposition ultimately leads to more maintainable, scalable, and cost-effective Step Functions applications that better serve business needs while reducing operational overhead. The transformation from monolithic to decomposed workflows represents a significant architectural improvement that enables organizations to better manage complex business processes while maintaining operational efficiency and system reliability.

How to export to Amazon S3 Tables by using AWS Step Functions Distributed Map

Post Syndicated from Chetan Makvana original https://aws.amazon.com/blogs/compute/how-to-export-to-amazon-s3-tables-by-using-aws-step-functions-distributed-map/

Companies running serverless workloads often need to perform extract, transform, and load (ETL) operations on data files stored in Amazon Simple Storage Service (Amazon S3) buckets. Though traditional approaches such as an AWS Lambda trigger for Amazon S3 or Amazon S3 Event Notifications can handle these operations, they might fall short when workflows require enhanced visibility, control, or human intervention. For example, some processes might need manual review of failed records or explicit approval before proceeding to subsequent stages. Custom orchestration solutions to these issues can prove to be complex and error prone.

AWS Step Functions addresses these challenges by providing built-in workflow management and monitoring capabilities. The Step Functions Distributed Map feature is designed for high-throughput, parallel data processing workflows so that companies can handle complex ETL jobs, fan-out processing, and data visualization at scale. Distributed Map handles each dataset item as an independent child workflow, processing millions of records while maintaining built-in concurrency controls, fault tolerance, and progress tracking. The processed data can be seamlessly exported to various destinations, including Amazon S3 Tables with Apache Iceberg support.

In this post, we show how to use Step Functions Distributed Map to process Amazon S3 objects and export results to Amazon S3 Tables, creating a scalable and maintainable data processing pipeline.

See the associated GitHub repository for detailed instructions about deploying this solution as well as sample code.

Solution overview

Consider a consumer electronics company that regularly participates in industry trade shows and conferences. During these events, interested attendees fill out paper sign-up forms to request product demos, receive newsletters, or join early access programs. After the events, the company’s team scans hundreds of thousands of these forms and uploads them to Amazon S3. Rather than manually reviewing each form, the company wants to automate the extraction of key customer details such as name, email address, mailing address, and interest areas. They’d like to store this structured data in S3 Tables with Apache Iceberg format for downstream analytics and marketing campaign targeting.

Let’s look at how this post’s solution uses Distributed Map to process PDFs in parallel, extract data using Amazon Textract, and write the cleaned output directly to S3 Tables. The result is scalable, serverless post-event data onboarding, as shown in the following figure.

Solution architecture for automated PDF processing workflow with S3 Tables, EventBridge scheduling, Step Functions Distributed Map

The data processing workflow as shown in the preceding diagram includes the following steps:

  1. A user uploads customer interest forms as scanned PDFs to an Amazon S3 bucket.
  2. An Amazon EventBridge Scheduler rule triggers at regular intervals, initiating a Step Functions workflow execution.
  3. The workflow execution activates a Step Functions Distributed Map state, which lists all PDF files uploaded to Amazon S3 since the previous run.
  4. The Distributed Map iterates over the list of objects and passes each object’s metadata (bucket, key, size, entity tag [ETag]) to a child workflow execution.
  5. For each object, the child workflow calls Amazon Textract with the provided bucket and key to extract raw text and relevant fields (name, email address, mailing address, interest area) from the PDF.
  6. The child workflow sends the extracted data to Amazon Data Firehose, which is configured to forward data to S3 Tables.
  7. Firehose batches the incoming data from the child workflow and writes it to S3 Tables at a preconfigured time interval of your choosing.
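Step 3 depends on selecting only the PDFs uploaded since the previous run. A minimal Python sketch of that selection follows, assuming object metadata shaped like Amazon S3 listing entries (`Key`, `LastModified`, `Size`, `ETag`). The function name and the sample objects are hypothetical and are not taken from the solution's repository.

```python
from datetime import datetime, timedelta, timezone


def pdfs_uploaded_since(objects, now, lookback):
    """Return PDF objects whose LastModified falls inside the look-back window."""
    cutoff = now - lookback
    return [
        obj
        for obj in objects
        if obj["LastModified"] >= cutoff and obj["Key"].lower().endswith(".pdf")
    ]


now = datetime(2025, 5, 8, tzinfo=timezone.utc)
listing = [
    {"Key": "forms/a.pdf", "LastModified": datetime(2025, 5, 7, tzinfo=timezone.utc), "Size": 1024, "ETag": "etag-a"},
    {"Key": "forms/b.pdf", "LastModified": datetime(2025, 4, 20, tzinfo=timezone.utc), "Size": 2048, "ETag": "etag-b"},
    {"Key": "forms/notes.txt", "LastModified": datetime(2025, 5, 7, tzinfo=timezone.utc), "Size": 10, "ETag": "etag-c"},
]

# A weekly schedule paired with a seven-day look-back window.
recent_pdfs = pdfs_uploaded_since(listing, now, timedelta(days=7))
print([obj["Key"] for obj in recent_pdfs])
```

If the look-back window is shorter than the schedule interval, some uploads are never processed; if it is longer, some are processed twice.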

With the data now structured and accessible in S3 Tables, users can analyze it with standard SQL queries in Amazon Athena or with business intelligence tools such as Amazon QuickSight.

The data-processing workflow

EventBridge Scheduler starts new Step Functions workflows at regular intervals. The timeline for this schedule is flexible. However, when setting up your schedule, make sure the frequency aligns with how far back your state machine is configured to look for PDFs. For example, if your state machine checks for PDFs from the past week, you’d want to schedule it to run weekly. The Step Functions workflow subsequently performs the following three steps (these correspond to steps 4, 5, 6, and 7 in the preceding workflow diagram):

  1. Extract relevant user data from the PDFs.
  2. Send the extracted user data to Firehose.
  3. Write the data to S3 Tables in Apache Iceberg table format.

The following diagram illustrates this workflow.

Screenshot of AWS Step Functions workflow execution showing processing pipeline from S3 ingestion through Kinesis batch output

Let’s look at each step of the preceding workflow in more detail.

Extract relevant user data from PDF documents

Step Functions uses Distributed Map to process PDFs concurrently in parallel child workflows. Distributed Map accepts input from JSON, JSONL, CSV, or Parquet files; Amazon S3 manifest files (used to specify particular files for processing); or an Amazon S3 bucket prefix (which allows iteration over file metadata for all objects under that prefix). Step Functions automatically handles parallelization by splitting the dataset and running child workflows for each item. The ItemBatcher field lets you group multiple PDFs into a single child workflow execution (for example, 10 PDFs per batch) to optimize performance and cost.
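To make the effect of batching concrete, the following Python sketch emulates what ItemBatcher does with a `MaxItemsPerBatch` of 10: it groups the listed objects and hands each child workflow execution an input object with an `Items` array. This is only an illustration of the resulting input shape, not code from Step Functions itself, and the object keys are made up.

```python
def batch_items(items, max_items_per_batch):
    """Group items into child workflow inputs of the form {"Items": [...]}."""
    return [
        {"Items": items[start:start + max_items_per_batch]}
        for start in range(0, len(items), max_items_per_batch)
    ]


# Hypothetical listing of 25 scanned forms under one prefix.
listed_objects = [{"Key": f"forms/form-{n:03d}.pdf"} for n in range(25)]

child_inputs = batch_items(listed_objects, max_items_per_batch=10)
print([len(child_input["Items"]) for child_input in child_inputs])
```

With 25 objects and batches of 10, three child workflow executions start instead of 25, which reduces the number of child executions and state transitions.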

The following screenshot of the Step Functions console shows the configuration for Distributed Map. For example, we have configured Distributed Map to process 10 customer interest PDFs in a single child workflow.

A screenshot of AWS Step Functions console showing Distributed Map state configuration

The following image shows one example of these scanned PDFs, which includes the customer information that this post’s solution processes.

A screenshot showing sample PDF

Each child workflow then calls the Amazon Textract AnalyzeDocument API with specific queries to extract customer information.

{
  "Document": {
    "S3Object": {
      "Bucket": "<input PDFs bucket>",
      "Name": "{% $states.input.Key %}"
    }
  },
  "FeatureTypes": [
    "QUERIES"
  ],
  "QueriesConfig": {
    "Queries": [
      {
        "Alias": "full_name",
        "Text": "What is the customer's name?"
      },
      {
        "Alias": "phone_number",
        "Text": "What is the customer’s phone number?"
      },
      {
        "Alias": "mailing_address",
        "Text": "What is the customer’s mailing address?"
      },
      {
        "Alias": "interest",
        "Text": "What is the customer’s interest?"
      }
    ]
  }
}

The API analyzes each scanned PDF and returns a JSON structure containing the extracted customer information.
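The response is a list of blocks in which each `QUERY` block points to its `QUERY_RESULT` blocks through an `ANSWER` relationship. The following Python sketch shows one way to flatten such a response into a dictionary keyed by query alias. The helper name and the trimmed sample response are hypothetical, and a real response contains many more blocks and fields.

```python
def extract_query_answers(response):
    """Map each query alias to the text of its answer, or None if unanswered."""
    blocks = response.get("Blocks", [])
    blocks_by_id = {block["Id"]: block for block in blocks}
    answers = {}
    for block in blocks:
        if block.get("BlockType") != "QUERY":
            continue
        query = block["Query"]
        # Fall back to the question text when no alias was supplied.
        alias = query.get("Alias") or query["Text"]
        answer_ids = [
            answer_id
            for relationship in block.get("Relationships", [])
            if relationship["Type"] == "ANSWER"
            for answer_id in relationship["Ids"]
        ]
        texts = [
            blocks_by_id[answer_id]["Text"]
            for answer_id in answer_ids
            if answer_id in blocks_by_id
        ]
        answers[alias] = " ".join(texts) if texts else None
    return answers


# Trimmed, hand-written sample in the shape of an AnalyzeDocument response.
sample_response = {
    "Blocks": [
        {
            "Id": "q1",
            "BlockType": "QUERY",
            "Query": {"Text": "What is the customer's name?", "Alias": "full_name"},
            "Relationships": [{"Type": "ANSWER", "Ids": ["a1"]}],
        },
        {"Id": "a1", "BlockType": "QUERY_RESULT", "Text": "Anthony Ayala", "Confidence": 98.0},
        {
            "Id": "q2",
            "BlockType": "QUERY",
            "Query": {"Text": "What is the customer's interest?", "Alias": "interest"},
        },
    ]
}

customer = extract_query_answers(sample_response)
print(customer)
```

Queries that Amazon Textract could not answer have no `ANSWER` relationship, so the sketch records them as `None` rather than dropping the field.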

Send the extracted user data to Firehose

The child workflow then uses a Firehose PutRecordBatch API action with service integrations to queue the extracted customer information for further processing. The PutRecordBatch action request includes the Firehose stream name and the data records. The data records include a data blob from step 1 that contains extracted customer information, as shown in the following example.

{
  "DeliveryStreamName": "put_raw_form_data_100",
  "Records": [
    {
      "Data": "{\"full_name\":\"Anthony Ayala\",\"phone_number\":\"001-384-925-0701\",\"mailing_address\":\"38548 Joshua Wall Suite 974, East Heatherfort, OH 32669\",\"interest\":\"Fitness Trackers\",\"processed_date\":\"2025-05-01\"}"
    },
    {
      "Data": "{\"full_name\":\"Becky Williams\",\"phone_number\":\"+1-283-499-2466\",\"mailing_address\":\"227 King Forge Suite 241, East Nathanland, PR 05687\",\"interest\":\"Al Assistants\",\"processed_date\":\"2025-05-01\"}"
    }
  ]
}
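A request like the preceding one can be assembled from the extracted fields with a small helper. The sketch below serializes each customer record as compact JSON and splits the records into requests of at most 500, which is the per-call record limit for PutRecordBatch. The helper name and the sample customers are hypothetical. When calling Firehose through an AWS SDK rather than a Step Functions service integration, the `Data` value would typically be passed as bytes.

```python
import json

MAX_RECORDS_PER_CALL = 500  # PutRecordBatch accepts at most 500 records per request


def build_put_record_batches(stream_name, customers):
    """Build PutRecordBatch request payloads, 500 records at most per request."""
    requests = []
    for start in range(0, len(customers), MAX_RECORDS_PER_CALL):
        chunk = customers[start:start + MAX_RECORDS_PER_CALL]
        requests.append(
            {
                "DeliveryStreamName": stream_name,
                "Records": [
                    # Compact separators keep each data blob small.
                    {"Data": json.dumps(customer, separators=(",", ":"))}
                    for customer in chunk
                ],
            }
        )
    return requests


# Hypothetical extracted records; 501 of them forces a second request.
customers = [
    {"full_name": f"Customer {n}", "interest": "Fitness Trackers", "processed_date": "2025-05-01"}
    for n in range(501)
]
put_requests = build_put_record_batches("put_raw_form_data_100", customers)
print([len(request["Records"]) for request in put_requests])
```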

Write the data to S3 Tables in Apache Iceberg table format

Firehose efficiently manages data buffering, format conversion, and reliable delivery to various destinations, including Apache Iceberg, raw files in Amazon S3, Amazon OpenSearch Service, or any of the other supported destinations. Apache Iceberg tables can be either self-managed in Amazon S3 or hosted in S3 Tables. Though self-managed Iceberg tables require manual optimization—such as compaction and snapshot expiration—S3 Tables automatically optimize storage for large-scale analytics workloads, improving query performance and reducing storage costs.

Firehose simplifies streaming data: you configure a delivery stream, select a data source, and set an Iceberg table as the destination. After you’ve set it up, the Firehose stream is ready to deliver data. The delivered data can be queried from S3 Tables by using Athena, as shown in the following screenshot of the Athena console.

A screenshot of the Athena console showing a query to select the data we just uploaded

The query results include all processed customer data from the PDFs, as shown in the following screenshot.

A screenshot of the Athena console showing the results of the query we just ran

This integration demonstrates a powerful, code-free solution for transforming raw PDF forms into enriched, queryable data in an Iceberg table. You can use these data for further analysis.

Conclusion

In this post, we showed how to build a scalable, serverless solution for processing PDF documents and exporting the extracted data to S3 Tables by using Step Functions Distributed Map. This architecture offers several key benefits such as reliability, cost-effectiveness, visibility, and maintainability. By leveraging AWS services such as Step Functions, Amazon Textract, Firehose, and S3 Tables, companies can automate their document processing workflows while ensuring optimal performance and operational excellence. This solution can be adapted for various use cases beyond customer interest forms, such as invoice processing, application forms, or any scenario requiring structured data extraction from documents at scale.

Though this example focuses on processing PDF data and writing to S3 Tables, Distributed Map can handle various input sources including JSON, JSONL, CSV, and Parquet files in Amazon S3; items in Amazon DynamoDB tables; Athena query results; and all paginated AWS List APIs. Similarly, through Step Functions service integrations, you can write results to multiple destinations such as DynamoDB tables by using the PutItem service integration.

To get started with this solution, see the associated GitHub repository for deployment instructions and sample code.

A scalable, elastic database and search solution for 1B+ vectors built on LanceDB and Amazon S3

Post Syndicated from Audra Devoto original https://aws.amazon.com/blogs/architecture/a-scalable-elastic-database-and-search-solution-for-1b-vectors-built-on-lancedb-and-amazon-s3/

This post was co-authored with Owen Janson, Audra Devoto, and Christopher Brown of Metagenomi.

From CRISPR gene editing to industrial biocatalysis, enzymes power some of the most transformative technologies in healthcare, energy, and manufacturing. But discovering novel enzymes that can transform an industry — such as Cas9 for genome engineering — requires sifting through the billions of diverse enzymes encoded by organisms spanning the tree of life. Advances in DNA sequencing and metagenomics have enabled the growth of vast public and proprietary databases containing known protein sequences, but scanning through these collections to identify high value candidates is fundamentally a big data problem as well as a biological one.

At Metagenomi, we’re developing potentially curative therapeutics by using our extensive metagenomics database (MGXdb) to build a toolbox of novel gene editing systems. In this post, we highlight how Metagenomi is tackling the challenge of enzyme discovery at the billion protein scale by using the scalable infrastructure of Amazon Web Services (AWS) to build a high-performance protein database and search solution based on embeddings. By embedding every protein in our large proprietary database into a vector space, making the data accessible using LanceDB built on Amazon Simple Storage Service (Amazon S3), and accessed with AWS Lambda, we were able to transform enzyme discovery into a nearest neighbor search problem and rapidly access previously unexplored discovery space.

Solution overview

At the core of our solution is LanceDB. LanceDB is an open source vector database that enables rapid approximate nearest neighbor (ANN) searches on indexed vectors. LanceDB is particularly well suited for a serverless stack because it’s entirely file-based and is also compatible with Amazon S3 storage. As a result, we can store our database of embedded protein sequences on relatively low-cost Amazon S3, rather than a persistent disk storage such as Amazon Elastic Block Store (Amazon EBS). Instead of constantly running servers, all that is needed to rapidly query the database on-demand is a Lambda function that uses LanceDB to find nearest neighbors directly from the data on S3.

To overcome the challenge of ingesting and querying billions of vector embeddings representing Metagenomi’s large protein database, we devised a method for splitting the database into equal sized parts (folders) stored for low cost on Amazon S3 that can be indexed in parallel and searched with a map-reduce approach using Lambda. The following diagram illustrates this architecture.

AWS architecture showing protein vector processing workflow with ECR, Lambda, and LanceDB

The process follows four steps:

  1. Data vectorization
  2. Data bucketing
  3. Indexing and ingesting data
  4. Querying the database

Data vectorization

To make use of LanceDB’s fast ANN search capabilities, the data must be in vector form. Our metagenomics database consists of billions of proteins, each a string of amino acids. To convert each protein into a vector that captures biologically meaningful information, we run them through a protein language model (pLM), capturing the model’s hidden layers as a vector representation of that protein. Many pLMs can be used to generate protein embeddings, depending on the desired biological information and computational requirements. Here, we use the AMPLIFY_350M model, a transformer encoder model that is fast enough to scale to our entire protein database. We perform a mean-pool of the final hidden layer of the model to produce a 960-dimension vector for each protein. These vectors and their respective unique protein IDs are then stored in HDF5 files.
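Mean pooling itself is a small operation: the per-residue vectors from the final hidden layer are averaged, dimension by dimension, into a single fixed-length vector. The following pure-Python sketch shows the idea on a toy three-residue, three-dimension example. It is not the AMPLIFY inference code, and a real pipeline would run this on tensors from the model (960 dimensions here) rather than on lists.

```python
def mean_pool(hidden_states):
    """Average per-residue vectors (seq_len x dim) into one dim-length vector."""
    if not hidden_states:
        raise ValueError("cannot pool an empty sequence")
    seq_len = len(hidden_states)
    # zip(*rows) walks the matrix column by column, one embedding dimension at a time.
    return [sum(column) / seq_len for column in zip(*hidden_states)]


# Toy final-layer output for a protein of three residues.
toy_hidden_states = [
    [1.0, 2.0, 3.0],
    [3.0, 4.0, 5.0],
    [5.0, 6.0, 7.0],
]
protein_vector = mean_pool(toy_hidden_states)
print(protein_vector)
```

Because the average is taken over the sequence length, proteins of different lengths all map to vectors of the same dimension, which is what makes them comparable in a single index.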

Data bucketing

To turn our protein vectors into a searchable database, we use LanceDB to build an index suitable for quickly finding ANNs to a query. However, indexing can take a long time and is difficult to distribute across nodes. To speed up indexing, we first divide our data into roughly evenly sized buckets: we assign each of our embedding HDF5 files to a bucket of roughly 200 million total vectors using a best-fit bin packing algorithm. The right bucket size and packing method depend on the number and dimension of the vectors, as well as their format. Each bucket is ingested into a separate table, and all tables reside in a single LanceDB database object store on Amazon S3.
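The bucketing step can be sketched in a few lines of Python. The version below is a best-fit heuristic that places the largest files first (the decreasing order is an assumption of this sketch, not something stated above): each file goes into the open bucket with the least remaining room that can still hold it, and a new bucket is opened only when none fits. The file names and vector counts are invented for illustration.

```python
def best_fit_buckets(vector_counts, capacity):
    """Assign files to buckets with a best-fit, largest-first heuristic."""
    buckets = []  # each bucket tracks its remaining capacity and its files
    for name, count in sorted(vector_counts.items(), key=lambda kv: kv[1], reverse=True):
        if count > capacity:
            raise ValueError(f"{name} holds more vectors than one bucket allows")
        candidates = [bucket for bucket in buckets if bucket["remaining"] >= count]
        if candidates:
            # Best fit: the bucket that would be left with the least free space.
            target = min(candidates, key=lambda bucket: bucket["remaining"])
        else:
            target = {"remaining": capacity, "files": []}
            buckets.append(target)
        target["files"].append(name)
        target["remaining"] -= count
    return [bucket["files"] for bucket in buckets]


# Hypothetical HDF5 files and the number of vectors each contains.
files = {
    "a.h5": 120_000_000,
    "b.h5": 90_000_000,
    "c.h5": 80_000_000,
    "d.h5": 60_000_000,
    "e.h5": 50_000_000,
}
assignment = best_fit_buckets(files, capacity=200_000_000)
print(assignment)
```

In this example the five files fill two buckets of exactly 200 million vectors each, so both tables take a similar time to index.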

S3 bucket structure showing LanceDB database organization with vector buckets

By bucketing our data, we can produce several smaller databases that can be indexed on separate nodes in a much shorter amount of time. We can also add more data to our database incrementally as a new bucket, instead of reindexing all the existing data.

Ingesting and indexing bucketed data

After the vectorized data has been assigned to a bucket, it’s time to turn it into a LanceDB table and index it to enable fast ANN querying. The details on how to convert your specific data into a LanceDB table can be found in the LanceDB documentation. For each of our buckets of approximately 200 million vectors, we create a LanceDB table with an IVF-PQ index on the cosine distance. For indexing, we set the number of partitions to the square root of the number of inserted rows, and the number of sub-vectors to the number of dimensions of our vectors divided by 16.
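As a quick worked example of those two rules, the following Python sketch computes the index parameters for one bucket of 200 million 960-dimension vectors. The helper name is hypothetical; the dictionary keys mirror the `num_partitions` and `num_sub_vectors` arguments of the index configuration.

```python
from math import sqrt


def ivf_pq_parameters(num_rows, vector_dim):
    """Apply the square-root and divide-by-16 rules described above."""
    return {
        "num_partitions": int(sqrt(num_rows)),
        "num_sub_vectors": vector_dim // 16,
    }


bucket_parameters = ivf_pq_parameters(num_rows=200_000_000, vector_dim=960)
print(bucket_parameters)
```

For a bucket of this size, that works out to 14,142 partitions and 60 sub-vectors.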

To make things smoother to query, we name each table after the bucket from which it was created and upload them to a single S3 directory such that their file structure indicates a single LanceDB database with multiple tables.

The following code snippet provides an example of how you might ingest vectors from an HDF5 file containing id and embedding columns into a LanceDB database and index them for fast ANN searches based on cosine distance. The only requirements for running this snippet are Python >= 3.9 and the lancedb, pyarrow, and h5py packages. This snippet was developed and tested with lancedb version 0.21.1 and the asynchronous LanceDB API.

from typing import List, Iterable
from itertools import islice
from math import sqrt
import pyarrow as pa
import datetime
import asyncio
import lancedb
import h5py

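def batched(iterable: Iterable, n: int) -> Iterable[List]:
    """Yield batches of n items from iterable."""
    # take a single iterator so that lists and other re-iterable inputs
    # advance between batches instead of restarting from the first item
    iterator = iter(iterable)
    while batch := list(islice(iterator, n)):
        yield batch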

async def vectors_to_db(
    vectors: str,
    db: str,
    table_name: str,
    vector_dim: int,
    ingestion_batch_size: int,
) -> int:
    """Ingest and index vectors from an HDF5 file into a LanceDB table.

    Args:
        vectors (str): An HDF5 file containing protein IDs and their
            960-dimension vector representations.
        db (str): Path to the LanceDB database.
        table_name (str): Name of the table to create.
        vector_dim (int): Dimension of the vectors.
        ingestion_batch_size (int): Number of rows to add per batch.

    Returns:
        int: Total number of rows ingested into the table.
    """
    # create db and table
    custom_schema = pa.schema(
        [
            pa.field("embedding", pa.list_(pa.float32(), vector_dim)),
            pa.field("id", pa.string()),
        ]
    )

    # count the total number of rows as they are added to the table
    total_rows = 0

    # open a connection to the new database and create a table
    with await lancedb.connect_async(db) as db_connection:
        with await db_connection.create_table(
            table_name, schema=custom_schema
        ) as table_connection:
            # open vectors file
            with h5py.File(vectors, "r") as vectors_handle:
                # create a generator over the rows
                rows = (
                    {"embedding": e, "id": i}
                    for e, i in zip(
                        vectors_handle["embedding"],
                        vectors_handle["id"],
                    )
                )

                # insert rows in batches to avoid memory issues
                for batch in batched(rows, ingestion_batch_size):
                    total_rows += len(batch)
                    await table_connection.add(batch)

            # optimize the table and remove old data
            await table_connection.optimize(
                cleanup_older_than=datetime.timedelta(days=0)
            )

            # configure the index for the table
            index_config = lancedb.index.IvfPq(
                distance_type="cosine",
                num_partitions=int(sqrt(total_rows)),
                num_sub_vectors=int(
                    vector_dim / 16
                ),
            )

            # index the table
            await table_connection.create_index(
                "embedding", config=index_config
            )

    # report how many rows were ingested, matching the declared return type
    return total_rows

# ingest and index your data
asyncio.run(
    vectors_to_db(
        vectors="./my_vectors.h5",
        db="./test_db",
        table_name="bucket1",
        vector_dim=960,
        ingestion_batch_size=50000
    )
)

The tasks of vectorizing, ingesting, and indexing each bucket could be parallelized over multiple AWS Batch jobs or run on a single Amazon Elastic Compute Cloud (Amazon EC2) instance.

Querying the database

After the data has been bucketed and ingested into a LanceDB database on Amazon S3, we need a way to query it. Because LanceDB can be queried directly from Amazon S3 using the LanceDB Python API, we can use Lambda functions to take a user-provided query vector and search for ANNs, then return the data to the user. However, because our data has been bucketed across several tables in the database, we need to search for nearest neighbors in each bucket and aggregate the results before passing them back to the user.

We implement the query workflow as an AWS Step Functions state machine that runs one Lambda query process per bucket, followed by a single Lambda process that aggregates the data and writes the resulting ANNs to a .csv file on Amazon S3. However, this could also be implemented as a series of AWS Batch processes or even run locally. The following snippet shows how a process assigned to one bucket could run an ANN query against that bucket’s table, requiring only pandas and lancedb on Python >= 3.9. As in the ingestion section, we use the asynchronous LanceDB API and lancedb package version 0.21.1.

from typing import List, Iterable
import asyncio
import lancedb
import pandas
import random

async def run_query_async(
    lancedb_s3_uri: str,
    table_name: str,
    q_vec: List[float],
    k: int,
    vec_col: str,
    n_probes: int,
    refine_factor: int,
) -> pandas.DataFrame:
    """Run a query on a LanceDB table.
    Args:
        lancedb_s3_uri (str): S3 URI of the LanceDB database.
        table_name (str): Name of the table to query.
        q_vec (List[float]): Query vector.
        k (int): Number of nearest neighbors to return.
        vec_col (str): Column name of the vector column.
        n_probes (int): Number of probes to use for the query.
        refine_factor (int): Refine factor for the query.
    Returns:
        pandas.DataFrame: DataFrame containing the approximate nearest
        neighbors to the query vector.
    """
    # open a connection to the database and table
    with await lancedb.connect_async(
        lancedb_s3_uri, storage_options={"timeout": "120s"}
    ) as db_connection:
        with await db_connection.open_table(table_name) as table_connection:
            # query the approximate nearest neighbors to the query vector
            df = (
                await table_connection.query()
                .nearest_to(q_vec)
                .column(vec_col)
                .nprobes(n_probes)
                .refine_factor(refine_factor)
                .limit(k)
                .distance_type("cosine")
                .to_pandas()
            )

    return df

# query the example bucket we produced in the last section
bucket1_df = asyncio.run(
    run_query_async(
        lancedb_s3_uri="s3://mg-analysis/owen/20250415_lancedb_snippet_testing/test_db/",
        table_name="bucket1",
        q_vec=[random.random() for _ in range(960)],
        k=3,
        vec_col="embedding",
        n_probes=1,
        refine_factor=1,
    )
)

The preceding query will return a pandas DataFrame of the following structure:

| embedding | id | _distance |
| --- | --- | --- |
| [-5.124435, 4.242000, …] | id_1 | 0.000000 |
| [-5.783999, 4.340500, …] | id_2 | 0.001000 |
| [-6.932943, 3.394850, …] | id_3 | 0.04020 |

Here, the embedding column contains the vector representations of the nearest neighbors, the id column contains their IDs, and the _distance column contains their cosine distances to the queried vector.

After each bucket has been independently queried across nodes and each has returned a nearest neighbors DataFrame, the results must be merged and subset before being returned to the user. The following snippet shows how you might do this.

def aggregate_nearest_neighbors(
    dfs: List[pandas.DataFrame], k: int
) -> pandas.DataFrame:
    """Aggregate the nearest neighbors for each query vector.
    Args:
        dfs (List[pandas.DataFrame]): A list of DataFrames containing the
            nearest neighbors queried from each bucket.
        k (int): The number of nearest neighbors to aggregate.
    Returns:
        pandas.DataFrame: A DataFrame with the aggregated nearest neighbors.
    """
    # concatenate the DataFrames and get the top k nearest neighbors
    return (
        pandas.concat(dfs, ignore_index=True)
        .sort_values(by=["_distance"], ascending=True)
        .reset_index(drop=True)
        .head(k)
    )

# add the dataframes from querying each bucket to a list
dfs = [bucket1_df, bucket2_df, bucket3_df, bucket4_df, bucket5_df]

# aggregate the nearest neighbors across all buckets
nearest_neighbors_all_buckets_df = aggregate_nearest_neighbors(dfs, 5)
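The overall map-reduce shape, with one query per bucket followed by a single aggregation, can be sketched locally with asyncio. In the following example the per-bucket query is a stand-in coroutine that returns hard-coded rows, so `fake_bucket_query`, `FAKE_RESULTS`, and the row values are all hypothetical. In the architecture described here, the fan-out is handled by Step Functions and Lambda rather than by a single process.

```python
import asyncio
import heapq

# Hard-coded stand-in results; a real implementation would query LanceDB.
FAKE_RESULTS = {
    "bucket1": [{"id": "a", "_distance": 0.30}, {"id": "b", "_distance": 0.10}],
    "bucket2": [{"id": "c", "_distance": 0.20}, {"id": "d", "_distance": 0.05}],
}


async def fake_bucket_query(table_name, k):
    """Stand-in for a per-bucket ANN query; returns up to k rows."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return sorted(FAKE_RESULTS[table_name], key=lambda row: row["_distance"])[:k]


async def search_all_buckets(table_names, k, query_fn):
    """Query every bucket concurrently (map), then keep the k closest rows (reduce)."""
    per_bucket = await asyncio.gather(*(query_fn(name, k) for name in table_names))
    all_rows = [row for rows in per_bucket for row in rows]
    return heapq.nsmallest(k, all_rows, key=lambda row: row["_distance"])


top_rows = asyncio.run(search_all_buckets(["bucket1", "bucket2"], 3, fake_bucket_query))
print([row["id"] for row in top_rows])
```

Each bucket must return its own top k rows, not fewer, because the k globally nearest neighbors could all come from the same bucket.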

Optimizing for large batches of queries

Though querying a LanceDB database directly from its S3 object store on Lambda works well for querying the ANNs of one or a few query vectors, some use cases might require querying thousands or even millions of vectors.

One solution we’ve found that scales well to large batches of queries is to modify the preceding query implementation such that it first downloads one of the database buckets to local storage, then queries it locally using the LanceDB API. Because database buckets can have a large storage footprint, this implementation is better suited for AWS Batch jobs than Lambda, and we recommend using optimized instance storage (for example, i4i instances) rather than EBS volumes. After all query Batch jobs finish, a final job can aggregate their results before returning to the user. Orchestration of parallel query jobs and the aggregation job can be done with Nextflow. Though this implementation will have significantly more overhead and latency from downloading the buckets to disk, it can handle larger batches of queries more efficiently and still requires no continuously running server-based database.

Benchmarking results

Indexing strategies and database split sizes depend on your performance requirements. Consider the following general optimization guidance when customizing to your use case.

An example database created by Metagenomi consisted of 3.5 billion vector embeddings produced by AMPLIFY, of dimension 960. Ingesting and indexing these 3.5B vector embeddings in split sizes of 200M vectors on i4i.8xlarge instances took 108 total compute hours. Because this solution is serverless and can be queried directly from its S3 object store, the only fixed cost of this database is its storage footprint on Amazon S3 (for an indexed database of 3.5B vectors, this is approximately 12.9 TB). Lambda queries can be an exceptionally low-cost querying solution, with many queries costing fractions of a cent.
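As a rough back-of-the-envelope check on these figures, the following sketch derives the number of splits and the average compute time per split. The three constants come from this post; spreading the work evenly across splits is an assumption made only for illustration.

```python
import math

TOTAL_VECTORS = 3_500_000_000  # embeddings in the example database
SPLIT_SIZE = 200_000_000       # vectors per database split
TOTAL_COMPUTE_HOURS = 108      # reported ingest and index time

# The last split is only partially filled, so round up
num_splits = math.ceil(TOTAL_VECTORS / SPLIT_SIZE)

# Assumption: compute time is spread evenly across splits
avg_hours_per_split = TOTAL_COMPUTE_HOURS / num_splits

print(num_splits, avg_hours_per_split)
```

The split count is also the number of parallel queries needed to cover the whole database for a single search, which is why concurrency limits matter when choosing a split size.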

In general, larger database splits will be more cost effective to query but will result in longer runtimes and longer indexing times. We recommend scaling up database split sizes to the maximum size that results in an acceptable query return time for a single split while also considering limits of parallelization such as maximum concurrent Lambda functions running. Metagenomi identified database splits of 200M vectors each to yield an optimal trade-off in cost and runtime for both small and large queries. We recommend ingesting and indexing on storage-optimized instances, such as those in the i4i family, for optimal performance and cost savings. If querying is to be done on an instance using a disk-based database (as opposed to Lambda and Amazon S3), we also recommend using storage-optimized instances for queries. We found the Lambda implementation could quickly handle single queries requesting up to 50,000 ANNs, or multi queries of up to 100 sequences with fewer than 5 ANNs. Runtime increases linearly with the number of ANNs requested, as shown in the following graph.

Line graph showing query runtime increasing with number of nearest neighbors

Conclusion

In this post, we showed how Metagenomi was able to store and query billions of protein embeddings at low cost using LanceDB implemented with Amazon S3 and AWS Lambda. This work expands on Metagenomi’s patient-driven mission to create curative genetic medicines by accelerating our discovery and engineering platform. Having quick access to the ANN embedding space of a query protein in seconds has enabled the integration of rapid search methods in our extensive analysis pipelines, accelerated the discovery of several diverse and novel enzyme families, and enabled protein engineering efforts by providing scientists with methods to generate and search embeddings on the fly. As Metagenomi continues to rapidly scale protein and DNA databases, horizontal scaling enabled by database splits that can be indexed and searched in parallel facilitates an embedding database solution that scales to future needs.

The solution outlined in this post focuses on vectors produced by a protein large language model (LLM) but can be applied to other vectorized datasets. To learn more about LanceDB integrated with Amazon S3, refer to the LanceDB documentation.

References

  1. Fournier, Quentin, et al. “Protein language models: is scaling necessary?” bioRxiv (2024): 2024-09.

AWS Weekly Roundup: Amazon Q Developer, AWS Step Functions, AWS Cloud Club Captain deadline, and more (September 22, 2025)

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-amazon-q-developer-aws-step-functions-aws-cloud-club-captain-deadline-and-more-september-22-2025/

Three weeks ago, I published a post about the new AWS Region in New Zealand (ap-southeast-6). This led to an incredible opportunity to visit New Zealand, where I met passionate builders and presented at several events including Serverless and Platform Engineering meetup, AWS Tools and Programming meetup, AWS Cloud Clubs in Auckland, and AWS Community Day New Zealand.

During my content creation process for these presentations, I discovered a useful feature in Amazon Q CLI called tangent mode. This feature has transformed how I stay focused by creating conversation checkpoints that let you explore side topics without losing your main thread.

This feature is in experimental mode, and you can enable it with q settings chat.enableTangentMode true. Try it out and see if it helps you.

Last week’s launches
Here are some launches that got my attention:

  • New Foundation Models in Amazon Bedrock — Amazon Bedrock expands its model selection with Qwen model family, DeepSeek-V3.1, and Stability AI image services now generally available, giving developers access to powerful multilingual models and advanced image generation capabilities for text generation, code generation, image creation, and complex problem-solving tasks.
  • Amazon VPC Reachability Analyzer Expands to Seven New Regions — Network Access Analyzer capabilities are now available in additional regions, helping customers analyze and troubleshoot network connectivity issues across their VPC infrastructure with improved global coverage.
  • Amazon Q Developer Supports Remote MCP Servers — Amazon Q Developer now integrates with remote Model Context Protocol (MCP) servers, enabling developers to extend their AI assistant capabilities with custom tools and data sources for enhanced development workflows.
  • AWS Step Functions Enhances Distributed Map with New Data Source Options — Step Functions introduces additional data source options and improved observability features for Distributed Map, making it easier to process large-scale parallel workloads with better monitoring and debugging capabilities.
  • Amazon Corretto 25 Generally Available — Amazon’s no-cost, multiplatform distribution of OpenJDK 25 is now generally available, providing Java developers with long-term support, performance enhancements, and security updates for building modern applications.
  • Amazon SageMaker HyperPod Introduces Autoscaling — SageMaker HyperPod now supports automatic scaling capabilities, allowing machine learning teams to dynamically adjust compute resources based on workload demands, optimizing both performance and cost for distributed training jobs.

Additional Updates

  • AWS Named Leader in 2025 Gartner Magic Quadrant for AI Code Assistants – AWS has been recognized as a Leader in Gartner’s Magic Quadrant for AI Code Assistants, highlighting Amazon Q Developer’s capabilities in helping developers write code faster and more securely with AI-powered suggestions.
  • Become an AWS Cloud Club Captain – Only a couple of days before it closes! Join a growing network of student cloud enthusiasts by becoming an AWS Cloud Club Captain! As a Captain, you’ll get to organize events and build cloud communities while developing leadership skills. The application window is open September 1-28, 2025.

Upcoming AWS events
Check your calendars and sign up for these upcoming AWS events as well as AWS re:Invent and AWS Summits:

  • AWS AI Agent Global Hackathon – This is your chance to dive deep into our powerful generative AI stack and create something truly awesome. From September 8th to October 20th, you have the opportunity to create AI agents using the AWS suite of AI services, competing for over $45,000 in prizes and exclusive go-to-market opportunities.
  • AWS Gen AI Lofts – Learn about AWS AI products and services in exclusive sessions, meet industry-leading experts, and network with investors and peers. Register in your nearest city: Mexico City (September 30–October 2), Paris (October 7–21), London (October 13–21), and Tel Aviv (November 11–19).
  • AWS Community Days – Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: South Africa (September 20), Bolivia (September 20), Portugal (September 27), and Manila (October 4-5).

You can browse all upcoming AWS events and AWS startup events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

Happy building!

— Donnie

Automate and orchestrate Amazon EMR jobs using AWS Step Functions and Amazon EventBridge

Post Syndicated from Senthil Kamala Rathinam original https://aws.amazon.com/blogs/big-data/automate-and-orchestrate-amazon-emr-jobs-using-aws-step-functions-and-amazon-eventbridge/

Many enterprises are adopting Apache Spark for scalable data processing tasks such as extract, transform, and load (ETL), batch analytics, and data enrichment. As data pipelines evolve, so does the need for flexible and cost-efficient execution environments that support automation, governance, and performance at scale. Amazon EMR provides a powerful environment to run Spark workloads, and depending on workload characteristics and compliance requirements, teams can choose between fully managed options like Amazon EMR Serverless or more customizable configurations using Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2).

In use cases where infrastructure control, data locality, or strict security postures are essential, such as in financial services, healthcare, or government, running transient EMR on EC2 clusters becomes a preferred choice. However, orchestrating the full lifecycle of these clusters, from provisioning to job submission and eventual teardown, can introduce operational overhead and risk if done manually.

To streamline this process, the AWS Cloud offers built-in orchestration capabilities using AWS Step Functions and Amazon EventBridge. Together, these services help you automate and schedule the entire EMR job lifecycle, reducing manual intervention while optimizing cost and compliance. Step Functions provides the workflow logic to manage cluster creation, Spark job execution, and cluster termination, and EventBridge schedules these workflows based on business or operational needs.

In this post, we discuss how to build a fully automated, scheduled Spark processing pipeline using Amazon EMR on EC2, orchestrated with Step Functions and triggered by EventBridge. We walk through how to deploy this solution using AWS CloudFormation, process data from the COVID-19 public dataset in Amazon Simple Storage Service (Amazon S3), and store the aggregated results in Amazon S3. This architecture is ideal for periodic or scheduled batch processing scenarios where infrastructure control, auditability, and cost-efficiency are critical.

Solution overview

This solution uses the publicly available COVID-19 dataset to illustrate how to build a modular, scheduled architecture for scalable and cost-efficient batch processing for time-bound data workloads. The solution follows these steps:

  1. Raw COVID-19 data in CSV format is stored in an S3 input bucket.
  2. A scheduled rule in EventBridge triggers a Step Functions workflow.
  3. The Step Functions workflow provisions a transient Amazon EMR cluster using EC2 instances.
  4. A PySpark job is submitted to the cluster to process COVID-19 hospital utilization data and compute monthly state-level averages of inpatient bed utilization, ICU bed utilization, and COVID-19 patient percentages.
  5. The processed results are written back to an S3 output bucket.
  6. After successful job completion, the EMR cluster is automatically deleted.
  7. Logs are persisted to Amazon S3 for observability and troubleshooting.

By automating this workflow, you alleviate the need to manually manage EMR clusters while gaining cost-efficiency by running compute only when needed. This architecture is ideal for periodic Spark jobs such as ETL pipelines, regulatory reporting, and batch analytics, especially when control, compliance, and customization are required. The following diagram illustrates the architecture for this use case.

The infrastructure is deployed using AWS CloudFormation to provide consistency and repeatability. AWS Identity and Access Management (IAM) roles grant least‑privilege access to Step Functions, Amazon EMR, EC2 instances, and S3 buckets, and optional AWS Key Management Service (AWS KMS) encryption can secure data at rest in Amazon S3 and Amazon CloudWatch Logs. By combining a scheduled trigger, stateful orchestration, and centralized logging, this solution delivers a fully automated, cost‑optimized, and secure way to run transient Spark workloads in production.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Set up resources with AWS CloudFormation

To provision the required resources using a single CloudFormation template, complete the following steps:

  1. Sign in to the AWS Management Console as an admin user.
  2. Clone the sample repository to your local machine or AWS CloudShell and navigate into the project directory.
    git clone https://github.com/aws-samples/sample-emr-transient-cluster-step-functions-eventbridge.git
    cd sample-emr-transient-cluster-step-functions-eventbridge

  3. Set an environment variable for the AWS Region where you plan to deploy the resources. Replace the placeholder with your Region code, for example, us-east-1.
    export AWS_REGION=<YOUR AWS REGION>

  4. Deploy the stack using the following command. Update the stack name if needed. In this example, the stack is created with the name covid19-analysis.
    aws cloudformation deploy \
    --template-file emr_transient_cluster_step_functions_eventbridge.yaml \
    --stack-name covid19-analysis \
    --capabilities CAPABILITY_IAM \
    --region $AWS_REGION 

You can monitor the stack creation progress on the AWS CloudFormation console on the Events tab. The deployment typically completes in under 5 minutes.

After the stack is successfully created, go to the Outputs tab on the AWS CloudFormation console and note the following values for use in later steps:

  • InputBucketName
  • OutputBucketName
  • LogBucketName
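If you prefer to read these values programmatically rather than from the console, a sketch along the following lines may help. The helper names `stack_outputs` and `fetch_stack_outputs` are hypothetical; the only AWS call used is the CloudFormation `describe_stacks` API through boto3, and the code assumes your credentials are already configured.

```python
def stack_outputs(describe_stacks_response):
    """Map OutputKey -> OutputValue for the first stack in the response."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


def fetch_stack_outputs(stack_name, region):
    """Call CloudFormation and return the stack outputs as a dict."""
    import boto3  # imported lazily so stack_outputs works without AWS access

    client = boto3.client("cloudformation", region_name=region)
    return stack_outputs(client.describe_stacks(StackName=stack_name))


# Example (requires AWS credentials):
# outputs = fetch_stack_outputs("covid19-analysis", "us-east-1")
# print(outputs["InputBucketName"], outputs["OutputBucketName"], outputs["LogBucketName"])
```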

Set up the COVID-19 dataset

With your infrastructure in place, complete the following steps to set up the input data:

  1. Download the COVID-19 data CSV file from HealthData.gov to your local machine.
  2. Rename the downloaded file to covid19-dataset.csv.
  3. Upload the renamed file to your S3 input bucket under the raw/ folder path.

Set up the PySpark Script

Complete the following steps to set up the PySpark script:

  1. Open AWS CloudShell from the console.
  2. Confirm that you are working inside the sample-emr-transient-cluster-step-functions-eventbridge directory before running the next command.
  3. Copy the PySpark script needed for this walkthrough into your input bucket:
    aws s3 cp covid19_processor.py s3://<InputBucketName>/scripts/

This script processes COVID-19 hospital utilization data stored as CSV files in your S3 input bucket. When running the job, provide the following command-line arguments:

  • --input – The S3 path to the input CSV files
  • --output – The S3 path to store the processed results

The script reads the raw dataset, standardizes various date formats, and filters out records with invalid or missing dates. It then extracts key utilization metrics such as inpatient bed usage, ICU bed usage, and the percentage of beds occupied by COVID-19 patients and calculates monthly averages grouped by state. The aggregated output is saved as timestamped CSV files in the specified S3 location.
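To make the aggregation logic concrete, here is a minimal plain-Python sketch of the same idea: parse mixed date formats, skip bad rows, and average a metric by state and month. It is not the PySpark script from the repository. The column names (`state`, `date`, `inpatient_bed_utilization`) and the list of date formats are assumptions chosen for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Assumed input formats; the real dataset may use others
DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d")


def parse_date(raw):
    """Return a datetime for the first matching format, else None."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt)
        except (ValueError, AttributeError):
            continue
    return None


def monthly_state_averages(rows, metric):
    """Average `metric` per (state, YYYY-MM), skipping invalid rows."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, row count]
    for row in rows:
        parsed = parse_date(row.get("date"))
        if parsed is None:
            continue  # invalid or missing date
        try:
            value = float(row[metric])
        except (KeyError, TypeError, ValueError):
            continue  # missing or non-numeric metric
        key = (row["state"], parsed.strftime("%Y-%m"))
        sums[key][0] += value
        sums[key][1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


# Hypothetical sample rows
rows = [
    {"state": "WA", "date": "2021-01-05", "inpatient_bed_utilization": "0.50"},
    {"state": "WA", "date": "01/20/2021", "inpatient_bed_utilization": "0.75"},
    {"state": "WA", "date": "not-a-date", "inpatient_bed_utilization": "0.99"},
    {"state": "OR", "date": "2021/02/01", "inpatient_bed_utilization": "0.25"},
]
averages = monthly_state_averages(rows, "inpatient_bed_utilization")
```

In PySpark the same steps would typically map to a date-parsing expression, a filter, and a `groupBy` on state and month followed by an average.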

This example demonstrates how you can use PySpark to efficiently clean, transform, and analyze large-scale healthcare data to gain actionable insights on hospital capacity trends during the pandemic.

Configure a schedule in EventBridge

The Step Functions state machine is by default scheduled to run on December 31, 2025, as a one-time execution. You can update the schedule for recurring or one-time execution as needed. Complete the following steps:

  1. On the EventBridge console, choose Schedules under Scheduler in the navigation pane.
  2. Select the schedule named <StackName>-covid19-analysis and choose Edit.
  3. Set your preferred schedule pattern.
    1. If you want to run the schedule one time, select One-time schedule for Occurrence and enter a date and time.
    2. If you want to run this on a recurring basis, select Recurring schedule. Specify the schedule type as either Cron-based schedule or Rate-based schedule as needed.
  4. Choose Next twice and choose Save schedule.
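The console fields in these steps correspond to EventBridge Scheduler schedule expressions of the form `at(...)`, `rate(...)`, and `cron(...)`. The sketch below only builds those strings; the helper names are hypothetical, and the times shown are illustrative values rather than the defaults from the template.

```python
from datetime import datetime


def one_time(when):
    """One-time schedule: at(yyyy-mm-ddThh:mm:ss)."""
    return f"at({when.strftime('%Y-%m-%dT%H:%M:%S')})"


def rate(value, unit):
    """Rate-based schedule, for example rate(12 hours)."""
    return f"rate({value} {unit})"


def cron(minutes, hours, day_of_month, month, day_of_week, year):
    """Cron-based schedule with the six fields EventBridge expects."""
    return f"cron({minutes} {hours} {day_of_month} {month} {day_of_week} {year})"


print(one_time(datetime(2025, 12, 31, 0, 0, 0)))  # midnight is an assumed time
print(rate(12, "hours"))
print(cron("0", "6", "*", "*", "?", "*"))  # every day at 06:00
```

In the cron form, either the day-of-month or the day-of-week field is set to `?` when the other one carries the constraint.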

Start the workflow in Step Functions

Based on your EventBridge schedule, the Step Functions workflow will run automatically. For this walkthrough, complete the following steps to trigger it manually:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the state machine that begins with Covid19AnalysisStateMachine-*.
  3. Choose Start execution.
  4. In the Input section, provide the following JSON (provide the log bucket and output bucket names with the appropriate values captured earlier):
    {
      "LogUri": "s3://<LogBucketName>/logs/",
      "OutputS3Location": "s3://<OutputBucketName>/processed/"
    }

  5. Choose Start execution to initiate the workflow.
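The same execution can be started from code. The following is a sketch under stated assumptions: the helper names are hypothetical, you supply the state machine ARN yourself, and the only AWS call is the Step Functions `start_execution` API through boto3.

```python
import json


def build_execution_input(log_bucket, output_bucket):
    """Build the JSON input the workflow expects, as a string."""
    return json.dumps({
        "LogUri": f"s3://{log_bucket}/logs/",
        "OutputS3Location": f"s3://{output_bucket}/processed/",
    })


def start_workflow(state_machine_arn, log_bucket, output_bucket):
    """Start one execution and return its ARN."""
    import boto3  # imported lazily so the builder above works without AWS access

    sfn = boto3.client("stepfunctions")
    response = sfn.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(log_bucket, output_bucket),
    )
    return response["executionArn"]
```

Keeping the input builder separate from the API call makes it easy to check the payload before anything runs.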

Monitor the EMR job and workflow execution

After you start the workflow, you can track both the Step Functions state transitions and the EMR job progress in real time on the console.

Monitor the Step Functions state machine

Complete the following steps to monitor the Step Functions state machine:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the state machine that begins with Covid19AnalysisStateMachine-*.
  3. Choose the running execution to view the visual workflow.

    Each state node will update as it progresses—green for success, red for failure.

  4. To explore a step, choose its node and inspect the input, output, and error details in the side pane.

The following screenshot shows an example of a successfully executed workflow.

Monitor the EMR cluster and EMR step

Complete the following steps to monitor the EMR cluster and EMR step status:

  1. While the cluster is active, open the Amazon EMR console and choose Clusters in the navigation pane.
  2. Locate the Covid19Cluster transient EMR cluster.
    Initially, it will be in Starting status.

    On the Steps tab, you can see your Spark submit step listed. As the job progresses, the step status changes from Pending to Running to finally Completed or Failed.

  3. Choose the Applications tab to view the application UIs, in which you can access the Spark History Server and YARN Timeline Server for monitoring and troubleshooting.

Monitor CloudWatch logs

To enable CloudWatch logging and enhanced monitoring for your EMR on EC2 cluster, refer to Amazon EMR on EC2 – Enhanced Monitoring with CloudWatch using custom metrics and logs. This guide explains how to install and configure the CloudWatch agent using a bootstrap action, so you can stream system-level metrics (such as CPU, memory, and disk usage) and application logs from EMR nodes directly to CloudWatch. With this setup, you can gain real-time visibility into cluster health and performance, simplify troubleshooting, and retain critical logs even after the cluster is terminated.

For this walkthrough, check the logs in the S3 log output location.

Confirm cluster deletion

When the Spark step is complete, Step Functions will automatically delete the Amazon EMR cluster. Refresh the Clusters page on the Amazon EMR console. You should see your cluster status change from Terminating to Terminated within a minute.
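To confirm termination from a script instead of refreshing the console, you could poll the cluster state. This is a minimal sketch: the function name and polling defaults are assumptions, and it relies only on the Amazon EMR `describe_cluster` API, with the client passed in so it can be swapped for a stub.

```python
import time

TERMINAL_STATES = {"TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_for_termination(emr_client, cluster_id, poll_seconds=30, max_polls=20):
    """Poll describe_cluster until the cluster reaches a terminal state."""
    for _ in range(max_polls):
        response = emr_client.describe_cluster(ClusterId=cluster_id)
        state = response["Cluster"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Cluster {cluster_id} did not terminate in time")


# Example (requires AWS credentials and a real cluster ID):
# import boto3
# print(wait_for_termination(boto3.client("emr"), "<ClusterId>"))
```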

By following these steps, you gain full end-to-end visibility into your workflow from the moment the Step Functions state machine is triggered to the automatic shutdown of the EMR cluster. You can monitor execution progress, troubleshoot issues, confirm job success, and continuously optimize your transient Spark workloads.

Verify job output in Amazon S3

After the job finishes, complete the following steps to check the processed results in the S3 output bucket:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Open the output S3 bucket you noted earlier.
  3. Open the processed folder.
  4. Navigate into the timestamped subfolder to view the CSV output file.
  5. Download the CSV file to view the processed results, as shown in the following screenshot.
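The console steps above can also be approximated in code. The sketch below lists CSV objects under the `processed/` prefix. The helper names are hypothetical, it uses only the Amazon S3 `list_objects_v2` API, and it ignores pagination, so it sees at most the first page of results.

```python
def csv_keys(list_objects_response):
    """Return the sorted keys ending in .csv from a list_objects_v2 response."""
    contents = list_objects_response.get("Contents", [])
    return sorted(obj["Key"] for obj in contents if obj["Key"].endswith(".csv"))


def list_processed_csvs(bucket, prefix="processed/"):
    """List CSV output files in the output bucket (first page only)."""
    import boto3  # imported lazily so csv_keys works without AWS access

    s3 = boto3.client("s3")
    return csv_keys(s3.list_objects_v2(Bucket=bucket, Prefix=prefix))
```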

Monitoring and troubleshooting

To monitor the progress of your Spark job running on a transient EMR on EC2 cluster, use the Step Functions console. It provides real-time visibility into each state transition in your workflow, from cluster creation and job submission to cluster deletion. This makes it straightforward to track execution flow and identify where issues might occur. During job execution, you can use the Amazon EMR console to access cluster-level monitoring. This includes YARN application statuses, step-level logs, and overall cluster health. If CloudWatch logging is enabled in your job configuration, driver and executor logs stream in near real time, so you can quickly detect and diagnose errors, resource constraints, or data skew within your Spark application.

After the workflow is complete, regardless of whether it succeeds or fails, you can perform a detailed post-execution analysis by reviewing the logs stored in the S3 bucket specified in the LogUri parameter. This log directory includes standard output and error logs, along with Spark history files, offering insights into execution behavior and performance metrics.

For continued access to the Spark UI during and after job execution, you can use persistent application UIs on the EMR console. These links remain accessible even after the cluster is stopped, enabling deeper root-cause analysis and performance tuning for future runs.

This visibility into both workflow orchestration and job execution can help teams optimize their Spark workloads, reduce troubleshooting time, and build confidence in their EMR automation pipelines.

Clean up

To avoid incurring ongoing charges, clean up the resources provisioned during this walkthrough:

  1. Empty the S3 buckets:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Select the input, output, and log buckets used in this tutorial.
    3. Choose Empty to remove all objects before deleting the buckets (optional).
  2. Delete the CloudFormation stack:
    1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
    2. Select the stack you created for this solution and choose Delete.
    3. Confirm the deletion to remove associated resources.

Conclusion

In this post, we showed how to build a fully automated and cost-effective Spark processing pipeline using Step Functions, EventBridge, and Amazon EMR on EC2. The workflow provisions a transient EMR cluster, runs a Spark job to process data, and stops the cluster after the job completes. This approach helps reduce costs while giving you full control over the process. This solution is ideal for scheduled data processing tasks such as ETL jobs, log analytics, or batch reporting, especially when you need detailed control over infrastructure, security, and compliance settings.

To get started, deploy the solution in your environment using the CloudFormation stack provided and adjust it to fit your data processing needs. Check out the Step Functions Developer Guide and Amazon EMR Management Guide to explore further.

Share your feedback and ideas in the comments or connect with your AWS Solutions Architect to fine-tune this pattern for your use case.


About the authors

Senthil Kamala Rathinam

Senthil is a Solutions Architect at Amazon Web Services, specializing in Data and Analytics for banking customers across North America. With deep expertise in Data and Analytics, AI/ML, and Generative AI, he helps organizations unlock business value through data-driven transformation. Beyond work, Senthil enjoys spending time with his family and playing badminton.

Shashi Makkapati

Shashi is a Senior Solutions Architect serving banking customers across North America. He specializes in data analytics, AI/ML, and generative AI, focusing on innovative solutions that transform financial organizations. Shashi is passionate about leveraging technology to solve complex business challenges in the banking sector. Outside of work, he enjoys traveling and spending quality time with his family.

Streamlining AWS Serverless workflows: From AWS Lambda orchestration to AWS Step Functions

Post Syndicated from Diego Casas original https://aws.amazon.com/blogs/compute/streamlining-aws-serverless-workflows-from-aws-lambda-orchestration-to-aws-step-functions/

This blog post discusses the AWS Lambda as orchestrator anti-pattern and how to redesign serverless solutions using AWS Step Functions with native integrations.

Step Functions is a serverless workflow service that you can use to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning (ML) pipelines. Step Functions provides native integrations with over 200 AWS services in addition to external third-party APIs. You can use these integrations to deploy production-ready solutions with less effort, reducing code complexity, improving long-term maintainability, and minimizing technical debt when operating at scale.

The Lambda as orchestrator anti-pattern

Let’s examine a common anti-pattern: using a Lambda function as an orchestrator for message distribution across multiple channels. Consider this real-world scenario where a system needs to send notifications through SMS or email channels based on user preferences, as shown in the following diagram.

The payload examples for this scenario are:

  1. Send SMS only:
    {
        "body": {
            "channel": "sms",
            "message": "Hello from AWS Lambda!",
            "phoneNumber": "+1234567890",
            "metadata": {
                "priority": "high",
                "category": "notification"
            }
        }
    }

  2. Send email only:
    {
        "body": {
            "channel": "email",
            "message": "Hello from AWS Lambda!",
            "email": {
                "to": "[email protected]",
                "subject": "Test Notification",
                "from": "[email protected]"
            },
            "metadata": {
                "priority": "normal",
                "category": "notification"
            }
        }
    }

  3. Send both SMS and email:
    {
        "body": {
            "channel": "both",
            "message": "Hello from AWS Lambda!",
            "phoneNumber": "+1234567890",
            "email": {
                "to": "[email protected]",
                "subject": "Test Notification",
                "from": "[email protected]"
            },
            "metadata": {
                "priority": "high",
                "category": "notification"
            }
        }
    }

Here’s how it typically starts—with a Lambda function acting as an orchestrator:

import json

import boto3

# Initialize Lambda client
# You can specify region if needed: boto3.client('lambda', region_name='us-east-1')
lambda_client = boto3.client('lambda')


def lambda_handler(event, context):
    try:
        # Parse the incoming event
        body = json.loads(event['body'])
        
        # Validate required fields
        if 'channel' not in body:
            return {
                'statusCode': 400,
                'body': json.dumps('Missing channel parameter')
            }
        
        if 'message' not in body:
            return {
                'statusCode': 400,
                'body': json.dumps('Missing message content')
            }
        
        if body['channel'] == 'both':
            # Invoke SMS Lambda function
            lambda_client.invoke(
                FunctionName='send-sns',
                InvocationType='Event',
                Payload=json.dumps(body)
            )
            
            # Invoke Email Lambda function
            lambda_client.invoke(
                FunctionName='send-email',
                InvocationType='Event',
                Payload=json.dumps(body)
            )
        else:
            # Validate channel value
            if body['channel'] not in ['sms', 'email']:
                return {
                    'statusCode': 400,
                    'body': json.dumps('Invalid channel specified')
                }
            
            # Invoke function based on specified channel
            function_name = 'send-sns' if body['channel'] == 'sms' else 'send-email'
            lambda_client.invoke(
                FunctionName=function_name,
                InvocationType='Event',
                Payload=json.dumps(body)
            )
        
        return {
            'statusCode': 200,
            'body': json.dumps('Messages sent successfully')
        }
        
    except json.JSONDecodeError:
        return {
            'statusCode': 400,
            'body': json.dumps('Invalid JSON in request body')
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

This approach has the following problems:

  • Complex error handling: The orchestrator needs to manage errors from multiple function invocations.
  • Tight coupling: Functions are directly dependent on each other.
  • Limited execution time: The orchestrator Lambda function continues running while sub Lambda functions execute. This could lead to the orchestrator Lambda function timing out.
  • Idle resources: While the orchestrator Lambda function sits idle waiting for other Lambda functions to return, you pay for compute that does no work.

Rearchitecting with Step Functions

You can rebuild the logic using Step Functions and Amazon States Language to replace the Lambda orchestrator function. You can use the Choice state in Amazon States Language to define logical conditions to follow a specific path. This approach reduces code maintenance complexity because you define the conditions using Amazon States Language. You can also use it to extend the functionality with minimal changes to the codebase.
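To see how the branching moves out of application code, the following Python sketch mirrors the routing that the Choice states perform. It is only a mental model, not something you deploy: the function name `next_state` is hypothetical, and the returned strings are the state names used by the workflow definition in this post.

```python
def next_state(payload):
    """Return the state a payload would be routed to by the Choice states."""
    # ValidateInput: both fields must be present
    if "message" not in payload or "channel" not in payload:
        return "ValidationError"

    # DetermineChannel: exact string match, otherwise the default branch
    routes = {
        "both": "ParallelNotification",
        "sms": "SendSMSOnly",
        "email": "SendEmailOnly",
    }
    channel = payload["channel"]
    if isinstance(channel, str):
        return routes.get(channel, "FailState")
    return "FailState"


print(next_state({"channel": "sms", "message": "Hello", "phoneNumber": "+1234567890"}))
```

Adding a new channel then means adding one more choice rule and one more task state rather than editing and redeploying orchestration code.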

The following Step Functions workflow diagram shows the rearchitected version of the previous Orchestrator Lambda function:

The following Amazon States Language definition represents the workflow:

{
  "Comment": "Multi-channel notification workflow",
  "StartAt": "ValidateInput",
  "States": {
    "ValidateInput": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            {
              "Variable": "$.message",
              "IsPresent": true
            },
            {
              "Variable": "$.channel",
              "IsPresent": true
            }
          ],
          "Next": "DetermineChannel"
        }
      ],
      "Default": "ValidationError"
    },
    "ValidationError": {
      "Type": "Fail",
      "Error": "ValidationError",
      "Cause": "Required fields missing: message and/or channel"
    },
    "DetermineChannel": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.channel",
          "StringEquals": "both",
          "Next": "ParallelNotification"
        },
        {
          "Variable": "$.channel",
          "StringEquals": "sms",
          "Next": "SendSMSOnly"
        },
        {
          "Variable": "$.channel",
          "StringEquals": "email",
          "Next": "SendEmailOnly"
        }
      ],
      "Default": "FailState"
    },
    "ParallelNotification": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "SendSMS",
          "States": {
            "SendSMS": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "Message.$": "$.message",
                "PhoneNumber.$": "$.phoneNumber"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "SendEmail",
          "States": {
            "SendEmail": {
              "Type": "Task",
              "Parameters": {
                "FromEmailAddress.$": "$.email.from",
                "Destination": {
                  "ToAddresses.$": "States.Array($.email.to)",
                  "CcAddresses.$": "States.Array($.email.cc)",
                  "BccAddresses.$": "States.Array($.email.bcc)"
                },
                "Content": {
                  "Simple": {
                    "Subject": {
                      "Data.$": "$.email.subject",
                      "Charset": "UTF-8"
                    },
                    "Body": {
                      "Text": {
                        "Data.$": "$.message",
                        "Charset": "UTF-8"
                      },
                      "Html": {
                        "Data.$": "$.email.htmlBody",
                        "Charset": "UTF-8"
                      }
                    }
                  }
                },
                "ReplyToAddresses.$": "States.Array($.email.replyTo)",
                "EmailTags": [
                  {
                    "Name": "channel",
                    "Value": "email"
                  },
                  {
                    "Name": "messageType",
                    "Value.$": "$.email.messageType"
                  }
                ],
                "ConfigurationSetName.$": "$.email.configurationSet",
                "ListManagementOptions": {
                  "ContactListName.$": "$.email.contactList",
                  "TopicName.$": "$.email.topic"
                }
              },
              "Resource": "arn:aws:states:::aws-sdk:sesv2:sendEmail",
              "End": true
            }
          }
        }
      ],
      "End": true
    },
    "SendSMSOnly": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message.$": "$.message",
        "PhoneNumber.$": "$.phoneNumber"
      },
      "End": true
    },
    "SendEmailOnly": {
      "Type": "Task",
      "Parameters": {
        "FromEmailAddress.$": "$.email.from",
        "Destination": {
          "ToAddresses.$": "States.Array($.email.to)"
        },
        "Content": {
          "Simple": {
            "Subject": {
              "Data.$": "$.email.subject",
              "Charset": "UTF-8"
            },
            "Body": {
              "Text": {
                "Data.$": "$.message",
                "Charset": "UTF-8"
              },
              "Html": {
                "Data.$": "$.email.htmlBody",
                "Charset": "UTF-8"
              }
            }
          }
        }
      },
      "Resource": "arn:aws:states:::aws-sdk:sesv2:sendEmail",
      "End": true
    },
    "FailState": {
      "Type": "Fail",
      "Cause": "Invalid channel specified"
    }
  }
}
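The routing performed by the two Choice states can be checked locally before you start an execution. The following is a minimal Python sketch, not part of the original solution: the function name next_state is hypothetical, and it only mirrors the ValidateInput and DetermineChannel conditions shown in the definition above.

```python
def next_state(event: dict) -> str:
    """Return the state the workflow above would reach for this input."""
    # ValidateInput: both fields must be present (IsPresent only checks existence).
    if "message" not in event or "channel" not in event:
        return "ValidationError"

    # DetermineChannel: StringEquals comparisons, with FailState as the Default.
    routes = {
        "both": "ParallelNotification",
        "sms": "SendSMSOnly",
        "email": "SendEmailOnly",
    }
    channel = event["channel"]
    if isinstance(channel, str) and channel in routes:
        return routes[channel]
    return "FailState"


if __name__ == "__main__":
    print(next_state({"message": "Order shipped", "channel": "sms"}))
```

A check like this is useful in unit tests for the code that builds execution input, because a missing field or a misspelled channel fails fast instead of ending in a Fail state.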

This Step Functions implementation offers several advantages:

  • Native service integration: Direct integration with Amazon Simple Notification Service (Amazon SNS), Amazon Simple Email Service (Amazon SES), Amazon DynamoDB, and Amazon CloudWatch eliminates the need for wrapper Lambda functions
  • Visual workflow: The execution flow is visible and maintainable through the AWS Management Console
  • Built-in error handling: Retry policies and error states can be defined declaratively
  • Parallel execution: The Parallel state handles multiple channel delivery efficiently
  • Simplified logic: The Choice state replaces complex if-else statements
  • Centralized data flow: Input and output are managed consistently across states
  • Enhanced workflow duration capabilities: Step Functions Standard workflows support executions that run for up to one year, compared to the 15-minute maximum execution time for Lambda functions
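
To illustrate the declarative error handling mentioned in the list above, the following Python sketch adds Retry and Catch fields to a Task state and prints the resulting Amazon States Language fragment. It is an illustration under assumptions: the helper with_error_handling and the fallback state name NotifyFailure are hypothetical, and the retry values are examples rather than recommendations.

```python
import json


def with_error_handling(task_state: dict, fallback_state: str) -> dict:
    """Return a copy of a Task state with example Retry and Catch fields added."""
    state = dict(task_state)
    # Retry failed task attempts with exponential backoff (2s, 4s, 8s).
    state["Retry"] = [
        {
            "ErrorEquals": ["States.TaskFailed"],
            "IntervalSeconds": 2,
            "MaxAttempts": 3,
            "BackoffRate": 2.0,
        }
    ]
    # After retries are exhausted, keep the error details and move to a fallback state.
    state["Catch"] = [
        {
            "ErrorEquals": ["States.ALL"],
            "ResultPath": "$.error",
            "Next": fallback_state,
        }
    ]
    return state


# The SendSMSOnly state from the workflow definition.
send_sms_only = {
    "Type": "Task",
    "Resource": "arn:aws:states:::sns:publish",
    "Parameters": {"Message.$": "$.message", "PhoneNumber.$": "$.phoneNumber"},
    "End": True,
}

hardened = with_error_handling(send_sms_only, "NotifyFailure")  # hypothetical state name

if __name__ == "__main__":
    print(json.dumps(hardened, indent=2))
```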

Comparing Lambda function as orchestrator to Step Functions

The following table summarizes how each feature is implemented with a Lambda function as orchestrator and with Step Functions:

Feature | Lambda function as orchestrator | Step Functions
Orchestration logic | Implemented in Python with nested if-else statements | Defined declaratively using the Choice state
Multi-channel delivery | Sequential function invocations, or parallel execution using the function’s logic | Parallel execution using the Parallel state
Service integration | Requires SDK calls or separate Lambda functions | Direct integration with AWS services (Amazon SNS, DynamoDB)
Error handling | Custom try-except blocks in Python | Built-in error states and retry policies
Data persistence | Custom code to interact with DynamoDB | Native DynamoDB integration with putItem task
Metrics logging | Custom code to call CloudWatch | CloudWatch Metrics SDK integration

Implementation considerations

Review the following considerations when re-architecting a Lambda function orchestrator to Step Functions:

  • State machine type: Choose between Standard (up to 1 year runtime) and Express (up to 5 minutes) workflows based on your needs.
  • Input/output management: The input and output processing fields reduce development effort and give you flexible ways to shape the data that flows through the workflow:
    • Parameters: Builds the input passed to the task from selected fields of the state input
    • ResultSelector: Filters the state response to include only relevant fields
    • ResultPath: Stores the processed result in a specific path of the state input
    • OutputPath: Determines what data passes to the next state
      The following snippet uses all four fields:

      {
          "ProcessOrder": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                  "FunctionName": "ProcessOrderFunction",
                  "Payload": {
                      "orderId.$": "$.orderId",
                      "customerId.$": "$.customerId"
                  }
              },
              "ResultSelector": {
                  "orderStatus.$": "$.Payload.status",
                  "processedDate.$": "$.Payload.timestamp"
              },
              "ResultPath": "$.orderProcessing",
              "OutputPath": "$.orderProcessing",
              "Next": "NotifyCustomer"
          }
      }

  • Error handling: Implement retry policies and catch errors at both the task and state machine levels.
  • Monitoring: Set up CloudWatch logs and metrics for your state machine to track executions and performance.
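
The input/output fields listed above are easier to reason about when you trace them by hand. The following Python sketch simulates how the ProcessOrder snippet transforms data. It is a simplified model: get_path and apply_template are hypothetical helpers that only understand plain `$.a.b` paths, and the sample input and task result are made up for illustration.

```python
def get_path(data, path):
    """Resolve a simple '$.a.b' reference path (no arrays, filters, or intrinsics)."""
    node = data
    for key in path.split(".")[1:]:
        node = node[key]
    return node


def apply_template(template, data):
    """Apply a Parameters/ResultSelector template: keys ending in '.$' hold paths."""
    out = {}
    for key, value in template.items():
        if key.endswith(".$"):
            out[key[:-2]] = get_path(data, value)
        elif isinstance(value, dict):
            out[key] = apply_template(value, data)
        else:
            out[key] = value
    return out


# Made-up state input and Lambda invoke result, for illustration only.
state_input = {"orderId": "o-1", "customerId": "c-9", "note": "gift wrap"}
task_result = {"StatusCode": 200,
               "Payload": {"status": "PROCESSED", "timestamp": "2025-01-01T00:00:00Z"}}

# Parameters: build the task input from selected fields of the state input.
task_input = apply_template(
    {"FunctionName": "ProcessOrderFunction",
     "Payload": {"orderId.$": "$.orderId", "customerId.$": "$.customerId"}},
    state_input,
)
# ResultSelector: keep only the relevant fields of the task result.
selected = apply_template(
    {"orderStatus.$": "$.Payload.status", "processedDate.$": "$.Payload.timestamp"},
    task_result,
)
# ResultPath: store the selected result under $.orderProcessing in the state input.
merged = {**state_input, "orderProcessing": selected}
# OutputPath: pass only $.orderProcessing to the next state.
state_output = get_path(merged, "$.orderProcessing")

if __name__ == "__main__":
    print(task_input)
    print(state_output)
```

Note that with OutputPath set to `$.orderProcessing`, the original order fields do not reach the next state. If a later state needs them, one option is to omit OutputPath so that the merged document is passed along.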

Benefits of using Step Functions

Using Step Functions for rearchitecting scenarios brings the following benefits:

  • Reduced code complexity: The business logic is now defined in Amazon States Language rather than distributed across multiple Lambda functions.
  • Improved maintainability: Developers can make workflow changes by modifying the Amazon States Language definition, often without having to modify several Lambda functions.
  • Native AWS service integrations: Step Functions offers direct integrations with over 200 AWS services, which you can use to connect and coordinate AWS resources without writing custom integration code.
  • Cost optimization: By using direct service integrations, there are fewer Lambda invocations and reduced costs.
  • Long-running processes: Step Functions can manage workflows that run for up to a year, beyond the 15-minute limit for Lambda functions.

Conclusion

Rearchitecting Lambda-based applications with Step Functions can significantly improve maintainability, scalability, and operational efficiency. By moving orchestration logic into Step Functions and using its native service integrations, you can create more robust and manageable serverless applications.

While this post focused on a message distribution workflow, the principles apply to many serverless architectures. As you develop your applications, consider how Step Functions can help you build more resilient and scalable solutions.

To learn more about serverless architectures, visit Serverless Land.

Orchestrating document processing with AWS AppSync Events and Amazon Bedrock

Post Syndicated from Mehdi Amrane original https://aws.amazon.com/blogs/compute/orchestrating-document-processing-with-aws-appsync-events-and-amazon-bedrock/

Many organizations implement intelligent document processing pipelines in order to extract meaningful insights from an increasing volume of unstructured content (such as insurance claims, loan applications and more). Traditionally, these pipelines require significant engineering efforts, as the implementation often involves using several machine learning (ML) models and orchestrating complex workflows.

As organizations integrate these pipelines into customer-facing applications (such as web applications where customers upload insurance claims, loan approval documents, and more), they set goals to provide insights in real time to improve the end customer experience. These organizations also aim to run and scale these workloads with minimal operational overhead while optimizing costs. In addition, these organizations require common security practices such as identity and access management, to make sure that only authorized and authenticated users are allowed to perform specific actions or access specific resources.

In this post, we show you a solution to simplify the creation of an intelligent document processing pipeline, with a web application for customers to upload their files (documents and images) and derive insights from them (summarization, field extraction, and classification). The solution primarily uses serverless technologies. It includes a web socket to receive insights in real time and offers several benefits, such as automatic scaling, built-in high availability, and a pay-per-use billing model to optimize costs. The solution also includes an authentication layer and an authorization layer to manage identities and permissions.

Solution overview

In this post, we provide an operational overview of the solution, and then describe how to set it up with the following services:

The solution architecture is illustrated in the following diagram:

Step 1: The user authenticates to the web application (hosted in AWS Amplify).
Step 2: Amazon Cognito validates the authentication details. The user is now logged in to the web application.
Steps 3a and 3b:

  • Step 3a: The web application (AWS Amplify) subscribes to an AWS AppSync Events web socket.
  • Step 3b: The AWS AppSync Events web socket calls an AWS Lambda authorizer to confirm that the user is authorized to subscribe to the web socket.

Step 4: The user uploads a file (document or image) using the web application.
Step 5: The web application (hosted in AWS Amplify) calls Amazon Cognito (identity pool) to confirm that the user is authorized to upload a file.
Step 6: The file is uploaded in an Amazon S3 bucket.
Steps 7a and 7b: When the default Amazon EventBridge bus receives an Amazon S3 upload event (which signals that the file was uploaded to the Amazon S3 bucket), an Amazon EventBridge rule triggers the execution of an AWS Step Functions state machine to start the orchestration workflow.
Step 8 (Step to extract fields from a file and classify it):

  • Step 8a: The first AWS Lambda function starts a new Amazon Bedrock Data Automation job (this job extracts specific fields from the uploaded file and classifies it).
  • Step 8b: Once the job is completed, the results are stored in an Amazon S3 bucket.
  • Steps 8c and 8d: When the default Amazon EventBridge bus receives an Amazon S3 event (which signals that the results were stored in the Amazon S3 bucket), an Amazon EventBridge rule triggers the execution of an AWS Lambda function.
  • Step 8e: An AWS Lambda function publishes the results to the web socket.

Steps 9a and 9b: The second AWS Lambda function submits a prompt to an Amazon Bedrock foundation model (Claude 3 Sonnet) to request a streamed summarization of the uploaded file. The AWS Lambda function publishes the streaming data to the web socket.

After Step 8e and Step 9b, the user can view the summarization result and the extracted insights for the uploaded file in the web application.
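
To make Steps 6 and 7 more concrete, the following Python sketch shows the kind of EventBridge event pattern that matches Amazon S3 "Object Created" events for one bucket, and how the bucket name and object key could be pulled out of such an event for the workflow. This is a sketch under assumptions, not code from the sample repository: the bucket name is a placeholder and execution_input is a hypothetical helper.

```python
import json

UPLOAD_BUCKET = "example-upload-bucket"  # placeholder, not the bucket the scripts create

# Event pattern for an EventBridge rule on the default bus.
upload_rule_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {"bucket": {"name": [UPLOAD_BUCKET]}},
}


def execution_input(event: dict) -> str:
    """Reduce an S3 'Object Created' event to the fields a workflow typically needs."""
    detail = event["detail"]
    return json.dumps(
        {"bucket": detail["bucket"]["name"], "key": detail["object"]["key"]}
    )


# Trimmed-down sample event with only the fields used here.
sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": UPLOAD_BUCKET},
        "object": {"key": "uploads/driving-license.png"},
    },
}

if __name__ == "__main__":
    print(json.dumps(upload_rule_pattern, indent=2))
    print(execution_input(sample_event))
```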

Prerequisites

To follow along and set up this solution, you must have the following:

  • An AWS account
  • A device with access to your AWS account with the following:
    • Python 3.12 installed (including pip)
    • Node.js 20.12.0 installed
  • Model access enabled for the Claude 3 Sonnet model in Amazon Bedrock


Note: Deploying this solution will incur costs. Review the pricing page of each AWS service used in this post for details on costs. The cost of running this solution will primarily depend on:

  • The number of documents (and the size of each document)
  • The number of active users

Set up Amazon Bedrock Data Automation

In this section, we set up an Amazon Bedrock Data Automation project and an Amazon Bedrock Data Automation blueprint.

A project contains a list of blueprints, and each blueprint defines the fields to extract from different types of files (such as documents or images). In this post, we define a blueprint for a driving license.

Complete the following steps to create an Amazon Bedrock Data Automation project and a driving license blueprint:

  1. Clone the GitHub repository
    git clone https://github.com/aws-samples/sample-create-idp-with-appsyncevents-and-amazonbedrock.git

  2. Go to the sample-create-idp-with-appsyncevents-and-amazonbedrock folder
    cd sample-create-idp-with-appsyncevents-and-amazonbedrock

  3. Initialize the environment (make the shell script files, from the GitHub repository, ready to be used)
    chmod +x ./init-env.sh && source ./init-env.sh

  4. Run the script setup-bda-project.sh to create an Amazon Bedrock Data Automation project and a sample driving license blueprint:
    ./setup-bda-project.sh

Create the web socket and orchestration backend

In this section, we create the following resources:

  • A user directory for web authentication and authorization, created with an Amazon Cognito user pool. An Amazon Cognito identity pool is also created to validate that users are authorized to upload files via the web application.
  • A web socket using AWS AppSync Events. This allows our web application to receive real time updates for summarization and extraction results. An authorization layer is also created to protect the web socket from unauthorized users. This is implemented with a Lambda authorizer function to validate that incoming requests include valid authorization details.
  • A state machine using AWS Step Functions and AWS Lambda to orchestrate the summarization and extraction operations from the unstructured content
  • Amazon S3 buckets to store files for document processing, and code files for AWS Lambda functions

Complete the following steps to create the web socket and the orchestration backend of the solution, using AWS CloudFormation templates:

  1. Create Amazon S3 buckets used by the solution by running the following script. These buckets will store the files uploaded by users and code files of the AWS Lambda functions used in this solution.
    cd $CURRENT_DIR/s3; ./create-s3-buckets.sh

  2. Create the Amazon Cognito user pool and identity pool by running the create-cognito-userpool.sh script:
    cd $CURRENT_DIR/cognito; ./create-cognito-userpool.sh

  3. Create the AWS AppSync Events web socket by running the following script:
    cd $CURRENT_DIR/appsync/; ./create-appsync-api.sh

  4. Create the AWS Step Functions state machine (including AWS Lambda functions) by running the following script:
    cd $CURRENT_DIR/orchestration/; ./create-orchestration.sh

Configure the Amazon Cognito user pool

In this section, we create a user in our Amazon Cognito user pool. This user will log in to our web application.

Run the script create-cognito-testuser.sh to create the user (make sure to provide your email address):

cd $CURRENT_DIR/cognito; ./create-cognito-testuser.sh #your-email-address#

After you create the user, you should receive an email with a temporary password in this format: “Your username is #your-email-address# and temporary password is #temporary-password#.”

Keep note of these login details (email address and temporary password) to use later when testing the web application.

Create the web application

In this section, we build a web application using AWS Amplify and publish it to make it accessible through an endpoint URL.

Complete the following steps to create the web application:

  1. Run the script create-webapp.sh to create the web application with AWS Amplify:
    cd $CURRENT_DIR/amplify/; ./create-webapp.sh

  2. Run the script deploy.sh to deploy the web application
    cd $CURRENT_DIR/amplify/amplify-idp; ./deploy.sh

The web application is now available for testing and a URL should be displayed, as shown in the following screenshot. Take note of the URL to use in the following section.

Test the web application

In this section, we test the web application and upload a file to be processed:

  1. Open the URL of the AWS Amplify application in your web browser.
  2. Enter your login information (your email and the temporary password you received earlier while configuring the user pool in Amazon Cognito) and choose Sign in.
  3. When prompted, enter a new password and choose Change Password.
  4. You should now be able to see a web interface.
  5. Download the sample driving license at this location and upload it via the web application, using either your camera or a file on your local device.

Once the file is uploaded, you should start receiving responses in the web application. When all the operations are completed, you should see a result equivalent to what is shown in the following screenshot:

Note: If you plan to use other sample driving license images in other formats, you may have to update the Bedrock Data Automation blueprint we created earlier, or define a new blueprint in the same Bedrock Data Automation project, for these new images to work. For more information, review the Bedrock Data Automation documentation.

Clean up

To make sure that no additional cost is incurred, remove the resources provisioned in your account. Make sure you’re in the correct AWS account before deleting the following resources.

Important note: You should exercise caution when performing the following steps. Make sure you are deleting the resources in the correct AWS account.

You can either navigate to the AWS CloudFormation console to delete the CloudFormation stacks associated with the provisioned resources, or use the cleanup helper script cleanup.sh available at the root of the sample-create-idp-with-appsyncevents-and-amazonbedrock folder:

./cleanup.sh #region#

Conclusion

In this post, we walked through a solution to create a document processing pipeline, with a web application using serverless services. Via the web application, we were able to upload a file and receive responses in real time for different types of operations (summarization, extraction of specific fields and classification). First, we created an Amazon Bedrock Data Automation project (with a driving license blueprint). Then we created a web socket along with an orchestration solution using a state machine (AWS Step Functions and AWS Lambda functions). We also configured a user pool to grant a user access to the web application. Finally, we created the frontend of the web application in AWS Amplify.

To dive deeper into this solution, a self-paced workshop is available in AWS Workshop Studio.

Enhance AI-assisted development with Amazon ECS, Amazon EKS and AWS Serverless MCP server

Post Syndicated from Elizabeth Fuentes original https://aws.amazon.com/blogs/aws/enhance-ai-assisted-development-with-amazon-ecs-amazon-eks-and-aws-serverless-mcp-server/

Today, we’re introducing specialized Model Context Protocol (MCP) servers for Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), and AWS Serverless, now available in the AWS Labs GitHub repository. These open source solutions extend AI development assistants’ capabilities with real-time, contextual responses that go beyond their pre-trained knowledge. While large language models (LLMs) within AI assistants rely on public documentation, MCP servers deliver current context and service-specific guidance to help you prevent common deployment errors and provide more accurate service interactions.

You can use these open source solutions to develop applications faster, using up-to-date knowledge of Amazon Web Services (AWS) capabilities and configurations during the build and deployment process. Whether you’re writing code in your integrated development environment (IDE), or debugging production issues, these MCP servers support AI code assistants with deep understanding of Amazon ECS, Amazon EKS, and AWS Serverless capabilities, accelerating the journey from code to production. They work with popular AI-enabled IDEs, including Amazon Q Developer on the command line (CLI), to help you build and deploy applications using natural language commands.

  • The Amazon ECS MCP Server containerizes and deploys applications to Amazon ECS within minutes by configuring all relevant AWS resources, including load balancers, networking, auto-scaling, monitoring, Amazon ECS task definitions, and services. Using natural language instructions, you can manage cluster operations, implement auto-scaling strategies, and use real-time troubleshooting capabilities to identify and resolve deployment issues quickly.
  • For Kubernetes environments, the Amazon EKS MCP Server provides AI assistants with up-to-date, contextual information about your specific EKS environment. It offers access to the latest EKS features, knowledge base, and cluster state information. This gives AI code assistants more accurate, tailored guidance throughout the application lifecycle, from initial setup to production deployment.
  • The AWS Serverless MCP Server enhances the serverless development experience by providing AI coding assistants with comprehensive knowledge of serverless patterns, best practices, and AWS services. Using AWS Serverless Application Model Command Line Interface (AWS SAM CLI) integration, you can handle events and deploy infrastructure while implementing proven architectural patterns. This integration streamlines function lifecycles, service integrations, and operational requirements throughout your application development process. The server also provides contextual guidance for infrastructure as code decisions, AWS Lambda specific best practices, and event schemas for AWS Lambda event source mappings.

Let’s see it in action
If this is your first time using AWS MCP servers, visit the Installation and Setup guide in the AWS Labs GitHub repository for installation instructions. Once installed, add the following MCP server configuration to your local setup:

Install Amazon Q for command line and add the configuration to ~/.aws/amazonq/mcp.json. If you’re already an Amazon Q CLI user, add only the configuration.

{
  "mcpServers": {
    "awslabs.aws-serverless-mcp": {
      "command": "uvx",
      "timeout": 60,
      "args": ["awslabs.aws_serverless_mcp_server@latest"]
    },
    "awslabs.ecs-mcp-server": {
      "disabled": false,
      "command": "uvx",
      "timeout": 60,
      "args": ["awslabs.ecs-mcp-server@latest"]
    },
    "awslabs.eks-mcp-server": {
      "disabled": false,
      "timeout": 60,
      "command": "uvx",
      "args": ["awslabs.eks-mcp-server@latest"]
    }
  }
}
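
Standard JSON does not allow trailing commas, so it can help to check the file before you save it. The following Python sketch parses a configuration string with the standard json module and lists the configured server names. The function name check_mcp_config is hypothetical, and the sketch only checks syntax, not whether the commands or package names are correct.

```python
import json


def check_mcp_config(text: str) -> list[str]:
    """Return the configured server names, or raise ValueError for invalid JSON."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        raise ValueError(
            f"not valid JSON (line {err.lineno}, column {err.colno}): {err.msg}"
        ) from err
    return sorted(config.get("mcpServers", {}))


if __name__ == "__main__":
    sample = '{"mcpServers": {"awslabs.eks-mcp-server": {"command": "uvx"}}}'
    print(check_mcp_config(sample))
```

To check your own file, read it first, for example with pathlib.Path("~/.aws/amazonq/mcp.json").expanduser().read_text(), and pass the text to the function.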

For this demo, I’m going to use the Amazon Q CLI to create an application that understands video, using 02_using_converse_api.ipynb from the Amazon Nova model cookbook repository as sample code. To do this, I send the following prompt:

I want to create a backend application that automatically extracts metadata and understands the content of images and videos uploaded to an S3 bucket and stores that information in a database. I'd like to use a serverless system for processing. Could you generate everything I need, including the code and commands or steps to set up the necessary infrastructure, for it to work from start to finish? - Use 02_using_converse_api.ipynb as example code for the image and video understanding.

Amazon Q CLI identifies the necessary tools, including the MCP server awslabs.aws-serverless-mcp-server. Through a single interaction, the AWS Serverless MCP server determines all requirements and best practices for building a robust architecture.

I ask Amazon Q CLI to build and test the application, but it encounters an error. Amazon Q CLI quickly resolves the issue using the available tools. I verify success by checking the record created in the Amazon DynamoDB table and testing the application with the dog2.jpeg file.

To enhance video processing capabilities, I decided to migrate my media analysis application to a containerized architecture. I used this prompt:

I'd like you to create a simple application like the media analysis one, but instead of being serverless, it should be containerized. Please help me build it in a new CDK stack.

Amazon Q Developer begins building the application. I took advantage of this time to grab a coffee. When I returned to my desk, coffee in hand, I was pleasantly surprised to find the application ready. To ensure everything was up to current standards, I simply asked:

please review the code and all app using the awslabsecs_mcp_server tools 

Amazon Q Developer CLI gives me a summary with all the improvements and a conclusion.

I ask it to make all the necessary changes. Once they are ready, I ask Amazon Q Developer CLI to deploy the application in my account, all using natural language.

After a few minutes, I confirm that I have a complete containerized application, from the S3 bucket to all the necessary networking.

I ask Amazon Q Developer CLI to test the app by sending it the the-sea.mp4 video file and receive a timeout error, so Amazon Q CLI uses the fetch_task_logs tool from awslabsecs_mcp_server to review the logs, identify the error, and fix it.

After a new deployment, I try it again, and the application successfully processes the video file.

I can see the records in my Amazon DynamoDB table.

To test the Amazon EKS MCP server, I have code for a web app in the auction-website-main folder, and I want to build a robust web app from it. I ask Amazon Q CLI to help me with this prompt:

Create a web application using the existing code in the auction-website-main folder. This application will grow, so I would like to create it in a new EKS cluster

Once the Dockerfile is created, Amazon Q CLI identifies generate_app_manifests from awslabseks_mcp_server as a reliable tool to create Kubernetes manifests for the application.

Then it creates a new EKS cluster using the manage_eks_stacks tool.

Once the app is ready, the Amazon Q CLI deploys it and gives me a summary of what it created.

I can see the cluster status in the console.

After a few minutes, and after resolving a couple of issues using the search_eks_troubleshoot_guide tool, the application is ready to use.

Now I have a Kitties marketplace web app, deployed on Amazon EKS using only natural language commands through Amazon Q CLI.

Get started today
Visit the AWS Labs GitHub repository to start using these AWS MCP servers and enhance your AI-powered development. The repository includes implementation guides, example configurations, and additional specialized servers, such as a server to run AWS Lambda functions, which turns your existing AWS Lambda functions into AI-accessible tools without code modifications, and the Amazon Bedrock Knowledge Bases Retrieval MCP server, which provides seamless access to your Amazon Bedrock knowledge bases. Other AWS specialized servers in the repository include documentation, example configurations, and implementation guides to begin building applications with greater speed and reliability.

To learn more about MCP Servers for AWS Serverless and Containers and how they can transform your AI-assisted application development, visit the Introducing AWS Serverless MCP Server: AI-powered development for modern applications, Automating AI-assisted container deployments with the Amazon ECS MCP Server, and Accelerating application development with the Amazon EKS MCP server deep-dive blogs.

— Eli

Integrating aggregators and Quick Service Restaurants with AWS serverless architectures

Post Syndicated from Mike Gomez original https://aws.amazon.com/blogs/compute/integrating-aggregators-and-quick-service-restaurants-with-aws-serverless-architectures/

In this post, you learn how to use AWS serverless technologies, such as Amazon EventBridge and AWS Lambda, to build an integration between Quick Service Restaurants (QSRs) and online ordering and food delivery aggregators. These aggregators have taken off as a way for QSRs to expand their consumer base, giving them delivery options that help grow their businesses.

QSR overview

QSRs prioritize speedy and convenient service, offering a streamlined menu. To meet evolving consumer expectations, QSRs can use API integrations with third-party aggregators. This technological synergy enables QSRs to expand their capabilities, introducing diverse payment methods and incorporating delivery services. These features have become standard in this restaurant segment.

Behind the scenes, the APIs orchestrate the interaction between the aggregator and the QSR while maintaining a consistent ordering and delivery experience.

QSR business objectives are:

  • Providing consistent ordering and delivery experiences
  • Offering personalized menu items
  • Retaining repeat customers
  • Reducing third-party delivery cancellation due to lack of delivery personalization options

This post starts with a simple architecture and adds components to solve architectural challenges.

Architecture

As a solutions architect, you’ve been approached by a thriving local restaurant business seeking technological solutions to fuel their expansion. Your task is to design an optimal integration architecture that aligns with their technical requirements, streamlines operations, and enhances customer experience.

At the core of this integration is Amazon API Gateway, which accepts the incoming orders from various delivery aggregators. API Gateway becomes the front door, connecting the QSRs with the end customers for a streamlined and dynamic order processing system.

Driving the backend of this integration are Lambda functions. These functions validate orders and securely communicate with delivery aggregators. Lambda functions scale dynamically based on demand, which helps keep resource usage efficient and cost-effective.

Order placement workflow

The following steps outline the serverless integration between API Gateway and Lambda functions, as shown in the following figure:

  • Customers can place orders either through food delivery aggregators or the business’s own ordering system.
  • The order request is sent to API Gateway.

This architecture works for small and simple integrations. To scale this architecture for high traffic, use asynchronous integration to reduce the coupling between API and Lambda function.

Order routing workflow

The following steps outline a serverless integration where API Gateway connects to Lambda functions through Amazon EventBridge as the event routing service, as shown in the following figure:

  1. API Gateway receives the order request.
  2. The API Gateway routes the customer’s order request to an EventBridge bus for processing.

EventBridge routes events (for example, order status changes) to Lambda functions and provides resiliency during service disruptions. This eliminates manual error handling and keeps QSRs and aggregators synchronized.

EventBridge delivers the following essential capabilities:

  • EventBridge receives events triggered by various actions, such as new orders or menu updates.
  • It routes events to the relevant Lambda functions, initiating the appropriate actions.
  • EventBridge supports event archive and replay, allowing recovery from Lambda deployment issues or function failures. With an archive configured on the bus, events received during a service disruption can be replayed once the system stabilizes, which supports business continuity.

To maintain order history and enable fast data retrieval, the system needs a highly performant database. Amazon DynamoDB, a serverless NoSQL database service, meets these requirements by efficiently storing and managing order information and metadata. The order processing Lambda function interacts with DynamoDB to persist order details. This approach enables asynchronous processing of the stored data by other backend processes and separates order intake from subsequent processing steps. The database solution provides the scalability and responsiveness needed to handle growing order volumes while maintaining consistent performance.

Order processing workflow

The following steps outline the order processing workflow, as shown in the following figure:

  • The order processing Lambda function validates the order and updates the DynamoDB database with the new order details.
  • The function publishes error events to EventBridge, enabling downstream processing for error handling and retry logic. These events can trigger more Lambda functions designed to manage specific error scenarios and recovery processes.
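The two steps above could be sketched as follows. The table and events_client arguments are expected to behave like a boto3 DynamoDB Table resource and a boto3 EventBridge client; the attribute names, bus name, source, and detail type are assumptions for this example only.

```python
import json


def process_order(order, table, events_client, bus_name="orders-bus"):
    """Persist a valid order, or publish an error event. Returns True on success.

    table needs put_item(Item=...); events_client needs put_events(Entries=[...]).
    """
    try:
        if not order.get("order_id") or not order.get("items"):
            raise ValueError("order must contain order_id and items")
        table.put_item(
            Item={
                "order_id": order["order_id"],
                "status": "processed",
                "items": order["items"],
            }
        )
        return True
    except Exception as exc:  # broad on purpose: any failure becomes an error event
        events_client.put_events(
            Entries=[
                {
                    "EventBusName": bus_name,  # hypothetical bus name
                    "Source": "qsr.order-processing",  # hypothetical source
                    "DetailType": "OrderProcessingFailed",  # hypothetical detail type
                    "Detail": json.dumps(
                        {"order_id": order.get("order_id"), "error": str(exc)}
                    ),
                }
            ]
        )
        return False
```

Passing the table and client in as arguments keeps the function easy to exercise locally with simple stand-ins before wiring it to real resources.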

EventBridge implementation patterns: single or dual bus approaches

EventBridge offers multiple approaches for event bus topology. Architects can choose to either use a single event bus with distinct event patterns based on order status or implement a multi-bus strategy.

The single-bus approach uses one event bus for all events, with rule patterns based on order status. For example, rules match specific statuses (such as “new” or “processed”) to trigger the appropriate Lambda functions. Although this approach is architecturally simple, it needs careful management of the event schema to avoid errors, and careful handling to prevent recursive processing, where messages trigger additional messages in an endless loop.

Alternatively, the multi-bus method, separating order placement and processing across different buses, effectively prevents loops and recursion issues. This approach provides better separation of transactions, albeit with a slightly more complex setup.
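To illustrate how status-based rules on a single bus can avoid recursion, the sketch below defines two event patterns and a deliberately simplified matcher. The matcher only handles exact-value lists and nested objects; it is not the EventBridge matching engine, and the source and status values are hypothetical.

```python
# Rule for newly placed orders (hypothetical source and status values).
NEW_ORDER_PATTERN = {"source": ["qsr.orders"], "detail": {"status": ["new"]}}

# Rule for orders that have already been processed.
PROCESSED_ORDER_PATTERN = {
    "source": ["qsr.order-processing"],
    "detail": {"status": ["processed"]},
}


def matches(pattern, event):
    """Simplified matcher: each pattern key must exist in the event.

    A list means "any of these values"; a dict is matched recursively.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


new_event = {"source": "qsr.orders", "detail": {"status": "new", "order_id": "o-1"}}
done_event = {"source": "qsr.order-processing", "detail": {"status": "processed"}}
```

Because the function that handles "new" orders emits events with a different source and status, its own output does not match the rule that invoked it, which is one way to keep a single bus free of loops.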

EventBridge can directly target external services using the API destination option, eliminating the need for Lambda functions for third party integrations.

Orchestrating order processing

In complex order processing systems for QSRs, managing multiple interdependent Lambda functions can become challenging, potentially leading to intricate code and difficult-to-maintain architectures. To address this, AWS Step Functions can be introduced as an orchestration layer.

Step Functions acts as a central coordinator for the business logic needed in QSR order flows. This service manages the progression of activities in the order processing workflow, thereby efficiently coordinating tasks such as kitchen preparation and delivery logistics. Defining and managing complex workflows allows Step Functions to optimize the overall efficiency of QSR operations, providing a structured and adaptable solution. This orchestration enhances the restaurant’s ability to handle dynamic processing, achieving a smooth and responsive integration with delivery services while streamlining the underlying architecture.

The following steps outline the orchestration of order processing, as shown in the following figure:

  • Order processing triggers the respective Lambda function, which updates the order data in DynamoDB.
  • The updated order is then available to subsequent Lambda functions that apply further business logic.

In a multi-bus EventBridge architecture, the process flows are as follows:

  1. The first EventBridge bus receives the initial order event and routes it to a Step Functions workflow.
  2. The Step Functions workflow orchestrates the order processing, coordinating various tasks and checks.
  3. Upon completion, the Step Functions workflow emits an event with the processing results to the second EventBridge bus.
  4. The second bus contains a rule that, based on the output from the Step Functions workflow, invokes the Aggregator API as an API destination.
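The multi-bus flow above could be expressed as an Amazon States Language definition along these lines. This is a sketch under assumptions: the state names, Lambda function names, bus name, source, and detail type are invented for illustration, while the two Resource values are the Step Functions service integrations for Lambda and EventBridge.

```python
import json

ORDER_WORKFLOW = {
    "Comment": "Sketch of an order processing workflow (names are hypothetical)",
    "StartAt": "ValidateOrder",
    "States": {
        "ValidateOrder": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "validate-order", "Payload.$": "$"},
            "OutputPath": "$.Payload",
            "Next": "PrepareOrder",
        },
        "PrepareOrder": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "prepare-order", "Payload.$": "$"},
            "OutputPath": "$.Payload",
            "Next": "PublishResult",
        },
        "PublishResult": {
            # Emits the processing result to the second event bus.
            "Type": "Task",
            "Resource": "arn:aws:states:::events:putEvents",
            "Parameters": {
                "Entries": [
                    {
                        "EventBusName": "order-results-bus",
                        "Source": "qsr.order-workflow",
                        "DetailType": "OrderProcessed",
                        "Detail.$": "$",
                    }
                ]
            },
            "End": True,
        },
    },
}


def undefined_targets(definition):
    """Return state names referenced by StartAt or Next that are not defined."""
    states = definition["States"]
    targets = {definition["StartAt"]}
    targets |= {state["Next"] for state in states.values() if "Next" in state}
    return sorted(targets - set(states))


definition_json = json.dumps(ORDER_WORKFLOW, indent=2)
```

The small undefined_targets check is a convenience for catching typos in state names before the definition is deployed.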

User engagement workflow

When a customer places an order, there must be a way to confirm or notify them when the order is ready. For this purpose, you can use AWS End User Messaging services to push notifications for order completion and new offers to customers.

You can also use Amazon Personalize to analyze customer data and individual preferences and present personalized recommendations and promotions.

Amazon Personalize can analyze historical order data to enhance the user experience through personalized recommendations, such as optimal delivery times, preferred menu items, and tailored promotions based on individual ordering patterns.

Conclusion

This post showed how to use AWS serverless services to build a platform for your order processing without worrying about managing underlying infrastructure. The serverless services included were Amazon API Gateway, AWS Lambda, Amazon EventBridge, Amazon DynamoDB, AWS Step Functions, AWS End User Messaging, and Amazon Personalize.

This post is a brief introduction to event-driven architectures focused on integrations of internal ordering systems with delivery aggregators and third-party ordering platforms. This can help expand the user base, and it has been a key factor in the growth of many QSRs. Making the ordering, take-out, and delivery experience more efficient translates to revenue growth, reduction of order abandonment, as well as increased recurrent customer retention and brand loyalty.

For more serverless learning resources, visit Serverless Land. To find more patterns, go directly to the Serverless Patterns Collection.

How to use AWS Transfer Family and GuardDuty for malware protection

Post Syndicated from James Abbott original https://aws.amazon.com/blogs/security/how-to-use-aws-transfer-family-and-guardduty-for-malware-protection/

Organizations often need to securely share files with external parties over the internet. Allowing public access to a file transfer server exposes the organization to potential threats, such as malware-infected files uploaded by threat actors or inadvertently by genuine users. To mitigate this risk, companies can take steps to help make sure that files received through public channels are scanned for malware before processing.

This post demonstrates how to use AWS Transfer Family and Amazon GuardDuty to scan files uploaded through an SSH File Transfer Protocol (SFTP) server for malware as part of an overall transfer workflow. For readers who might have read an earlier blog post on this topic, the key difference is that this solution is fully managed and doesn’t require the deployment of compute resources. GuardDuty automatically updates malware signatures every 15 minutes instead of using a container image for scanning, avoiding the need for manual patching to keep the signatures up to date.

Prerequisites

To deploy the solution in this post, you will need:

  • An AWS account: You need access to AWS to deploy this solution. If you don’t have an account that you can use, see Start building on AWS today.
  • AWS CLI: Install and configure the AWS Command Line Interface (AWS CLI) to be authenticated to your AWS account. Set up the environment variables for your AWS account using the access key ID and secret access key for your environment.
  • Git: You will use Git to pull down the example code from GitHub.
  • Terraform: You’ll use Terraform to run the automation. Follow the Terraform installation instructions to download and set up Terraform.

Solution overview

This solution uses Transfer Family and GuardDuty. Transfer Family provides a secure file transfer service that you can use to set up an SFTP server, and GuardDuty is an intelligent threat detection service. GuardDuty monitors for malicious activity and anomalous behavior to protect AWS accounts, workloads, and data. At a high level, the solution uses the following steps:

  • A user uploads a file through a Transfer Family SFTP server.
  • A Transfer Family managed workflow invokes AWS Lambda to execute an AWS Step Functions workflow.
    • The workflow begins only after a successful file upload.
    • Partial uploads to the SFTP server will invoke an error handling Lambda function to report a partial upload error.
  • A Step Functions state machine invokes a Lambda function to move uploaded files to an Amazon Simple Storage Service (Amazon S3) bucket for processing and then starts scanning using GuardDuty.
  • The GuardDuty scan result is sent as a callback to the Step Functions workflow.
  • Infected files are moved or cleaned.
  • The workflow sends the user the results through an Amazon Simple Notification Service (Amazon SNS) topic. This can be a notification of an error or malicious upload during the scan or notification of a successful upload and a clean scan for further processing.

Solution architecture and walkthrough

The solution uses GuardDuty Malware Protection for S3 to scan objects newly uploaded to the S3 bucket. You can use this feature of GuardDuty to set up a malware protection plan for an entire S3 bucket or only for specific object prefixes.

Figure 1: Solution architecture

The following steps (shown in Figure 1) describe the workflow for this solution starting from the point the file is uploaded until it’s scanned and marked as safe or as infected, leading to subsequent steps that can be customized based on your use case.

  1. A file is uploaded using the SFTP protocol through Transfer Family.
  2. If the file is successfully uploaded, Transfer Family uploads the file to the S3 bucket called Unscanned and the Managed Workflow Complete workflow is triggered. This is the workflow used to handle successful uploads and invokes the Step Function Invoker Lambda function.
  3. The Step Function Invoker starts the state machine and kicks off the first step in the process by invoking the GuardDuty – Scan Lambda function.
  4. The GuardDuty – Scan function moves the file to the Processing bucket. This is the bucket from which the files will be scanned.
  5. When an object upload activity is detected, GuardDuty automatically scans the object. In this implementation, a malware protection plan is created for the Processing bucket.
  6. When a scan completes, GuardDuty publishes the scan result to Amazon EventBridge.
  7. An EventBridge rule has been created to invoke a Lambda Callback function whenever a scan event has completed. EventBridge will invoke the function with an event that contains the scan results. See Monitoring S3 object scans with Amazon EventBridge for an example.
  8. The Lambda Callback function notifies the GuardDuty – Scan task using the callback task integration pattern. The results of the GuardDuty scan are returned to the GuardDuty – Scan function and these results are passed to the Move File task.
  9. If the result is a clean scan with no threats detected, the Move File task will place the file in the Clean S3 bucket, indicating that the file is successfully scanned and safe for further processing.
  10. At this point, the Move File function publishes a notification to the Success SNS topic to notify the subscribers.
  11. If the result indicates that the file is malicious, the Move File function will instead move the file to the Quarantine S3 bucket for further investigation. The function will also delete the file from the Processing bucket and publish a notification in the Error topic in SNS to notify the user of a potential malicious file being uploaded.
  12. If the file upload is unsuccessful and the file isn’t fully uploaded, then Transfer Family will trigger the Managed Workflow Partial workflow.
  13. Managed Workflow Partial is an error handling workflow and invokes the Error Publisher function, which is used for reporting errors that occur anywhere in the workflow.
  14. The Error Publisher function identifies the type of error—whether it’s because of the partial upload or an issue elsewhere in the workflow—and sets the error status accordingly. It will then publish an error message to Error Topic in SNS.
  15. The GuardDuty – Scan task has a timeout to make sure that an event is published to Error Topic to prompt a manual intervention to investigate further if the file isn’t successfully scanned. If the GuardDuty – Scan task fails, the Error clean up Lambda function is invoked.
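Steps 7 through 11 can be sketched as a callback function that reads the scan result, decides where the file should go, and resumes the waiting task. The event field names below (scanResultDetails.scanResultStatus, s3ObjectDetails.bucketName, s3ObjectDetails.objectKey) are assumed from the scan result event described in Monitoring S3 object scans with Amazon EventBridge and should be checked against that page. The find_task_token helper is hypothetical, because this post does not describe how the task token is stored.

```python
import json


def destination_for(scan_event):
    """Map a GuardDuty scan result status to a logical destination."""
    status = scan_event["detail"]["scanResultDetails"]["scanResultStatus"]
    if status == "NO_THREATS_FOUND":
        return "clean"
    if status == "THREATS_FOUND":
        return "quarantine"
    # Any other status (for example a failed or unsupported scan) needs attention.
    return "error"


def handle_scan_event(scan_event, sfn_client, find_task_token):
    """Resume the waiting Step Functions task with the scan outcome.

    sfn_client needs send_task_success(taskToken=..., output=...), as on the
    boto3 Step Functions client. find_task_token(bucket, key) is a hypothetical
    lookup for the token saved when the scan task started.
    """
    obj = scan_event["detail"]["s3ObjectDetails"]
    result = {
        "bucket": obj["bucketName"],
        "key": obj["objectKey"],
        "destination": destination_for(scan_event),
    }
    token = find_task_token(obj["bucketName"], obj["objectKey"])
    sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
    return result
```

In this sketch the Move File task would read the destination value from the task output and move the object to the Clean or Quarantine bucket accordingly.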

Finally, there’s an S3 Lifecycle policy attached to the Processing bucket. This is to make sure that no file is left in the Processing bucket for more than one day.
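The sample repository defines its resources in Terraform, but the shape of such a lifecycle rule is easy to see as a Python dictionary in the format accepted by the S3 PutBucketLifecycleConfiguration API. The rule ID is a placeholder chosen for this example.

```python
def one_day_expiration_rule(rule_id="expire-unprocessed-files"):
    """Lifecycle configuration that expires every object one day after creation."""
    return {
        "Rules": [
            {
                "ID": rule_id,  # hypothetical rule name
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # empty prefix applies to all objects
                "Expiration": {"Days": 1},
            }
        ]
    }


lifecycle = one_day_expiration_rule()
# With boto3 this could be applied as:
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="<processing-bucket-name>", LifecycleConfiguration=lifecycle)
```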

Code repository

The GitHub AWS-samples repository has a sample implementation developed using Terraform and Python-based Lambda functions to implement this solution. The same solution can also be implemented using AWS CloudFormation. The code has the components needed to deploy the entire workflow to demonstrate the abilities of Transfer Family and the GuardDuty malware protection plan.

Install the solution

Use the following steps to deploy this solution to your test environment.

  1. Clone the repository to your working directory using Git.
  2. Navigate to the root directory of your cloned project directory.
  3. Update the Terraform locals.tf file with the values of your choice for the S3 bucket names, SFTP server names, and other variables.
  4. Run terraform init to initialize the working directory, and then run terraform plan.
  5. If the plan looks correct, run terraform apply and enter yes to create the resources.

Clean up

After testing and exploring the solution, it’s important to clean up the resources you created to avoid incurring unnecessary costs. To delete the resources created by this solution, navigate to the root directory of your cloned project and run the following command:

terraform destroy

This command will remove the resources created by Terraform, including the SFTP server, S3 buckets, Lambda functions, and other components. Confirm the deletion by entering yes when prompted.

Conclusion

By using the approach outlined in the post, you can make sure that the files received over SFTP and uploaded to your S3 bucket are scanned for threats and are safe for further processing. The solution reduces the exposure surface by making sure that public uploads are scanned in a safe environment before they’re sent to other components of your system.

If you have feedback about this post, submit comments in the Comments section below.

James Abbott

James is a Principal Solutions Architect at AWS, working in Global Financial Services. When not in the office he enjoys mountain biking in North Carolina.

Santhosh Srinivasan

Santhosh is a Sr. Cloud Application Architect with the Professional Services team at AWS. He specializes in building and modernizing large scale enterprise applications in the cloud with a focus on the financial services industry.

Suhas Pasricha

Suhas is a Cloud Infrastructure Architect in the AWS Professional Services team. He has a background in web development and infrastructure automation. At Amazon, he has been helping customers set up and operate an enterprise-wide landing zone and cloud environment. In his spare time, he likes to read and play video games.

AWS Weekly Review: Amazon S3 Express One Zone price cuts, Pixtral Large on Amazon Bedrock, Amazon Nova Sonic, and more (April 14, 2025)

Post Syndicated from Elizabeth Fuentes original https://aws.amazon.com/blogs/aws/aws-weekly-review-amazon-s3-express-one-zone-price-cuts-pixtral-large-on-amazon-bedrock-amazon-nova-sonic-and-more-april-14-2025/

The Amazon Web Services (AWS) Summit 2025 season launched this week, starting with the Paris Summit. These free events bring together the global cloud computing community for learning and collaboration. AWS Community Day Romania, held on April 11th, showcased how the local community creates opportunities for collective growth and inclusion.

Last week’s launches
Announcing up to 85% price reductions for Amazon S3 Express One Zone – S3 Express One Zone, a high-performance storage class, now has reduced storage prices by 31 percent, PUT request prices by 55 percent, and GET request prices by 85 percent. In addition, S3 Express One Zone has reduced the per-GB charges for data uploads and retrievals by 60 percent. These charges now apply to all bytes transferred rather than just portions of requests greater than 512 KB.

Here is a price reduction table in the US East (N. Virginia) AWS Region:

| Price | Previous | New | Price reduction |
|---|---|---|---|
| Storage (per GB-month) | $0.16 | $0.11 | 31% |
| Writes (PUT requests) | $0.0025 per 1,000 requests up to 512 KB | $0.00113 per 1,000 requests | 55% |
| Reads (GET requests) | $0.0002 per 1,000 requests up to 512 KB | $0.00003 per 1,000 requests | 85% |
| Data upload (per GB) | $0.008 | $0.0032 | 60% |
| Data retrievals (per GB) | $0.0015 | $0.0006 | 60% |
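The percentages in the table follow directly from the previous and new prices. As a quick check, this short sketch recomputes each reduction as (previous - new) / previous, rounded to the nearest whole percent; the labels are shortened versions of the table rows.

```python
# (previous price, new price) in USD, taken from the table above.
PRICES = {
    "Storage (per GB-month)": (0.16, 0.11),
    "Writes (per 1,000 PUT requests)": (0.0025, 0.00113),
    "Reads (per 1,000 GET requests)": (0.0002, 0.00003),
    "Data upload (per GB)": (0.008, 0.0032),
    "Data retrievals (per GB)": (0.0015, 0.0006),
}


def reduction_percent(previous, new):
    """Price reduction as a whole-number percentage of the previous price."""
    return round((previous - new) / previous * 100)


for label, (previous, new) in PRICES.items():
    print(f"{label}: {reduction_percent(previous, new)}%")
```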

AWS announces Pixtral Large 25.02 model in Amazon Bedrock serverless – The Pixtral Large 25.02, developed by Mistral AI, combines advanced vision and language understanding, boasting a 128K context window and multilingual capabilities. This agent-centric design simplifies integration with existing systems. Prompt adherence improves reliability when working with Retrieval Augmented Generation (RAG) applications and large context scenarios.

Introducing Amazon Nova Sonic: Human-like voice conversations for generative AI applications – Amazon Nova Sonic, the newest addition to the Amazon Nova family of foundation models (FMs), is available in Amazon Bedrock to create human-like voice conversations for applications. It unifies speech and text processing into one model, reducing complexity and enhancing natural interactions. Start today with the Amazon Nova model cookbook repository.

Amazon Bedrock Guardrails enhances generative AI application safety with new capabilities – Amazon Bedrock Guardrails introduces new capabilities to enhance generative AI application safety, including multimodal toxicity detection, enhanced Personally Identifiable Information (PII) protection, AWS Identity and Access Management (AWS IAM) policy enforcement, selective guardrail application, and monitor mode for pre-deployment analysis.

AWS App Studio introduces a prebuilt solutions catalog and cross-instance Import and Export – AWS App Studio now offers a prebuilt solutions catalog with ready-to-use applications and patterns, along with cross-instance Import and Export functionality. These features help you streamline application development, reducing setup time to under 15 minutes. Learn more in the AWS App Studio introduces a prebuilt solutions catalog and cross-instance Import and Export blog post.

Amazon Nova Reel 1.1: Featuring up to 2-minutes multi-shot videos – Amazon Nova Reel 1.1 enhances video generation through Amazon Bedrock with support for 2-minute multi-shot videos. You can now create content using either single prompts for automatic generation or custom prompts for individual shots, offering flexible options for marketing and social media content creation.

AWS IAM Identity Center now offers improved error messages and AWS CloudTrail logging for provisioning issues – AWS Identity and Access Management (IAM) Identity Center has enhanced its service with improved error messages and AWS CloudTrail logging capabilities. These updates help users better troubleshoot synchronization issues when managing workforce identities across AWS accounts and applications, while enabling automated monitoring and auditing of provisioning problems.

AWS WAF Console adds new top insights visualizations in additional regions – AWS WAF Console now offers enhanced traffic visualization features in AWS GovCloud (US) Regions. The all traffic dashboard includes new top insights based on Amazon CloudWatch logs, helping customers analyze traffic patterns, identify security threats, and optimize WAF configurations through detailed metrics.

AWS Step Functions expands data source and output options for Distributed Map – AWS Step Functions enhances Distributed Map with expanded data source support, including JSONL and various delimited file formats from Amazon Simple Storage Service (Amazon S3). The update also adds new output transformation options, enabling more flexible parallel processing workflows and better integration with downstream systems.

Amazon CloudWatch now provides lock contention diagnostics for Aurora PostgreSQL – Amazon CloudWatch Database Insights introduces lock contention diagnostics for Amazon Aurora PostgreSQL in Advanced mode. The feature visualizes blocking and waiting sessions, helping users identify root causes of lock contention issues, with 15-month historical data retention for comprehensive troubleshooting.

For the full list of AWS announcements, see the What’s New with AWS? page.

Other AWS blog posts
Reduce ML training costs with Amazon SageMaker HyperPod – Amazon SageMaker HyperPod addresses hardware failures in large-scale Machine Learning (ML) model training by automatically detecting and replacing faulty instances. The solution reduces downtime from 280 to 40 minutes per failure, potentially saving 32% of training time for large clusters. For a 10-million GPU-hour training job, this translates to $25.6M in cost savings.

Model customization, RAG, or both: A case study with Amazon Nova — A study comparing model customization with fine-tuning and Retrieval Augmented Generation (RAG) approaches with Amazon Nova models. Key findings show combining both methods yields best results: RAG works well for dynamic data and domain insights, while fine-tuning excels in specialized tasks and latency reduction.

Generate user-personalized communication with Amazon Personalize and Amazon Bedrock – Amazon Personalize and Amazon Bedrock work together to create personalized marketing emails. Learn how to create personalized user communications by combining Amazon Personalize for movie recommendations with Amazon Bedrock for generating tailored email content based on user preferences and demographics.

Implement human-in-the-loop confirmation with Amazon Bedrock Agents — When implementing human validation in Amazon Bedrock Agents, developers have two primary frameworks at their disposal: user confirmation and return of control (ROC). Using an HR application example, user confirmation allows simple yes/no validation before executing actions, while ROC enables users to modify parameters before execution.

Multi-LLM routing strategies for generative AI applications on AWS — Learn how to implement multi-Large Language Model (LLM) routing strategies for AWS generative AI applications using static routing, dynamic routing with Amazon Bedrock, or custom solutions for optimal model selection and cost efficiency.

Here are my personal favorite posts from community.aws:

Building a RAG System for Video Content Search and Analysis — In this blog, I’ll show you how to build a RAG system that makes video content searchable and analyzable. Unlocking video content has never been more crucial in today’s digital landscape. Whether you’re managing educational materials, corporate training, or entertainment content, the ability to search and analyze video content efficiently can transform how we interact with multimedia resources.

Build Serverless GenAI Apps Faster with Amazon Q Developer CLI Agent – Amazon Q Developer CLI Agent enables rapid serverless GenAI app development. With one prompt, it generates infrastructure code, Lambda functions, and integrates with Claude 3 Haiku on Amazon Bedrock.

Speech-to-Speech AI: From Dr. Sbaitso to Amazon Nova Sonic — The evolution of speech-to-speech AI, from Dr. Sbaitso (1990s) to Amazon Nova Sonic. New AWS service enables real-time bidirectional conversations through Amazon Bedrock for more natural applications.

Setup Model Context Protocol (MCP) using Amazon Bedrock — A guide to setting up Model Context Protocol (MCP) desktop client with Amazon Bedrock models, enabling seamless integration between AI applications and external tools using Goose client.

Upcoming AWS events
Check your calendars and sign up for these upcoming AWS events:

AWS GenAI Lofts – GenAI Lofts, available around the world, offer collaborative spaces and immersive experiences for startups and developers. You can join in-person GenAI Loft San Francisco events such as GenAI in EdTech: A Hands-On Workshop (April 15) and Unstructured Data Meetup SF (April 16). Find your nearest event at GenAI Lofts.

AWS Summits — Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Amsterdam (April 16), London (April 30), and Poland (May 5).

AWS re:Inforce — AWS re:Inforce (June 16–18) in Philadelphia, PA, is our annual learning event devoted to all things AWS cloud security. Registration is open. Be ready to join more than 5,000 security builders and leaders.

AWS Community Days — Join community-led conferences featuring technical discussions, workshops, and hands-on labs driven by expert AWS users and industry leaders from around the world. Upcoming AWS Community Days are scheduled for April 19 in Turkey, and on April 29 in Prague with Jeff Barr as Opening Keynote Speaker.

You can browse all upcoming in-person and virtual events.

Create your AWS Builder ID and reserve your alias. Builder ID is a universal login credential that gives you access—beyond the AWS Management Console—to AWS tools and resources, including over 600 free training courses, community features, and developer tools such as Amazon Q Developer.

That’s all for this week. Stay tuned for next week’s Weekly Roundup!

Eli

Thanks to Andra Somesan for the AWS Community Romania photo and Thembile Martis for the AWS Paris Summit photo.

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!



Serverless ICYMI 2025 Q1

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/serverless-icymi-2025-q1/

Welcome to the 28th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. At the end of a quarter, we share the most recent product launches, feature enhancements, blog posts, videos, live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened in Q4 2024 here.

Serverless calendar Q1 2025

AWS Step Functions

The AWS Step Functions team continues to improve developer experience. Workflow Studio is now available within Visual Studio Code (VS Code) through the AWS Toolkit extension.

AWS Step Functions in IDE

You can now design, test, and deploy your Step Functions workflows without leaving your IDE. The extension provides a drag-and-drop interface with all the familiar Workflow Studio capabilities, making it even easier to build state machines locally.

To get started, install the AWS Toolkit for Visual Studio Code and visit the user guide on Workflow Studio integration.

Step Functions private integrations now allows you to integrate applications seamlessly across private networks, on-premises infrastructure, and cloud platforms. Learn more in a blog post and explanation video.

AWS Step Functions private integrations video

Step Functions now integrates with 36 more AWS services that support user messaging capabilities. You can orchestrate notifications through Amazon SNS, Amazon SQS, Amazon EventBridge, Amazon Pinpoint, and more, all using the optimized integrations you’re familiar with.

Step Functions has increased the default quota for state machines and activities from 10,000 to 100,000 per AWS account. This tenfold increase means you can create more workflows to automate your business processes without worrying about hitting quota limits.

Distributed Map now supports the JSON Lines (JSONL) format. JSONL, a highly efficient text-based format, stores structured data as individual JSON objects separated by newlines, making it particularly suitable for processing large datasets.
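For readers unfamiliar with the format, the following sketch shows what JSONL looks like and how it can be read line by line in Python. The sample records are made up for illustration; with Distributed Map, each such line would presumably be handled as one item.

```python
import json


def parse_jsonl(text):
    """Parse JSON Lines text into a list of objects, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Two records, one JSON object per line (sample data for illustration).
sample = '{"id": 1, "title": "first"}\n{"id": 2, "title": "second"}\n'
records = parse_jsonl(sample)
```

Because every line is a complete JSON object, a large file can be split and processed in parallel without parsing the whole document first.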

AWS Step Functions Distributed Map

Distributed Map can also process data from a broader range of delimited file formats stored in Amazon S3 and offers new output transformations for greater control over result formatting.

Developer Tools

Serverless Land patterns are now available directly within VS Code.

You no longer need to switch between your IDE and external resources when building serverless architectures. Browse, search, and implement pre-built serverless patterns directly in VS Code.

Example Serverless Pattern

AWS Lambda

Learn how AWS Lambda handles billions of invocations.

AWS Lambda asynchronous invocations

This blog post provides recommendations and insights for implementing highly distributed applications based on the Lambda service team’s experience building its robust asynchronous event processing system. It dives into challenges you might face, solution techniques, and best practices for handling noisy neighbors.

A new video walks through using the enhanced local IDE experience for Lambda developers.

AWS Lambda new IDE experience

The VS Code extension for Lambda now supports live tailing of CloudWatch Logs directly in your IDE, following on from previous support for Live Tail in the Lambda console. Watch logs in real time as your functions execute, making debugging and troubleshooting more efficient.

You can now enable Application Performance Monitoring (APM) for Java and .NET runtimes using Amazon CloudWatch Application Signals.

Amazon CloudWatch Application Signals for Java and .NET AWS Lambda runtimes

This provides deep visibility into your function’s performance, including method-level tracing, memory profiling, and automated anomaly detection.

Amazon Bedrock features

Multi-agent collaboration is now available in Bedrock as a preview, enabling you to create systems where multiple AI agents work together to solve complex problems. Agents can specialize in different domains, share context, and coordinate their actions to achieve goals that would be difficult for a single agent.

RAG evaluation is now generally available. This provides metrics to assess and improve your retrieval augmented generation pipelines. GraphRAG for Bedrock Knowledge Bases is now generally available, allowing you to enhance retrievals with graph-based context.

Amazon Bedrock Flows now supports multi-turn conversations, allowing you to build dynamic AI applications that maintain context across multiple user interactions. Bedrock data automation is now generally available, streamlining the process of preparing, ingesting, and maintaining data for your GenAI applications. Bedrock now offers LLM-as-a-judge capability for model evaluation, providing automated assessment of model outputs without requiring human reviewers. Compare different models or prompt strategies against your specific criteria at scale.

Bedrock’s capabilities are now integrated into the Amazon SageMaker Unified Studio, creating a seamless experience for machine learning practitioners who want to incorporate foundation models into their workflows. Access Bedrock models, fine-tuning, and evaluation directly from SageMaker.

Amazon Nova is a new generation of state-of-the-art foundation models that deliver frontier intelligence and industry leading price-performance. Nova has expanded its tool use and converse API capabilities, making it easier for developers to build AI assistants that can use external tools to complete tasks.

Amazon Bedrock Guardrails image content filters are now generally available. Define and enforce boundaries for your AI applications with controls for both text and image content, ensuring outputs align with your organization’s policies.

Bedrock Knowledge Bases now supports using your existing OpenSearch clusters as the vector storage backend. This integration allows you to leverage your investments in OpenSearch while benefiting from the managed RAG capabilities of Bedrock.

New Amazon Bedrock models

  • Anthropic’s Claude 3.7 Sonnet is a hybrid reasoning model that lets you toggle between standard and extended thinking modes. In standard mode, it functions as an upgraded version of Claude 3.5 Sonnet. In extended thinking mode, it employs self-reflection to achieve improved results across a wide range of tasks.
  • DeepSeek R1, an advanced model specialized in research and scientific reasoning, excels at complex problem-solving tasks and technical content generation.
  • Cohere Embed 3 models are now available in both multilingual and English-specific versions. These embedding models support text and images, providing more accurate representation for multimodal content and improving retrieval augmented generation (RAG) applications.
  • Ray2, Luma AI’s new visual AI model, is capable of creating realistic visuals with fluid, natural movement. You can use it for image understanding, 3D scene reconstruction, and visual content generation, opening new possibilities for immersive and visual applications.
  • Bedrock now supports fine-tuning of Meta’s latest Llama 3.2 models. These upgraded models deliver improved performance across reasoning, coding, and multilingual tasks while being more efficient with computational resources.

Amazon Q Developer

Amazon Q Developer is now available as a CLI agent, bringing AI-assisted development to the command line. Get contextual recommendations, generate shell commands, and solve coding problems without leaving your terminal.

Amazon Q CLI

Amazon Q Developer transformation now supports upgrading Java applications using Maven to Java 21. It offers enhanced code suggestions, refactoring, and optimization recommendations for applications using the latest Java features, like virtual threads and pattern matching.

AWS AppSync

AWS AppSync Events now supports events publishing for WebSocket APIs, enabling real-time publish-subscribe functionality. This feature makes it easier to build applications requiring instant updates, like chat applications, collaborative tools, and real-time dashboards.

AWS AppSync Events

There are new AWS Cloud Development Kit (AWS CDK) L2 constructs for AppSync WebSocket APIs. These make it simpler to define and deploy real-time APIs using infrastructure as code. These high-level constructs handle the details of WebSocket connections, authorization, and messaging patterns.

Amazon SNS

Amazon SNS now supports high-throughput mode for SNS FIFO topics, with default throughput matching SNS standard topics. When you enable high-throughput mode, SNS FIFO topics maintain order within each message group while reducing the deduplication scope to the message-group level.
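As a rough illustration of the setting described above, the following sketch builds the attribute map for a FIFO topic. It assumes the throughput mode is controlled by a topic attribute named `FifoThroughputScope` with the values `Topic` and `MessageGroup`, as listed in the SNS API reference at the time of writing. The helper name and topic name are hypothetical.

```python
def fifo_topic_request(name: str, high_throughput: bool) -> dict:
    """Build keyword arguments for an SNS CreateTopic call (sketch only).

    Assumption: "FifoThroughputScope" selects the deduplication and
    throughput scope; verify the attribute name against the SNS API docs.
    """
    if not name.endswith(".fifo"):
        # FIFO topic names must carry the .fifo suffix.
        name = f"{name}.fifo"
    scope = "MessageGroup" if high_throughput else "Topic"
    return {
        "Name": name,
        "Attributes": {
            "FifoTopic": "true",
            "FifoThroughputScope": scope,
        },
    }


request = fifo_topic_request("orders", high_throughput=True)
```

The resulting dictionary could be unpacked into a `create_topic` call on an SNS client. Ordering is still guaranteed per message group in either mode.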

Amazon EventBridge

Amazon EventBridge now supports direct delivery to targets across AWS accounts, simplifying multi-account architectures. This reduces latency and improves reliability when routing events between accounts in your organization.

Amazon EventBridge cross account

The EventBridge console now features event source discovery, making it easier to find and visualize available event sources in your AWS environment. This tool helps you identify potential event producers and understand the event schemas they emit.

AWS Amplify

AWS Amplify now offers a TypeScript data client optimized for server-side Lambda functions, providing type-safe access to your data sources. This client reduces code complexity and improves reliability when working with databases and APIs in server environments.

Serverless compute blog posts

January

February

March

Serverless Office Hours weekly livestream

February

March

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Developer Advocacy team members who work on Serverless to see the latest news, follow conversations, and interact with the team.

And finally, visit Serverless Land for all your serverless needs.

Simplifying private API integrations with Amazon EventBridge and AWS Step Functions

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/simplifying-private-api-integrations-with-amazon-eventbridge-and-aws-step-functions-2/

This post was written by Pawan Puthran, Principal Specialist TAM, Serverless, and Vamsi Vikash Ankam, Senior Serverless Solutions Architect.

In December 2024, AWS announced that Amazon EventBridge and AWS Step Functions support integration with private APIs using AWS PrivateLink and Amazon VPC Lattice. This feature allows users to integrate applications seamlessly across private networks, on-premises infrastructure, and cloud platforms. It provides operational simplicity, enabling secure and controlled communication between services within a Virtual Private Cloud (VPC). This blog post explores how to leverage this new capability to integrate Step Functions with private APIs, making application interactions across private networks more efficient and secure.

Overview

Private integrations are essential for secure communication between cloud services within a VPC. As organizations modernize their applications in the cloud, they often need to integrate existing systems with private network environments. EventBridge and Step Functions previously needed proxies to send events to HTTPS applications. These proxies, such as AWS Lambda or Amazon Simple Queue Service (Amazon SQS), delivered events to applications running on Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Kubernetes Service (Amazon EKS), or Amazon Elastic Container Service (Amazon ECS). Now, users can directly invoke private HTTPS-based endpoints running within their VPC using EventBridge and Step Functions.

This new capability offers several key benefits:

  1. Enhanced security and compliance: Private API integrations enhance security by keeping APIs within private networks, which minimizes exposure to internet threats and supports compliance in regulated industries such as finance and healthcare.
  2. Simplified architecture and increased developer productivity: This feature streamlines integration by enabling direct access to private APIs, eliminating complex network setups and proxy solutions. It allows developers to focus on core logic, resulting in cleaner architectures, faster development, and reduced maintenance. By removing the need for custom code and unifying application architecture, the integration process accelerates, leading to faster time to market and enhanced innovation.
  3. Improved performance and reliability: Private API integrations to VPC resources enhance performance by leveraging the AWS backbone network. This direct connectivity improves speed, increases reliability, and minimizes external network dependencies and points of failure.

EventBridge and Step Functions use new capabilities of PrivateLink and VPC Lattice, Resource Gateway and Resource Configuration, to facilitate secure network connectivity to services and resources inside of a VPC. To establish the private connectivity, you need the following components:

  1. Resource Gateway: A Resource Gateway serves as a secure entry point for the inbound traffic to the resource. This acts as an ingress point within the VPC where the resources reside.
  2. Resource Configuration: A Resource Configuration is a logical entity that identifies the resource and specifies how it can be accessed and by whom. Defining a resource configuration enables private, secure, and unidirectional network connectivity to resources in your VPC from clients and services in other VPCs and accounts.
  3. EventBridge Connections: EventBridge Connections, used in EventBridge API destinations and Step Functions workflows, establish connectivity to your private HTTPS endpoints by using resource configurations.
  4. AWS Resource Access Manager: You can share the resource configuration through AWS Resource Access Manager (AWS RAM), a service that securely shares your VPC resources across your organizations and with other AWS accounts.

Workload overview

To illustrate how Step Functions invoke private HTTPS APIs, consider the following workflow that classifies product reviews as fake or real.

  1. The Step Functions workflow processes an array of product reviews using Distributed Map.
  2. It involves calling the Amazon Nova Micro model through Amazon Bedrock to classify the review text.
  3. If a review is classified as fake, then the workflow publishes an event to an EventBridge bus, providing a flexible integration for potential downstream analysis or notifications.
  4. If a review is classified as real, then Step Functions calls the private HTTPS endpoint, using its DNS address, to further process the review.
  5. This private API is hosted in AWS Fargate behind an internal Application Load Balancer (ALB) within a VPC.

Figure 1: Step Functions workflow calling private HTTPS-based endpoint running in AWS Fargate

In real-world scenarios, this includes analyzing text patterns, user behavior, and linguistic cues to determine the authenticity of each review. Suspicious reviews are automatically flagged by building customized workflows to maintain the integrity of the product feedback system.
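The branching in the workflow steps above can be sketched as a small routing function. This is only an illustration of the decision logic, not the sample application's code. The state names `PublishFakeReviewEvent` and `CallPrivateApi`, and the assumption that the model returns the labels "fake" or "real", are hypothetical.

```python
def next_state(classification: str) -> str:
    """Return the (hypothetical) state that handles a classified review."""
    label = classification.strip().lower()
    if label == "fake":
        # Fake reviews are published to an EventBridge bus for downstream use.
        return "PublishFakeReviewEvent"
    if label == "real":
        # Real reviews are forwarded to the private HTTPS endpoint.
        return "CallPrivateApi"
    raise ValueError(f"unexpected classification: {classification!r}")


routes = [next_state(label) for label in ("fake", "real")]
```

In a state machine, the same decision would typically be expressed as a Choice state that inspects the model output. Raising on an unknown label mirrors a default branch that fails the item rather than silently dropping it.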

Deploying the example

Before configuring the private integration, create an Amazon Route 53 public hosted zone with a registered domain (such as api.com), and an AWS Certificate Manager (ACM) certificate corresponding to the domain. Although Amazon Route 53 private hosted zones are currently not supported, a public hosted zone can resolve the domain name to a private IP address that is accessible only from within the VPC.

This post includes a sample application and deployment instructions. For complete details, refer to the README.

Scenario 1: Single account

In this scenario, the Step Functions workflow, EventBridge connections, and private resources reside in the same account, as shown in the following figure.

Figure 2: Overview of a single account setup with Step Functions workflow and private API in the same account

  1. The VPC Resource Gateway acts as the entry point to the private resources running within your VPC. As a best practice, create a resource gateway that spans multiple private subnets (Availability Zones) for high availability. Refer to the AWS Cloud Development Kit (AWS CDK) code snippet in lib/vpclattice-stack.ts for the resource gateway implementation.
  2. Resource Configurations establish the connection between the private endpoint and the Resource Gateway and are used to uniquely identify the private resources running within your VPC. Refer to the AWS CDK code snippet in lib/vpclattice-stack.ts to create Resource Configuration, and configure the domain name and port.
  3. To enable Step Functions to communicate with the private VPC resources, you create an EventBridge Connection. This handles the authorization and private connectivity to connect to the private API. Refer to the AWS CDK code snippet in lib/workflow-stack.ts for creating EventBridge Connections.
  4. The Step Functions state machine deployed as part of the sample application uses the HTTPS Invoke task type to call the private API. Calling private APIs from Step Functions lets you use built-in error handling features, such as retries for transient issues and redrive for failed executions.
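To make step 4 more concrete, here is a minimal sketch of what an HTTP task state with a retry policy might look like, built as a Python dictionary in Amazon States Language form. It is not taken from the sample application. The endpoint URL, connection ARN, and retry values are placeholders, and the request body simply forwards the state input.

```python
import json


def build_private_api_task(api_endpoint: str, connection_arn: str) -> dict:
    """Return an ASL Task state that calls an HTTPS endpoint (sketch only)."""
    return {
        "Type": "Task",
        # Resource ARN of the Step Functions HTTP task integration.
        "Resource": "arn:aws:states:::http:invoke",
        "Parameters": {
            "ApiEndpoint": api_endpoint,
            "Method": "POST",
            # The EventBridge Connection supplies auth and private connectivity.
            "Authentication": {"ConnectionArn": connection_arn},
            # Forward the whole state input as the request body.
            "RequestBody.$": "$",
        },
        "Retry": [
            {
                # Retry throttling and transient gateway errors with backoff.
                "ErrorEquals": [
                    "States.Http.StatusCode.429",
                    "States.Http.StatusCode.503",
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 3,
                "BackoffRate": 2.0,
            }
        ],
        "End": True,
    }


state = build_private_api_task(
    "https://api.com/reviews",  # placeholder endpoint on the example domain
    "arn:aws:events:us-east-1:111122223333:connection/example/placeholder-id",
)
state_json = json.dumps(state, indent=2)
```

The retry values here are illustrative. Tune the interval, attempt count, and matched errors to the behavior of the API behind the load balancer.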

You can use the following payload to test the Step Functions execution:

{
  "items": [
    {
      "asin": "B000FA64PA",
      "helpful": [ 0, 0],
      "overall": 5,
      "reviewText": "Darth Maul working under cloak of darkness committing sabotage now that is a story worth reading many times over. Great story.",
      "reviewTime": "10 11, 2013",
      "unixReviewTime": 1381449600
    },
    {
      "asin": "B000F83SZQ",
      "helpful": [ 1, 1],
      "overall": 4,
      "reviewText": "Never heard of Amy Brewster. But I don't need to like Amy Brewster to like this book. Actually, Amy Brewster is a sidekick in this story, who added mystery to the story not the one resolved it. The story brings back the old times, simple life, simple people, and straight relationships.",
      "reviewTime": "03 22, 2014",
      "unixReviewTime": 1395446400
    }
  ]
}
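If you prefer to start the execution from a script instead of the console, a sketch along these lines could work. It assumes boto3 is installed and credentials are configured. The function names are hypothetical, and the check for `asin` and `reviewText` is an assumption about which fields the workflow needs, not something the sample application documents.

```python
import json


def build_execution_input(reviews: list) -> str:
    """Wrap reviews in the {"items": [...]} shape used by the test payload."""
    required = {"asin", "reviewText"}  # assumed minimum fields per review
    for index, review in enumerate(reviews):
        missing = required - review.keys()
        if missing:
            raise ValueError(f"review {index} is missing {sorted(missing)}")
    return json.dumps({"items": reviews})


def start_review_workflow(state_machine_arn: str, reviews: list) -> str:
    """Start an execution and return its ARN (requires boto3 and AWS access)."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    sfn = boto3.client("stepfunctions")
    response = sfn.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(reviews),
    )
    return response["executionArn"]


payload = build_execution_input(
    [{"asin": "B000FA64PA", "overall": 5, "reviewText": "Great story."}]
)
```

Validating the payload before submitting it keeps malformed items from surfacing later as per-item failures inside the Distributed Map run.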

The following figure shows the Step Functions execution where the review is classified as real and successfully invokes the private HTTPS endpoint.

Figure 3: Step Functions execution classifying the product reviews as real and successfully invoking the private API

Scenario 2: Cross account

In this scenario, all the private resources reside in Account A. The Step Functions workflow and EventBridge Connections reside in Account B. The cross-account resource sharing is powered by AWS RAM, as shown in the following figure.

Figure 4: Cross-account setup

Following the creation of the Resource Gateway and the Resource Configuration, as described in the previous section, configure the resource share using AWS RAM in Account A.

  1. The sample application creates the AWS RAM resource share in Account A. This allows Account B to access private VPC resources in Account A, enabling secure, AWS Identity and Access Management (IAM) authorized access to the VPC resources in Account A. Refer to the CDK code snippet in lib/vpclattice-stack.ts to create cross-account resource share using AWS RAM.
  2. In Account B, AWS RAM receives an invitation from Account A to access the private VPC resources. Upon acceptance, the resource share status changes to Active, granting access to the private VPC resources in Account A.
  3. To enable access from Account B’s Step Functions workflow or EventBridge to Account A’s private VPC resources, create an EventBridge Connection as described in Step 3 (Single account scenario). Map this connection to the Resource Configuration shared through AWS RAM in the previous step.
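The mapping in step 3 can be sketched as the request body for creating an EventBridge Connection that points at the shared Resource Configuration. This is a hedged illustration rather than the sample application's CDK code. The connection name, API key header, and ARN below are placeholders, and API key authorization is only one of the supported options.

```python
def build_connection_request(
    name: str,
    resource_configuration_arn: str,
    api_key_name: str,
    api_key_value: str,
) -> dict:
    """Build keyword arguments for an EventBridge CreateConnection call."""
    return {
        "Name": name,
        "AuthorizationType": "API_KEY",
        "AuthParameters": {
            "ApiKeyAuthParameters": {
                "ApiKeyName": api_key_name,
                "ApiKeyValue": api_key_value,
            }
        },
        # Private connectivity: reference the Resource Configuration that
        # Account A shared with this account through AWS RAM.
        "InvocationConnectivityParameters": {
            "ResourceParameters": {
                "ResourceConfigurationArn": resource_configuration_arn,
            }
        },
    }


connection_request = build_connection_request(
    name="private-reviews-api",  # hypothetical connection name
    resource_configuration_arn=(
        "arn:aws:vpc-lattice:us-east-1:111122223333:"
        "resourceconfiguration/rcfg-placeholder"
    ),
    api_key_name="x-api-key",
    api_key_value="replace-me",
)
```

In Account B, the dictionary could be unpacked into a `create_connection` call on an EventBridge client once the resource share shows as Active. In practice, keep the key value in a secrets store instead of in source code.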

Enterprises with distributed development teams often operate across multiple AWS accounts. The setup described above gives those teams secure cross-account access to VPC resources.

New connection state events

EventBridge now publishes state change events for new and existing connections. These events are useful for triggering actions on state changes and for troubleshooting. The following example shows the state change events published for Connection Authorized and Connection Activated.

Figure 5: EventBridge connections state change

Conclusion

The new integration allows Amazon EventBridge and AWS Step Functions to integrate with private APIs, powered by AWS PrivateLink and Amazon VPC Lattice. Users can integrate legacy on-premises systems with cloud-native applications using event-driven architectures and workflow orchestration. The integration helps enterprises modernize distributed applications across public and private networks, enabling faster innovation, higher performance, and lower costs by eliminating the need for custom networking or integration code.

For more details, refer to the EventBridge and Step Functions documentation. Check out this video on setting up integrations with EventBridge and Step Functions. Get the sample code used in this post from this GitHub repository.

To expand your serverless knowledge, visit Serverless Land.