Tag Archives: Technical How-to

Power neural search with AI/ML connectors in Amazon OpenSearch Service

Post Syndicated from Aruna Govindaraju original https://aws.amazon.com/blogs/big-data/power-neural-search-with-ai-ml-connectors-in-amazon-opensearch-service/

With the launch of the neural search feature for Amazon OpenSearch Service in OpenSearch 2.9, it’s now effortless to integrate with AI/ML models to power semantic search and other use cases. OpenSearch Service has supported both lexical and vector search since the introduction of its k-nearest neighbor (k-NN) feature in 2020; however, configuring semantic search required building a framework to integrate machine learning (ML) models to ingest and search. The neural search feature facilitates text-to-vector transformation during ingestion and search. When you use a neural query during search, the query is translated into a vector embedding and k-NN is used to return the nearest vector embeddings from the corpus.

To use neural search, you must set up an ML model. We recommend configuring AI/ML connectors to AWS AI and ML services (such as Amazon SageMaker or Amazon Bedrock) or third-party alternatives. Starting with version 2.9 on OpenSearch Service, AI/ML connectors integrate with neural search to simplify and operationalize the translation of your data corpus and queries to vector embeddings, thereby removing much of the complexity of vector hydration and search.

In this post, we demonstrate how to configure AI/ML connectors to external models through the OpenSearch Service console.

Solution Overview

Specifically, this post walks you through connecting to a model in SageMaker. Then we guide you through using the connector to configure semantic search on OpenSearch Service as an example of a use case that is supported through connection to an ML model. Amazon Bedrock and SageMaker integrations are currently supported on the OpenSearch Service console UI, and the list of UI-supported first- and third-party integrations will continue to grow.

For any models not supported through the UI, you can instead set them up using the available APIs and the ML blueprints. For more information, refer to Introduction to OpenSearch Models. You can find blueprints for each connector in the ML Commons GitHub repository.


Before connecting the model via the OpenSearch Service console, create an OpenSearch Service domain. Map an AWS Identity and Access Management (IAM) role by the name LambdaInvokeOpenSearchMLCommonsRole as the backend role on the ml_full_access role using the Security plugin on OpenSearch Dashboards, as shown in the following video. The OpenSearch Service integrations workflow is pre-filled to use the LambdaInvokeOpenSearchMLCommonsRole IAM role by default to create the connector between the OpenSearch Service domain and the model deployed on SageMaker. If you use a custom IAM role on the OpenSearch Service console integrations, make sure the custom role is mapped as the backend role with ml_full_access permissions prior to deploying the template.

Deploy the model using AWS CloudFormation

The following video demonstrates the steps to use the OpenSearch Service console to deploy a model within minutes on Amazon SageMaker and generate the model ID via the AI connectors. The first step is to choose Integrations in the navigation pane on the OpenSearch Service AWS console, which routes to a list of available integrations. The integration is set up through a UI, which will prompt you for the necessary inputs.

To set up the integration, you only need to provide the OpenSearch Service domain endpoint and provide a model name to uniquely identify the model connection. By default, the template deploys the Hugging Face sentence-transformers model, djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2.

When you choose Create Stack, you are routed to the AWS CloudFormation console. The CloudFormation template deploys the architecture detailed in the following diagram.

The CloudFormation stack creates an AWS Lambda application that deploys a model from Amazon Simple Storage Service (Amazon S3), creates the connector, and generates the model ID in the output. You can then use this model ID to create a semantic index.

If the default all-MiniLM-L6-v2 model doesn’t serve your purpose, you can deploy any text embedding model of your choice on the chosen model host (SageMaker or Amazon Bedrock) by providing your model artifacts as an accessible S3 object. Alternatively, you can select one of the following pre-trained language models and deploy it to SageMaker. For instructions to set up your endpoint and models, refer to Available Amazon SageMaker Images.

SageMaker is a fully managed service that brings together a broad set of tools to enable high-performance, low-cost ML for any use case, delivering key benefits such as model monitoring, serverless hosting, and workflow automation for continuous training and deployment. SageMaker allows you to host and manage the lifecycle of text embedding models, and use them to power semantic search queries in OpenSearch Service. When connected, SageMaker hosts your models and OpenSearch Service is used to query based on inference results from SageMaker.

View the deployed model through OpenSearch Dashboards

To verify the CloudFormation template successfully deployed the model on the OpenSearch Service domain and get the model ID, you can use the ML Commons REST GET API through OpenSearch Dashboards Dev Tools.

The GET _plugins REST API now provides additional APIs to also view the model status. The following command allows you to see the status of a remote model:

GET _plugins/_ml/models/<modelid>

As shown in the following screenshot, a DEPLOYED status in the response indicates the model is successfully deployed on the OpenSearch Service cluster.

Alternatively, you can view the model deployed on your OpenSearch Service domain using the Machine Learning page of OpenSearch Dashboards.

This page lists the model information and the statuses of all the models deployed.

Create the neural pipeline using the model ID

When the status of the model shows as either DEPLOYED in Dev Tools or green and Responding in OpenSearch Dashboards, you can use the model ID to build your neural ingest pipeline. The following ingest pipeline is run in your domain’s OpenSearch Dashboards Dev Tools. Make sure you replace the model ID with the unique ID generated for the model deployed on your domain.

PUT _ingest/pipeline/neural-pipeline
  "description": "Semantic Search for retail product catalog ",
  "processors" : [
      "text_embedding": {
        "model_id": "sfG4zosBIsICJFsINo3X",
        "field_map": {
           "description": "desc_v",
           "name": "name_v"

Create the semantic search index using the neural pipeline as the default pipeline

You can now define your index mapping with the default pipeline configured to use the new neural pipeline you created in the previous step. Ensure the vector fields are declared as knn_vector and the dimensions are appropriate to the model that is deployed on SageMaker. If you have retained the default configuration to deploy the all-MiniLM-L6-v2 model on SageMaker, keep the following settings as is and run the command in Dev Tools.

PUT semantic_demostore
  "settings": {
    "index.knn": true,  
    "default_pipeline": "neural-pipeline",
    "number_of_shards": 1,
    "number_of_replicas": 1
  "mappings": {
    "properties": {
      "desc_v": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "name": "hnsw",
          "engine": "nmslib",
          "space_type": "cosinesimil"
      "name_v": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "name": "hnsw",
          "engine": "nmslib",
          "space_type": "cosinesimil"
      "description": {
        "type": "text" 
      "name": {
        "type": "text" 

Ingest sample documents to generate vectors

For this demo, you can ingest the sample retail demostore product catalog to the new semantic_demostore index. Replace the user name, password, and domain endpoint with your domain information and ingest raw data into OpenSearch Service:

curl -XPOST -u 'username:password' 'https://domain-end-point/_bulk' --data-binary @semantic_demostore.json -H 'Content-Type: application/json'

Validate the new semantic_demostore index

Now that you have ingested your dataset to the OpenSearch Service domain, validate if the required vectors are generated using a simple search to fetch all fields. Validate if the fields defined as knn_vectors have the required vectors.

Compare lexical search and semantic search powered by neural search using the Compare Search Results tool

The Compare Search Results tool on OpenSearch Dashboards is available for production workloads. You can navigate to the Compare search results page and compare query results between lexical search and neural search configured to use the model ID generated earlier.

Clean up

You can delete the resources you created following the instructions in this post by deleting the CloudFormation stack. This will delete the Lambda resources and the S3 bucket that contain the model that was deployed to SageMaker. Complete the following steps:

  1. On the AWS CloudFormation console, navigate to your stack details page.
  2. Choose Delete.

  1. Choose Delete to confirm.

You can monitor the stack deletion progress on the AWS CloudFormation console.

Note that, deleting the CloudFormation stack doesn’t delete the model deployed on the SageMaker domain and the AI/ML connector created. This is because these models and the connector can be associated with multiple indexes within the domain. To specifically delete a model and its associated connector, use the model APIs as shown in the following screenshots.

First, undeploy the model from the OpenSearch Service domain memory:

POST /_plugins/_ml/models/<model_id>/_undeploy

Then you can delete the model from the model index:

DELETE /_plugins/_ml/models/<model_id>

Lastly, delete the connector from the connector index:

DELETE /_plugins/_ml/connectors/<connector_id>


In this post, you learned how to deploy a model in SageMaker, create the AI/ML connector using the OpenSearch Service console, and build the neural search index. The ability to configure AI/ML connectors in OpenSearch Service simplifies the vector hydration process by making the integrations to external models native. You can create a neural search index in minutes using the neural ingestion pipeline and the neural search that use the model ID to generate the vector embedding on the fly during ingest and search.

To learn more about these AI/ML connectors, refer to Amazon OpenSearch Service AI connectors for AWS services, AWS CloudFormation template integrations for semantic search, and Creating connectors for third-party ML platforms.

About the Authors

Aruna Govindaraju is an Amazon OpenSearch Specialist Solutions Architect and has worked with many commercial and open source search engines. She is passionate about search, relevancy, and user experience. Her expertise with correlating end-user signals with search engine behavior has helped many customers improve their search experience.

Dagney Braun is a Principal Product Manager at AWS focused on OpenSearch.

Generate AI powered insights for Amazon Security Lake using Amazon SageMaker Studio and Amazon Bedrock

Post Syndicated from Jonathan Nguyen original https://aws.amazon.com/blogs/security/generate-ai-powered-insights-for-amazon-security-lake-using-amazon-sagemaker-studio-and-amazon-bedrock/

In part 1, we discussed how to use Amazon SageMaker Studio to analyze time-series data in Amazon Security Lake to identify critical areas and prioritize efforts to help increase your security posture. Security Lake provides additional visibility into your environment by consolidating and normalizing security data from both AWS and non-AWS sources. Security teams can use Amazon Athena to query data in Security Lake to aid in a security event investigation or proactive threat analysis. Reducing the security team’s mean time to respond to or detect a security event can decrease your organization’s security vulnerabilities and risks, minimize data breaches, and reduce operational disruptions. Even if your security team is already familiar with AWS security logs and is using SQL queries to sift through data, determining appropriate log sources to review and crafting customized SQL queries can add time to an investigation. Furthermore, when security analysts conduct their analysis using SQL queries, the results are point-in-time and don’t automatically factor results from previous queries.

In this blog post, we show you how to extend the capabilities of SageMaker Studio by using Amazon Bedrock, a fully-managed generative artificial intelligence (AI) service natively offering high-performing foundation models (FMs) from leading AI companies with a single API. By using Amazon Bedrock, security analysts can accelerate security investigations by using a natural language companion to automatically generate SQL queries, focus on relevant data sources within Security Lake, and use previous SQL query results to enhance the results from future queries. We walk through a threat analysis exercise to show how your security analysts can use natural language processing to answer questions such as which AWS account has the most AWS Security Hub findings, irregular network activity from AWS resources, or which AWS Identity and Access Management (IAM) principals invoked highly suspicious activity. By identifying possible vulnerabilities or misconfigurations, you can minimize mean time to detect and pinpoint specific resources to assess overall impact. We also discuss methods to customize Amazon Bedrock integration with data from your Security Lake. While large language models (LLMs) are useful conversational partners, it’s important to note that LLM responses can include hallucinations, which might not reflect truth or reality. We discuss some mechanisms to validate LLM responses and mitigate hallucinations. This blog post is best suited for technologists who have an in-depth understanding of generative artificial intelligence concepts and the AWS services used in the example solution.

Solution overview

Figure 1 depicts the architecture of the sample solution.

Figure 1: Security Lake generative AI solution architecture

Figure 1: Security Lake generative AI solution architecture

Before you deploy the sample solution, complete the following prerequisites:

  1. Enable Security Lake in your organization in AWS Organizations and specify a delegated administrator account to manage the Security Lake configuration for all member accounts in your organization. Configure Security Lake with the appropriate log sources: Amazon Virtual Private Cloud (VPC) Flow Logs, AWS Security Hub, AWS CloudTrail, and Amazon Route53.
  2. Create subscriber query access from the source Security Lake AWS account to the subscriber AWS account.
  3. Accept a resource share request in the subscriber AWS account in AWS Resource Access Manager (AWS RAM).
  4. Create a database link in AWS Lake Formation in the subscriber AWS account and grant access for the Athena tables in the Security Lake AWS account.
  5. Grant Claude v2 model access for Amazon Bedrock LLM Claude v2 in the AWS subscriber account where you will deploy the solution. If you try to use a model before you enable it in your AWS account, you will get an error message.

After you set up the prerequisites, the sample solution architecture provisions the following resources:

  1. A VPC is provisioned for SageMaker with an internet gateway, a NAT gateway, and VPC endpoints for all AWS services within the solution. An internet gateway or NAT gateway is required to install external open-source packages.
  2. A SageMaker Studio domain is created in VPCOnly mode with a single SageMaker user-profile that’s tied to an IAM role. As part of the SageMaker deployment, an Amazon Elastic File System (Amazon EFS) is provisioned for the SageMaker domain.
  3. A dedicated IAM role is created to restrict access to create or access the SageMaker domain’s presigned URL from a specific Classless Inter-Domain Routing (CIDR) for accessing the SageMaker notebook.
  4. An AWS CodeCommit repository containing Python notebooks used for the artificial intelligence and machine learning (AI/ML) workflow by the SageMaker user profile.
  5. An Athena workgroup is created for Security Lake queries with a S3 bucket for output location (access logging is configured for the output bucket).


Before deploying the sample solution and walking through this post, it’s important to understand the cost factors for the main AWS services being used. The cost will largely depend on the amount of data you interact with in Security Lake and the duration of running resources in SageMaker Studio.

  1. A SageMaker Studio domain is deployed and configured with default setting of a ml.t3.medium instance type. For a more detailed breakdown, see SageMaker Studio pricing. It’s important to shut down applications when they’re not in use because you’re billed for the number of hours an application is running. See the AWS samples repository for an automated shutdown extension.
  2. Amazon Bedrock on-demand pricing is based on the selected LLM and the number of input and output tokens. A token is comprised of a few characters and refers to the basic unit of text that a model learns to understand the user input and prompts. For a more detailed breakdown, see Amazon Bedrock pricing.
  3. The SQL queries generated by Amazon Bedrock are invoked using Athena. Athena cost is based on the amount of data scanned within Security Lake for that query. For a more detailed breakdown, see Athena pricing.

Deploy the sample solution

You can deploy the sample solution by using either the AWS Management Console or the AWS Cloud Development Kit (AWS CDK). For instructions and more information on using the AWS CDK, see Get Started with AWS CDK.

Option 1: Deploy using AWS CloudFormation using the console

Use the console to sign in to your subscriber AWS account and then choose the Launch Stack button to open the AWS CloudFormation console that’s pre-loaded with the template for this solution. It takes approximately 10 minutes for the CloudFormation stack to complete.

Select the Launch Stack button to launch the template

Option 2: Deploy using AWS CDK

  1. Clone the Security Lake generative AI sample repository.
  2. Navigate to the project’s source folder (…/amazon-security-lake-generative-ai/source).
  3. Install project dependencies using the following commands:
    npm install -g aws-cdk-lib
    npm install

  4. On deployment, you must provide the following required parameters:
    • IAMroleassumptionforsagemakerpresignedurl – this is the existing IAM role you want to use to access the AWS console to create presigned URLs for SageMaker Studio domain.
    • securitylakeawsaccount – this is the AWS account ID where Security Lake is deployed.
  5. Run the following commands in your terminal while signed in to your subscriber AWS account. Replace <INSERT_AWS_ACCOUNT> with your account number and replace <INSERT_REGION> with the AWS Region that you want the solution deployed to.
    cdk bootstrap aws://<INSERT_AWS_ACCOUNT>/<INSERT_REGION>
    cdk deploy --parameters IAMroleassumptionforsagemakerpresignedurl=arn:aws:iam::<INSERT_AWS_ACCOUNT>:role/<INSERT_IAM_ROLE_NAME> --parameters securitylakeawsaccount=<INSERT_SECURITY_LAKE_AWS_ACCOUNT_ID>

Post-deployment configuration steps

Now that you’ve deployed the solution, you must add permissions to allow SageMaker and Amazon Bedrock to interact with your Security Lake data.

Grant permission to the Security Lake database

  1. Copy the SageMaker user profile Amazon Resource Name (ARN)

  2. Go to the Lake Formation console.
  3. Select the amazon_security_lake_glue_db_<YOUR-REGION> database. For example, if your Security Lake is in us-east-1, the value would be amazon_security_lake_glue_db_us_east_1
  4. For Actions, select Grant.
  5. In Grant Data Permissions, select SAML Users and Groups.
  6. Paste the SageMaker user profile ARN from Step 1.
  7. In Database Permissions, select Describe, and then Grant.

Grant permission to Security Lake tables

You must repeat these steps for each source configured within Security Lake. For example, if you have four sources configured within Security Lake, you must grant permissions for the SageMaker user profile to four tables. If you have multiple sources that are in separate Regions and you don’t have a rollup Region configured in Security Lake, you must repeat the steps for each source in each Region.

The following example grants permissions to the Security Hub table within Security Lake. For more information about granting table permissions, see the AWS LakeFormation user-guide.

  1. Copy the SageMaker user-profile ARN arn:aws:iam:<account-id>:role/sagemaker-user-profile-for-security-lake.
  2. Go to the Lake Formation console.
  3. Select the amazon_security_lake_glue_db_<YOUR-REGION> database.
    For example, if your Security Lake database is in us-east-1 the value would be amazon_security_lake_glue_db_us_east_1
  4. Choose View Tables.
  5. Select the amazon_security_lake_table_<YOUR-REGION>_sh_findings_1_0 table.
    For example, if your Security Lake table is in us-east-1 the value would be amazon_security_lake_table_us_east_1_sh_findings_1_0

    Note: Each table must be granted access individually. Selecting All Tables won’t grant the access needed to query Security Lake.

  6. For Actions, select Grant.
  7. In Grant Data Permissions, select SAML Users and Groups.
  8. Paste the SageMaker user profile ARN from Step 1.
  9. In Table Permissions, select Describe, and then Grant.

Launch your SageMaker Studio application

Now that you’ve granted permissions for a SageMaker user profile, you can move on to launching the SageMaker application associated to that user profile.

  1. Navigate to the SageMaker Studio domain in the console.
  2. Select the SageMaker domain security-lake-gen-ai-<subscriber-account-id>.
  3. Select the SageMaker user profile sagemaker-user-profile-for-security-lake.
  4. For Launch, select Studio.
Figure 2: SageMaker Studio domain view

Figure 2: SageMaker Studio domain view

Clone the Python notebook

As part of the solution deployment, we’ve created a foundational Python notebook in CodeCommit to use within your SageMaker app.

  1. Navigate to CloudFormation in the console.
  2. In the Stacks section, select the SageMakerDomainStack.
  3. Select the Outputs tab.
  4. Copy the value for the SageMaker notebook generative AI repository URL. (For example: https://git-codecommit.us-east-1.amazonaws.com/v1/repos/sagemaker_gen_ai_repo)
  5. Go back to your SageMaker app.
  6. In SageMaker Studio, in the left sidebar, choose the Git icon (a diamond with two branches), then choose Clone a Repository.
    Figure 3: SageMaker Studio clone repository option

    Figure 3: SageMaker Studio clone repository option

  7. Paste the CodeCommit repository link from Step 4 under the Git repository URL (git). After you paste the URL, select Clone “https://git-codecommit.us-east-1.amazonaws.com/v1/repos/sagemaker_gen_ai_repo”, then select Clone.

Note: If you don’t select from the auto-populated list, SageMaker won’t be able to clone the repository and will return a message that the URL is invalid.

Figure 4: SageMaker Studio clone HTTPS repository URL

Figure 4: SageMaker Studio clone HTTPS repository URL

Configure your notebook to use generative AI

In the next section, we walk through how we configured the notebook and why we used specific LLMs, agents, tools, and additional configurations so you can extend and customize this solution to your use case.

The notebook we created uses the LangChain framework. LangChain is a framework for developing applications powered by language models and processes natural language inputs from the user, generates SQL queries, and runs those queries on your Security Lake data. For our use case, we’re using LangChain with Anthropic’s Claude 2 model on Amazon Bedrock.

Set up the notebook environment

  1. After you’re in the generative_ai_security_lake.ipynb notebook, you can set up your notebook environment. Keep the default settings and choose Select.
    Figure 5: SageMaker Studio notebook start-up configuration

    Figure 5: SageMaker Studio notebook start-up configuration

  2. Run the first cell to install the requirements listed in the requirements.txt file.

Connect to the Security Lake database using SQLAlchemy

The example solution uses a pre-populated Security Lake database with metadata in the AWS Glue Data Catalog. The inferred schema enables the LLM to generate SQL queries in response to the questions being asked.

LangChain uses SQLAlchemy, which is a Python SQL toolkit and object relational mapper, to access databases. To connect to a database, first import SQLAlchemy and create an engine object by specifying the following:

  • ATHENA REST API details

You can use the following configuration code to establish database connections and start querying.

import os
REGION_NAME = os.environ.get('REGION_NAME', 'us-east-1')
REGION_FMT = REGION_NAME.replace("-","_")

from langchain import SQLDatabase
from sqlalchemy import create_engine

#Amazon Security Lake Database
SCHEMA_NAME = f"amazon_security_lake_glue_db_{REGION_FMT}"

#S3 Staging location for Athena query output results and this will be created by deploying the Cloud Formation stack
S3_STAGING_DIR = f's3://athena-gen-ai-bucket-results-{ACCOUNT_ID}/output/'

engine_athena = create_engine(

athena_db = SQLDatabase(engine_athena)
db = athena_db

Initialize the LLM and Amazon Bedrock endpoint URL

Amazon Bedrock provides a list of Region-specific endpoints for making inference requests for models hosted in Amazon Bedrock. In this post, we’ve defined the model ID as Claude v2 and the Amazon Bedrock endpoint as us-east-1. You can change this to other LLMs and endpoints as needed for your use case.

Obtain a model ID from the AWS console

  1. Go to the Amazon Bedrock console.
  2. In the navigation pane, under Foundation models, select Providers.
  3. Select the Anthropic tab from the top menu and then select Claude v2.
  4. In the model API request note the model ID value in the JSON payload.

Note: Alternatively, you can use the AWS Command Line Interface (AWS CLI) to run the list-foundation-models command in a SageMaker notebook cell or a CLI terminal to the get the model ID. For AWS SDK, you can use the ListFoundationModels operation to retrieve information about base models for a specific provider.

Figure 6: Amazon Bedrock Claude v2 model ID

Figure 6: Amazon Bedrock Claude v2 model ID

Set the model parameters

After the LLM and Amazon Bedrock endpoints are configured, you can use the model_kwargs dictionary to set model parameters. Depending on your use case, you might use different parameters or values. In this example, the following values are already configured in the notebook and passed to the model.

  1. temperature: Set to 0. Temperature controls the degree of randomness in responses from the LLM. By adjusting the temperature, users can control the balance between having predictable, consistent responses (value closer to 0) compared to more creative, novel responses (value closer to 1).

    Note: Instead of using the temperature parameter, you can set top_p, which defines a cutoff based on the sum of probabilities of the potential choices. If you set Top P below 1.0, the model considers the most probable options and ignores less probable ones. According to Anthropic’s user guide, “you should either alter temperature or top_p, but not both.”

  2. top_k: Set to 0. While temperature controls the probability distribution of potential tokens, top_k limits the sample size for each subsequent token. For example, if top_k=50, the model selects from the 50 most probable tokens that could be next in a sequence. When you lower the top_k value, you remove the long tail of low probability tokens to select from in a sequence.
  3. max_tokens_to_sample: Set to 4096. For Anthropic models, the default is 256 and the max is 4096. This value denotes the absolute maximum number of tokens to predict before the generation stops. Anthropic models can stop before reaching this maximum.
Figure 7: Notebook configuration for Amazon Bedrock

Figure 7: Notebook configuration for Amazon Bedrock

Create and configure the LangChain agent

An agent uses a LLM and tools to reason and determine what actions to take and in which order. For this use case, we used a Conversational ReAct agent to remember conversational history and results to be used in a ReAct loop (Question → Thought → Action → Action Input → Observation ↔ repeat → Answer). This way, you don’t have to remember how to incorporate previous results in the subsequent question or query. Depending on your use case, you can configure a different type of agent.

Create a list of tools

Tools are functions used by an agent to interact with the available dataset. The agent’s tools are used by an action agent. We import both SQL and Python REPL tools:

  1. List the available log source tables in the Security Lake database
  2. Extract the schema and sample rows from the log source tables
  3. Create SQL queries to invoke in Athena
  4. Validate and rewrite the queries in case of syntax errors
  5. Invoke the query to get results from the appropriate log source tables
Figure 8: Notebook LangChain agent tools

Figure 8: Notebook LangChain agent tools

Here’s a breakdown for the tools used and the respective prompts:

  • QuerySQLDataBaseTool: This tool accepts detailed and correct SQL queries as input and returns results from the database. If the query is incorrect, you receive an error message. If there’s an error, rewrite and recheck the query, and try again. If you encounter an error such as Unknown column xxxx in field list, use the sql_db_schema to verify the correct table fields.
  • InfoSQLDatabaseTool: This tool accepts a comma-separated list of tables as input and returns the schema and sample rows for those tables. Verify that the tables exist by invoking the sql_db_list_tables first. The input format is: table1, table2, table3
  • ListSQLDatabaseTool: The input is an empty string, the output is a comma separated list of tables in the database
  • QuerySQLCheckerTool: Use this tool to check if your query is correct before running it. Always use this tool before running a query with sql_db_query
  • PythonREPLTool: A Python shell. Use this to run python commands. The input should be a valid python command. If you want to see the output of a value, you should print it out with print(…).

Note: If a native tool doesn’t meet your needs, you can create custom tools. Throughout our testing, we found some of the native tools provided most of what we needed but required minor tweaks for our use case. We changed the default behavior for the tools for use with Security Lake data.

Create an output parser

Output parsers are used to instruct the LLM to respond in the desired output format. Although the output parser is optional, it makes sure the LLM response is formatted in a way that can be quickly consumed and is actionable by the user.

Figure 9: LangChain output parser setting

Figure 9: LangChain output parser setting

Adding conversation buffer memory

To make things simpler for the user, previous results should be stored for use in subsequent queries by the Conversational ReAct agent. ConversationBufferMemory provides the capability to maintain state from past conversations and enables the user to ask follow-up questions in the same chat context. For example, if you asked an agent for a list of AWS accounts to focus on, you want your subsequent questions to focus on that same list of AWS accounts instead of writing the values down somewhere and keeping track of it in the next set of questions. There are many other types of memory that can be used to optimize your use cases.

Figure 10: LangChain conversation buffer memory setting

Figure 10: LangChain conversation buffer memory setting

Initialize the agent

At this point, all the appropriate configurations are set and it’s time to load an agent executor by providing a set of tools and a LLM.

  1. tools: List of tools the agent will have access to.
  2. llm: LLM the agent will use.
  3. agent: Agent type to use. If there is no value provided and agent_path is set, the agent used will default to AgentType.ZERO_SHOT_REACT_DESCRIPTION.
  4. agent_kwargs: Additional keyword arguments to pass to the agent.
Figure 11: LangChain agent initialization

Figure 11: LangChain agent initialization

Note: For this post, we set verbose=True to view the agent’s intermediate ReAct steps, while answering questions. If you’re only interested in the output, set verbose=False.

You can also set return_direct=True to have the tool output returned to the user and closing the agent loop. Since we want to maintain the results of the query and used by the LLM, we left the default value of return_direct=False.

Provide instructions to the agent on using the tools

In addition to providing the agent with a list of tools, you would also give instructions to the agent on how and when to use these tools for your use case. This is optional but provides the agent with more context and can lead to better results.

Figure 12: LangChain agent instructions

Figure 12: LangChain agent instructions

Start your threat analysis journey with the generative AI-powered agent

Now that you’ve walked through the same set up process we used to create and initialize the agent, we can demonstrate how to analyze Security Lake data using natural language input questions that a security researcher might ask. The following examples focus on how you can use the solution to identify security vulnerabilities, risks, and threats and prioritize mitigating them. For this post, we’re using native AWS sources, but the agent can analyze any custom log sources configured in Security Lake. You can also use this solution to assist with investigations of possible security events in your environment.

For each of the questions that follow, you would enter the question in the free-form cell after it has run, similar to Figure 13.

Note: Because the field is free form, you can change the questions. Depending on the changes, you might see different results than are shown in this post. To end the conversation, enter exit and press the Enter key.

Figure 13: LangChain agent conversation input

Figure 13: LangChain agent conversation input

Question 1: What data sources are available in Security Lake?

In addition to the native AWS sources that Security Lake automatically ingests, your security team can incorporate additional custom log sources. It’s important to know what data is available to you to determine what and where to investigate. As shown in Figure 14, the Security Lake database contains the following log sources as tables:

If there are additional custom sources configured, they will also show up here. From here, you can focus on a smaller subset of AWS accounts that might have a larger number of security-related findings.

Figure 14: LangChain agent output for Security Lake tables

Figure 14: LangChain agent output for Security Lake tables

Question 2: What are the top five AWS accounts that have the most Security Hub findings?

Security Hub is a cloud security posture management service that not only aggregates findings from other AWS security services—such as Amazon GuardDuty, Amazon Macie, AWS Firewall Manager, and Amazon Inspector—but also from a number of AWS partner security solutions. Additionally, Security Hub has its own security best practices checks to help identify any vulnerabilities within your AWS environment. Depending on your environment, this might be a good starting place to look for specific AWS accounts to focus on.

Figure 15: LangChain output for AWS accounts with Security Hub findings

Figure 15: LangChain output for AWS accounts with Security Hub findings

Question 3: Within those AWS accounts, were any of the following actions found in (CreateUser, AttachUserPolicy, CreateAccessKey, CreateLoginProfile, DeleteTrail, DeleteMembers, UpdateIPSet, AuthorizeSecurityGroupIngress) in CloudTrail?

With the list of AWS accounts to look at narrowed down, you might be interested in mutable changes in your AWS account that you would deem suspicious. It’s important to note that every AWS environment is different, and some actions might be suspicious for one environment but normal in another. You can tailor this list to actions that shouldn’t happen in your environment. For example, if your organization normally doesn’t use IAM users, you can change the list to look at a list of actions for IAM, such as CreateAccessKey, CreateLoginProfile, CreateUser, UpdateAccessKey, UpdateLoginProfile, and UpdateUser.

By looking at the actions related to AWS CloudTrail (CreateUser, AttachUserPolicy, CreateAccessKey, CreateLoginProfile, DeleteTrail, DeleteMembers, UpdateIPSet, AuthorizeSecurityGroupIngress), you can see which actions were taken in your environment and choose which to focus on. Because the agent has access to previous chat history and results, you can ask follow-up questions on the SQL results without having to specify the AWS account IDs or event names.

Figure 16: LangChain agent output for CloudTrail actions taken in AWS Organization

Figure 16: LangChain agent output for CloudTrail actions taken in AWS Organization

Question 4: Which IAM principals took those actions?

The previous question narrowed down the list to mutable actions that shouldn’t occur. The next logical step is to determine which IAM principals took those actions. This helps correlate an actor to the actions that are either unexpected or are reserved for only authorized principals. For example, if you have an IAM principal tied to a continuous integration and delivery (CI/CD) pipeline, that could be less suspicious. Alternatively, if you see an IAM principal that you don’t recognize, you could focus on all actions taken by that IAM principal, including how it was provisioned in the first place.

Figure 17: LangChain agent output for CloudTrail IAM principals that invoked events from the previous query

Figure 17: LangChain agent output for CloudTrail IAM principals that invoked events from the previous query

Question 5: Within those AWS accounts, were there any connections made to “”?

If you don’t find anything useful related to mutable changes to CloudTrail, you can pivot to see if there were any network connections established from a specific Classless Inter-Domain Routing (CIDR) range. For example, if an organization primarily interacts with AWS resources within your AWS Organizations from your corporate-owned CIDR range, anything outside of that might be suspicious. Additionally, if you have threat lists or suspicious IP ranges, you can add them to the query to see if there are any network connections established from those ranges. The agent knows that the query is network related and to look in VPC flow logs and is focusing on only the AWS accounts from Question 2.

Figure 18: LangChain agent output for VPC flow log matches to specific CIDR

Figure 18: LangChain agent output for VPC flow log matches to specific CIDR

Question 6: As a security analyst, what other evidence or logs should I look for to determine if there are any indicators of compromise in my AWS environment?

If you haven’t found what you’re looking for and want some inspiration from the agent, you can ask the agent what other areas you should look at within your AWS environment. This might help you create a threat analysis thesis or use case as a starting point. You can also refer to the MITRE ATT&CK Cloud Matrix for more areas to focus on when setting up questions for your agent.

Figure 19: LangChain agent output for additional scenarios and questions to investigate

Figure 19: LangChain agent output for additional scenarios and questions to investigate

Based on the answers given, you can start a new investigation to identify possible vulnerabilities and threats:

  • Is there any unusual API activity in my organization that could be an indicator of compromise?
  • Have there been any AWS console logins that don’t match normal geographic patterns?
  • Have there been any spikes in network traffic for my AWS resources?

Agent running custom SQL queries

If you want to use a previously generated or customized SQL query, the agent can run the query as shown in Figure 20 that follows. In the previous questions, a SQL query is generated in the agent’s Action Input field. You can use that SQL query as a baseline, edit the SQL query manually to fit your use case, and then run the modified query through the agent. The modified query results are stored in memory and can be used for subsequent natural language questions to the agent. Even if your security analysts already have SQL experience, having the agent give a recommendation or template SQL query can shorten your investigation.

Figure 20: LangChain agent output for invoking custom SQL queries

Figure 20: LangChain agent output for invoking custom SQL queries

Agent assistance to automatically generate visualizations

You can get help from the agent to create visualizations by using the PythonREPL tool to generate code and plot SQL query results. As shown in Figure 21, you can ask the agent to get results from a SQL query and generate code to create a visualization based on those results. You can then take the generated code and put it into the next cell to create the visualization.

Figure 21: LangChain agent output to generate code to visualize SQL results in a plot

Figure 21: LangChain agent output to generate code to visualize SQL results in a plot

The agent returns example code after To plot the results. You can copy the code between ‘‘‘python and ’’’ and input that code in the next cell. After you run that cell, a visual based on the SQL results is created similar to Figure 22 that follows. This can be helpful to share the notebook output as part of an investigation to either create a custom detection to monitor or determine how a vulnerability can be mitigated.

Figure 22: Notebook Python code output from code generated by LangChain agent

Figure 22: Notebook Python code output from code generated by LangChain agent

Tailoring your agent to your data

As previously discussed, use cases and data vary between organizations. It’s important to understand the foundational components in terms of how you can configure and tailor the LLM, agents, tools, and configuration to your environment. The notebook in the solution was the result of experiments to determine and display what’s possible. Along the way, you might encounter challenges or issues depending on changes you make in the notebook or by adding additional data sources. Below are some tips to help you create and tailor the notebook to your use case.

  • If the agent pauses in the intermediate steps or asks for guidance to answer the original question, you can guide the agent with prompt engineering techniques, using commands such as execute or continue to move the process along.
  • If the agent is hallucinating or providing data that isn’t accurate, see Anthropic’s user guide for mechanisms to reduce hallucinations. An example of a hallucination would be the response having generic information such as an AWS account that is 1234567890 or the resulting count of a query being repeated for multiple rows.

    Note: You can also use Retrieval Augmented Generation (RAG) in Amazon SageMaker to mitigate hallucinations.

SageMaker Studio and Amazon Bedrock provide native integration to use a variety of generative AI tools with your Security Lake data to help increase your organization’s security posture. Some other use cases you can try include:

  • Investigating impact and root cause for a suspected compromise of an Amazon Elastic Compute Cloud (Amazon EC2) instance from a GuardDuty finding.
  • Determining if network ACL or firewall changes in your environment affected the number of AWS resources communicating with public endpoints.
  • Checking if any S3 buckets with possibly confidential or sensitive data were accessed by non-authorized IAM principals.
  • Identify if an EC2 instance that might be compromised made any internal or external connections to other AWS resources and then if those resources were impacted.


This solution demonstrates how you can use the generative AI capabilities of Amazon Bedrock and natural language input in SageMaker Studio to analyze data in Security Lake and work towards reducing your organization’s risk and increase your security posture. The Python notebook is primarily meant to serve as a starting point to walk through an example scenario to identify potential vulnerabilities and threats.

Security Lake is continually working on integrating native AWS sources, but there are also custom data sources outside of AWS that you might want to import for your agent to analyze. We also showed you how we configured the notebook to use agents and LLMs, and how you can tune each component within a notebook to your specific use case.

By enabling your security team to analyze and interact with data in Security Lake using natural language input, you can reduce the amount of time needed to conduct an investigation by automatically identifying the appropriate data sources, generating and invoking SQL queries, and visualizing data from your investigation. This post focuses on Security Lake, which normalizes data into Open Cybersecurity Schema Framework (OCSF), but as long as the database data schema is normalized, the solution can be applied to other data stores.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Generative AI on AWS re:Post or contact AWS Support.


Jonathan Nguyen

Jonathan is a Principal Security Architect at AWS. His background is in AWS security with a focus on threat detection and incident response. He helps enterprise customers develop a comprehensive AWS security strategy and deploy security solutions at scale, and trains customers on AWS security best practices.


Madhunika Reddy Mikkili

Madhunika is a Data and Machine Learning Engineer with the AWS Professional Services Shared Delivery Team. She is passionate about helping customers achieve their goals through the use of data and machine learning insights. Outside of work, she loves traveling and spending time with family and friends.

Harsh Asnani

Harsh Asnani

Harsh is a Machine Learning Engineer at AWS. His Background is in applied Data Science with a focus on operationalizing Machine Learning workloads in the cloud at scale.

Kartik Kannapur

Kartik Kannapur

Kartik is a Senior Data Scientist with AWS Professional Services. His background is in Applied Mathematics and Statistics. He works with enterprise customers, helping them use machine learning to solve their business problems.

Disaster recovery strategies for Amazon MWAA – Part 1

Post Syndicated from Parnab Basak original https://aws.amazon.com/blogs/big-data/disaster-recovery-strategies-for-amazon-mwaa-part-1/

In the dynamic world of cloud computing, ensuring the resilience and availability of critical applications is paramount. Disaster recovery (DR) is the process by which an organization anticipates and addresses technology-related disasters. For organizations implementing critical workload orchestration using Amazon Managed Workflows for Apache Airflow (Amazon MWAA), it is crucial to have a DR plan in place to ensure business continuity.

In this series, we explore the need for Amazon MWAA disaster recovery and prescribe solutions that will sustain Amazon MWAA environments against unintended disruptions. This lets you to define, avoid, and handle disruption risks as part of your business continuity plan. This post focuses on designing the overall DR architecture. A future post in this series will focus on implementing the individual components using AWS services.

The need for Amazon MWAA disaster recovery

Amazon MWAA, a fully managed service for Apache Airflow, brings immense value to organizations by automating workflow orchestration for extract, transform, and load (ETL), DevOps, and machine learning (ML) workloads. Amazon MWAA has a distributed architecture with multiple components such as scheduler, worker, web server, queue, and database. This makes it difficult to implement a comprehensive DR strategy.

An active Amazon MWAA environment continuously parses Airflow Directed Acyclic Graphs (DAGs), reading them from a configured Amazon Simple Storage Service (Amazon S3) bucket. DAG source unavailability due to network unreachability, unintended corruption, or deletes leads to extended downtime and service disruption.

Within Airflow, the metadata database is a core component storing configuration variables, roles, permissions, and DAG run histories. A healthy metadata database is therefore critical for your Airflow environment. As with any core Airflow component, having a backup and disaster recovery plan in place for the metadata database is essential.

Amazon MWAA deploys Airflow components to multiple Availability Zones within your VPC in your preferred AWS Region. This provides fault tolerance and automatic recovery against a single Availability Zone failure. For mission-critical workloads, being resilient to the impairments of a unitary Region through multi-Region deployments is additionally important to ensure high availability and business continuity.

Balancing between costs to maintain redundant infrastructures, complexity, and recovery time is essential for Amazon MWAA environments. Organizations aim for cost-effective solutions that minimize their Recovery Time Objective (RTO) and Recovery Point Objective (RPO) to meet their service level agreements, be economically viable, and meet their customers’ demands.

Detect disasters in the primary environment: Proactive monitoring through metrics and alarms

Prompt detection of disasters in the primary environment is crucial for timely disaster recovery. Monitoring the Amazon CloudWatch SchedulerHeartbeat metric provides insights into Airflow health of an active Amazon MWAA environment. You can add other health check metrics to the evaluation criteria, such as checking the availability of upstream or downstream systems and network reachability. Combined with CloudWatch alarms, you can send notifications when these thresholds over a number of time periods are not met. You can add alarms to dashboards to monitor and receive alerts about your AWS resources and applications across multiple Regions.

AWS publishes our most up-to-the-minute information on service availability on the Service Health Dashboard. You can check at any time to get current status information, or subscribe to an RSS feed to be notified of interruptions to each individual service in your operating Region. The AWS Health Dashboard provides information about AWS Health events that can affect your account.

By combining metric monitoring, available dashboards, and automatic alarming, you can promptly detect unavailability of your primary environment, enabling proactive measures to transition to your DR plan. It is critical to factor in incident detection, notification, escalation, discovery, and declaration into your DR planning and implementation to provide realistic and achievable objectives that provide business value.

In the following sections, we discuss two Amazon MWAA DR strategy solutions and their architecture.

DR strategy solution 1: Backup and restore

The backup and restore strategy involves generating Airflow component backups in the same or different Region as your primary Amazon MWAA environment. To ensure continuity, you can asynchronously replicate these to your DR Region, with minimal performance impact on your primary Amazon MWAA environment. In the event of a rare primary Regional impairment or service disruption, this strategy will create a new Amazon MWAA environment and recover historical data to it from existing backups. However, it’s important to note that during the recovery process, there will be a period where no Airflow environments are operational to process workflows until the new environment is fully provisioned and marked as available.

This strategy provides a low-cost and low-complexity solution that is also suitable for mitigating against data loss or corruption within your primary Region. The amount of data being backed up and the time to create a new Amazon MWAA environment (typically 20–30 minutes) affects how quickly restoration can happen. To enable infrastructure to be redeployed quickly without errors, deploy using infrastructure as code (IaC). Without IaC, it may be complex to restore an analogous DR environment, which will lead to increased recovery times and possibly exceed your RTO.

Let’s explore the setup required when your primary Amazon MWAA environment is actively running, as shown in the following figure.

Backup and Restore - Pre

The solution comprises three key components. The first component is the primary environment, where the Airflow workflows are initially deployed and actively running. The second component is the disaster monitoring component, comprised of CloudWatch and a combination of an AWS Step Functions state machine and a AWS Lambda function. The third component is for creating and storing backups of all configurations and metadata that is required to restore. This can be in the same Region as your primary or replicated to your DR Region using S3 Cross-Region Replication (CRR). For CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region.

The first three steps in the workflow are as follows:

  1. As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
  2. Your existing primary Amazon MWAA environment automatically emits the status of its scheduler’s health to the CloudWatch SchedulerHeartbeat metric.
  3. A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor the scheduler’s health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.

The following figure illustrates the additional steps in the solution workflow.

Backup and Restore post

  1. When the heartbeat count deviates from the normal count for a period of time, a series of actions are initiated to recover to a new Amazon MWAA environment in the DR Region. These actions include starting creation of a new Amazon MWAA environment, replicating the primary environment configurations, and then waiting for the new environment to become available.
  2. When the environment is available, an import DAG utility is run to restore the metadata contents from the backups. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.

DR strategy solution 2: Active-passive environments with periodic data synchronization

The active-passive environments with periodic data synchronization strategy focuses on maintaining recurrent data synchronization between an active primary and a passive Amazon MWAA DR environment. By periodically updating and synchronizing DAG stores and metadata databases, this strategy ensures that the DR environment remains current or nearly current with the primary. The DR Region can be the same or a different Region than your primary Amazon MWAA environment. In the event of a disaster, backups are available to revert to a previous known good state to minimize data loss or corruption.

This strategy provides low RTO and RPO with frequent synchronization, allowing quick recovery with minimal data loss. The infrastructure costs and code deployments are compounded to maintain both the primary and DR Amazon MWAA environments. Your DR environment is available immediately to run DAGs on.

The following figure illustrates the setup required when your primary Amazon MWAA environment is actively running.

Active Passive pre

The solution comprises four key components. Similar to the backup and restore solution, the first component is the primary environment, where the workflow is initially deployed and is actively running. The second component is the disaster monitoring component, consisting of CloudWatch and a combination of a Step Functions state machine and Lambda function. The third component creates and stores backups for all configurations and metadata required for the database synchronization. This can be in the same Region as your primary or replicated to your DR Region using Amazon S3 Cross-Region Replication. As mentioned earlier, for CRR, you also pay for inter-Region data transfer out from Amazon S3 to each destination Region. The last component is a passive Amazon MWAA environment that has the same Airflow code and environment configurations as the primary. The DAGs are deployed in the DR environment using the same continuous integration and continuous delivery (CI/CD) pipeline as the primary. Unlike the primary, DAGs are kept in a paused state to not cause duplicate runs.

The first steps of the workflow are similar to the backup and restore strategy:

  1. As part of your backup creation process, Airflow metadata is replicated to an S3 bucket using an export DAG utility, run periodically based on your RPO interval.
  2. Your existing primary Amazon MWAA environment automatically emits the status of its scheduler’s health to CloudWatch SchedulerHeartbeat metric.
  3. A multi-step Step Functions state machine is triggered from a periodic Amazon EventBridge schedule to monitor scheduler health status. As the primary step of the state machine, a Lambda function evaluates the status of the SchedulerHeartbeat metric. If the metric is deemed healthy, no action is taken.

The following figure illustrates the final steps of the workflow.

Active Passive post

  1. When the heartbeat count deviates from the normal count for a period of time, DR actions are initiated.
  2. As a first step, a Lambda function triggers an import DAG utility to restore the metadata contents from the backups to the passive Amazon MWAA DR environment. When the imports are complete, the same DAG can un-pause the other Airflow DAGs, making them active for future runs. Any DAG runs that were interrupted during the impairment of the primary environment need to be manually rerun to maintain service level agreements. Future DAG runs are queued to run as per their next configured schedule.

Best practices to improve resiliency of Amazon MWAA

To enhance the resiliency of your Amazon MWAA environment and ensure smooth disaster recovery, consider implementing the following best practices:

  • Robust backup and restore mechanisms – Implementing comprehensive backup and restore mechanisms for Amazon MWAA data is essential. Regularly deleting existing metadata based on your organization’s retention policies reduces backup times and makes your Amazon MWAA environment more performant.
  • Automation using IaC – Using automation and orchestration tools such as AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform can streamline the deployment and configuration management of Amazon MWAA environments. This ensures consistency, reproducibility, and faster recovery during DR scenarios.
  • Idempotent DAGs and tasks – In Airflow, a DAG is considered idempotent if rerunning the same DAG with the same inputs multiple times has the same effect as running it only once. Designing idempotent DAGs and keeping tasks atomic decreases recovery time from failures when you have to manually rerun an interrupted DAG in your recovered environment.
  • Regular testing and validation – A robust Amazon MWAA DR strategy should include regular testing and validation exercises. By simulating disaster scenarios, you can identify any gaps in your DR plans, fine-tune processes, and ensure your Amazon MWAA environments are fully recoverable.


In this post, we explored the challenges for Amazon MWAA disaster recovery and discussed best practices to improve resiliency. We examined two DR strategy solutions: backup and restore and active-passive environments with periodic data synchronization. By implementing these solutions and following best practices, you can protect your Amazon MWAA environments, minimize downtime, and mitigate the impact of disasters. Regular testing, validation, and adaptation to evolving requirements are crucial for an effective Amazon MWAA DR strategy. By continuously evaluating and refining your disaster recovery plans, you can ensure the resilience and uninterrupted operation of your Amazon MWAA environments, even in the face of unforeseen events.

For additional details and code examples on Amazon MWAA, refer to the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

About the Authors

Parnab Basak is a Senior Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

Chandan Rupakheti is a Solutions Architect and a Serverless Specialist at AWS. He is a passionate technical leader, researcher, and mentor with a knack for building innovative solutions in the cloud and bringing stakeholders together in their cloud journey. Outside his professional life, he loves spending time with his family and friends besides listening and playing music.

Vinod Jayendra is a Enterprise Support Lead in ISV accounts at Amazon Web Services, where he helps customers in solving their architectural, operational, and cost optimization challenges. With a particular focus on Serverless technologies, he draws from his extensive background in application development to deliver top-tier solutions. Beyond work, he finds joy in quality family time, embarking on biking adventures, and coaching youth sports team.

Rupesh Tiwari is a Senior Solutions Architect at AWS in New York City, with a focus on Financial Services. He has over 18 years of IT experience in the finance, insurance, and education domains, and specializes in architecting large-scale applications and cloud-native big data workloads. In his spare time, Rupesh enjoys singing karaoke, watching comedy TV series, and creating joyful moments with his family.

Detect, mask, and redact PII data using AWS Glue before loading into Amazon OpenSearch Service

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/detect-mask-and-redact-pii-data-using-aws-glue-before-loading-into-amazon-opensearch-service/

Many organizations, small and large, are working to migrate and modernize their analytics workloads on Amazon Web Services (AWS). There are many reasons for customers to migrate to AWS, but one of the main reasons is the ability to use fully managed services rather than spending time maintaining infrastructure, patching, monitoring, backups, and more. Leadership and development teams can spend more time optimizing current solutions and even experimenting with new use cases, rather than maintaining the current infrastructure.

With the ability to move fast on AWS, you also need to be responsible with the data you’re receiving and processing as you continue to scale. These responsibilities include being compliant with data privacy laws and regulations and not storing or exposing sensitive data like personally identifiable information (PII) or protected health information (PHI) from upstream sources.

In this post, we walk through a high-level architecture and a specific use case that demonstrates how you can continue to scale your organization’s data platform without needing to spend large amounts of development time to address data privacy concerns. We use AWS Glue to detect, mask, and redact PII data before loading it into Amazon OpenSearch Service.

Solution overview

The following diagram illustrates the high-level solution architecture. We have defined all layers and components of our design in line with the AWS Well-Architected Framework Data Analytics Lens.


The architecture is comprised of a number of components:

Source data

Data may be coming from many tens to hundreds of sources, including databases, file transfers, logs, software as a service (SaaS) applications, and more. Organizations may not always have control over what data comes through these channels and into their downstream storage and applications.

Ingestion: Data lake batch, micro-batch, and streaming

Many organizations land their source data into their data lake in various ways, including batch, micro-batch, and streaming jobs. For example, Amazon EMR, AWS Glue, and AWS Database Migration Service (AWS DMS) can all be used to perform batch and or streaming operations that sink to a data lake on Amazon Simple Storage Service (Amazon S3). Amazon AppFlow can be used to transfer data from different SaaS applications to a data lake. AWS DataSync and AWS Transfer Family can help with moving files to and from a data lake over a number of different protocols. Amazon Kinesis and Amazon MSK also have capabilities to stream data directly to a data lake on Amazon S3.

S3 data lake

Using Amazon S3 for your data lake is in line with the modern data strategy. It provides low-cost storage without sacrificing performance, reliability, or availability. With this approach, you can bring compute to your data as needed and only pay for capacity it needs to run.

In this architecture, raw data can come from a variety of sources (internal and external), which may contain sensitive data.

Using AWS Glue crawlers, we can discover and catalog the data, which will build the table schemas for us, and ultimately make it straightforward to use AWS Glue ETL with the PII transform to detect and mask or and redact any sensitive data that may have landed in the data lake.

Business context and datasets

To demonstrate the value of our approach, let’s imagine you’re part of a data engineering team for a financial services organization. Your requirements are to detect and mask sensitive data as it is ingested into your organization’s cloud environment. The data will be consumed by downstream analytical processes. In the future, your users will be able to safely search historical payment transactions based on data streams collected from internal banking systems. Search results from operation teams, customers, and interfacing applications must be masked in sensitive fields.

The following table shows the data structure used for the solution. For clarity, we have mapped raw to curated column names. You’ll notice that multiple fields within this schema are considered sensitive data, such as first name, last name, Social Security number (SSN), address, credit card number, phone number, email, and IPv4 address.

Raw Column Name Curated Column Name Type
c0 first_name string
c1 last_name string
c2 ssn string
c3 address string
c4 postcode string
c5 country string
c6 purchase_site string
c7 credit_card_number string
c8 credit_card_provider string
c9 currency string
c10 purchase_value integer
c11 transaction_date date
c12 phone_number string
c13 email string
c14 ipv4 string

Use case: PII batch detection before loading to OpenSearch Service

Customers who implement the following architecture have built their data lake on Amazon S3 to run different types of analytics at scale. This solution is suitable for customers who don’t require real-time ingestion to OpenSearch Service and plan to use data integration tools that run on a schedule or are triggered through events.


Before data records land on Amazon S3, we implement an ingestion layer to bring all data streams reliably and securely to the data lake. Kinesis Data Streams is deployed as an ingestion layer for accelerated intake of structured and semi-structured data streams. Examples of these are relational database changes, applications, system logs, or clickstreams. For change data capture (CDC) use cases, you can use Kinesis Data Streams as a target for AWS DMS. Applications or systems generating streams containing sensitive data are sent to the Kinesis data stream via one of the three supported methods: the Amazon Kinesis Agent, the AWS SDK for Java, or the Kinesis Producer Library. As a last step, Amazon Kinesis Data Firehose helps us reliably load near-real-time batches of data into our S3 data lake destination.

The following screenshot shows how data flows through Kinesis Data Streams via the Data Viewer and retrieves sample data that lands on the raw S3 prefix. For this architecture, we followed the data lifecycle for S3 prefixes as recommended in Data lake foundation.

kinesis raw data

As you can see from the details of the first record in the following screenshot, the JSON payload follows the same schema as in the previous section. You can see the unredacted data flowing into the Kinesis data stream, which will be obfuscated later in subsequent stages.


After the data is collected and ingested into Kinesis Data Streams and delivered to the S3 bucket using Kinesis Data Firehose, the processing layer of the architecture takes over. We use the AWS Glue PII transform to automate detection and masking of sensitive data in our pipeline. As shown in the following workflow diagram, we took a no-code, visual ETL approach to implement our transformation job in AWS Glue Studio.

glue studio nodes

First, we access the source Data Catalog table raw from the pii_data_db database. The table has the schema structure presented in the previous section. To keep track of the raw processed data, we used job bookmarks.

glue catalog

We use the AWS Glue DataBrew recipes in the AWS Glue Studio visual ETL job to transform two date attributes to be compatible with OpenSearch expected formats. This allows us to have a full no-code experience.

We use the Detect PII action to identify sensitive columns. We let AWS Glue determine this based on selected patterns, detection threshold, and sample portion of rows from the dataset. In our example, we used patterns that apply specifically to the United States (such as SSNs) and may not detect sensitive data from other countries. You may look for available categories and locations applicable to your use case or use regular expressions (regex) in AWS Glue to create detection entities for sensitive data from other countries.

It’s important to select the correct sampling method that AWS Glue offers. In this example, it’s known that the data coming in from the stream has sensitive data in every row, so it’s not necessary to sample 100% of the rows in the dataset. If you have a requirement where no sensitive data is allowed to downstream sources, consider sampling 100% of the data for the patterns you chose, or scan the entire dataset and act on each individual cell to ensure all sensitive data is detected. The benefit you get from sampling is reduced costs because you don’t have to scan as much data.

PII Options

The Detect PII action allows you to select a default string when masking sensitive data. In our example, we use the string **********.


We use the apply mapping operation to rename and remove unnecessary columns such as ingestion_year, ingestion_month, and ingestion_day. This step also allows us to change the data type of one of the columns (purchase_value) from string to integer.


From this point on, the job splits into two output destinations: OpenSearch Service and Amazon S3.

Our provisioned OpenSearch Service cluster is connected via the OpenSearch built-in connector for Glue. We specify the OpenSearch Index we’d like to write to and the connector handles the credentials, domain and port. In the screen shot below, we write to the specified index index_os_pii.

opensearch config

We store the masked dataset in the curated S3 prefix. There, we have data normalized to a specific use case and safe consumption by data scientists or for ad hoc reporting needs.

opensearch target s3 folder

For unified governance, access control, and audit trails of all datasets and Data Catalog tables, you can use AWS Lake Formation. This helps you restrict access to the AWS Glue Data Catalog tables and underlying data to only those users and roles who have been granted necessary permissions to do so.

After the batch job runs successfully, you can use OpenSearch Service to run search queries or reports. As shown in the following screenshot, the pipeline masked sensitive fields automatically with no code development efforts.

You can identify trends from the operational data, such as the amount of transactions per day filtered by credit card provider, as shown in the preceding screenshot. You can also determine the locations and domains where users make purchases. The transaction_date attribute helps us see these trends over time. The following screenshot shows a record with all of the transaction’s information redacted appropriately.

json masked

For alternate methods on how to load data into Amazon OpenSearch, refer to Loading streaming data into Amazon OpenSearch Service.

Furthermore, sensitive data can also be discovered and masked using other AWS solutions. For example, you could use Amazon Macie to detect sensitive data inside an S3 bucket, and then use Amazon Comprehend to redact the sensitive data that was detected. For more information, refer to Common techniques to detect PHI and PII data using AWS Services.


This post discussed the importance of handling sensitive data within your environment and various methods and architectures to remain compliant while also allowing your organization to scale quickly. You should now have a good understanding of how to detect, mask, or redact and load your data into Amazon OpenSearch Service.

About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Daniel Rozo is a Senior Solutions Architect with AWS supporting customers in the Netherlands. His passion is engineering simple data and analytics solutions and helping customers move to modern data architectures. Outside of work, he enjoys playing tennis and biking.

How to customize access tokens in Amazon Cognito user pools

Post Syndicated from Edward Sun original https://aws.amazon.com/blogs/security/how-to-customize-access-tokens-in-amazon-cognito-user-pools/

With Amazon Cognito, you can implement customer identity and access management (CIAM) into your web and mobile applications. You can add user authentication and access control to your applications in minutes.

In this post, I introduce you to the new access token customization feature for Amazon Cognito user pools and show you how to use it. Access token customization is included in the advanced security features (ASF) of Amazon Cognito. Note that ASF is subject to additional pricing as described on the Amazon Cognito pricing page.

What is access token customization?

When a user signs in to your app, Amazon Cognito verifies their sign-in information, and if the user is authenticated successfully, returns the ID, access, and refresh tokens. The access token, which uses the JSON Web Token (JWT) format following the RFC7519 standard, contains claims in the token payload that identify the principal being authenticated, and session attributes such as authentication time and token expiration time. More importantly, the access token also contains authorization attributes in the form of user group memberships and OAuth scopes. Your applications or API resource servers can evaluate the token claims to authorize specific actions on behalf of users.

With access token customization, you can add application-specific claims to the standard access token and then make fine-grained authorization decisions to provide a differentiated end-user experience. You can refine the original scope claims to further restrict access to your resources and enforce the least privileged access. You can also enrich access tokens with claims from other sources, such as user subscription information stored in an Amazon DynamoDB table. Your application can use this enriched claim to determine the level of access and content available to the user. This reduces the need to build a custom solution to look up attributes in your application’s code, thereby reducing application complexity, improving performance, and smoothing the integration experience with downstream applications.

How do I use the access token customization feature?

Amazon Cognito works with AWS Lambda functions to modify your user pool’s authentication behavior and end-user experience. In this section, you’ll learn how to configure a pre token generation Lambda trigger function and invoke it during the Amazon Cognito authentication process. I’ll also show you an example function to help you write your own Lambda function.

Lambda trigger flow

During a user authentication, you can choose to have Amazon Cognito invoke a pre token generation trigger to enrich and customize your tokens.

Figure 1: Pre token generation trigger flow

Figure 1: Pre token generation trigger flow

Figure 1 illustrates the pre token generation trigger flow. This flow has the following steps:

  1. An end user signs in to your app and authenticates with an Amazon Cognito user pool.
  2. After the user completes the authentication, Amazon Cognito invokes the pre token generation Lambda trigger, and sends event data to your Lambda function, such as userAttributes and scopes, in a pre token generation trigger event.
  3. Your Lambda function code processes token enrichment logic, and returns a response event to Amazon Cognito to indicate the claims that you want to add or suppress.
  4. Amazon Cognito vends a customized JWT to your application.

The pre token generation trigger flow supports OAuth 2.0 grant types, such as the authorization code grant flow and implicit grant flow, and also supports user authentication through the AWS SDK.

Enable access token customization

Your Amazon Cognito user pool delivers two different versions of the pre token generation trigger event to your Lambda function. Trigger event version 1 includes userAttributes, groupConfiguration, and clientMetadata in the event request, which you can use to customize ID token claims. Trigger event version 2 adds scope in the event request, which you can use to customize scopes in the access token in addition to customizing other claims.

In this section, I’ll show you how to update your user pool to trigger event version 2 and enable access token customization.

To enable access token customization

  1. Open the Cognito user pool console, and then choose User pools.
  2. Choose the target user pool for token customization.
  3. On the User pool properties tab, in the Lambda triggers section, choose Add Lambda trigger.
  4. Figure 2: Add Lambda trigger

    Figure 2: Add Lambda trigger

  5. In the Lambda triggers section, do the following:
    1. For Trigger type, select Authentication.
    2. For Authentication, select Pre token generation trigger.
    3. For Trigger event version, select Basic features + access token customization – Recommended. If this option isn’t available to you, make sure that you have enabled advanced security features. You must have advanced security features enabled to access this option.
  6. Figure 3: Select Lambda trigger

    Figure 3: Select Lambda trigger

  7. Select your Lambda function and assign it as the pre token generation trigger. Then choose Add Lambda trigger.
  8. Figure 4: Add Lambda trigger

    Figure 4: Add Lambda trigger

Example pre token generation trigger

Now that you have enabled access token customization, I’ll walk you through a code example of the pre token generation Lambda trigger, and the version 2 trigger event. This code example examines the trigger event request, and adds a new custom claim and a custom OAuth scope in the response for Amazon Cognito to customize the access token to suit various authorization scheme.

Here is an example version 2 trigger event. The event request contains the user attributes from the Amazon Cognito user pool, the original scope claims, and the original group configurations. It has two custom attributes—membership and location—which are collected during the user registration process and stored in the Cognito user pool.

  "version": "2",
  "triggerSource": "TokenGeneration_HostedAuth",
  "region": "us-east-1",
  "userPoolId": "us-east-1_01EXAMPLE",
  "userName": "mytestuser",
  "callerContext": {
    "awsSdkVersion": "aws-sdk-unknown-unknown",
    "clientId": "1example23456789"
  "request": {
    "userAttributes": {
      "sub": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111",
      "cognito:user_status": "CONFIRMED",
      "email": "[email protected]",
      "email_verified": "true",
      "custom:membership": "Premium",
      "custom:location": "USA"
    "groupConfiguration": {
      "groupsToOverride": [],
      "iamRolesToOverride": [],
      "preferredRole": null
    "scopes": [
  "response": {
    "claimsAndScopeOverrideDetails": null

In the following code example, I transformed the user’s location attribute and membership attribute to add a custom claim and a custom scope. I used the claimsToAddOrOverride field to create a new custom claim called demo:membershipLevel with a membership value of Premium from the event request. I also constructed a new scope with the value of membership:USA.Premium through the scopesToAdd claim, and added the new claim and scope in the event response.

export const handler = function(event, context) {
  // Retrieve user attribute from event request
  const userAttributes = event.request.userAttributes;
  // Add scope to event response
  event.response = {
    "claimsAndScopeOverrideDetails": {
      "idTokenGeneration": {},
      "accessTokenGeneration": {
        "claimsToAddOrOverride": {
          "demo:membershipLevel": userAttributes['custom:membership']
        "scopesToAdd": ["membership:" + userAttributes['custom:location'] + "." + userAttributes['custom:membership']]
  // Return to Amazon Cognito
  context.done(null, event);

With the preceding code, the Lambda trigger sends the following response back to Amazon Cognito to indicate the customization that was needed for the access tokens.

"response": {
  "claimsAndScopeOverrideDetails": {
    "idTokenGeneration": {},
    "accessTokenGeneration": {
      "claimsToAddOrOverride": {
        "demo:membershipLevel": "Premium"
      "scopesToAdd": [

Then Amazon Cognito issues tokens with these customizations at runtime:

  "sub": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111",
  "iss": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_01EXAMPLE",
  "version": 2,
  "client_id": "1example23456789",
  "event_id": "01faa385-562d-4730-8c3b-458e5c8f537b",
  "token_use": "access",
  "demo:membershipLevel": "Premium",
  "scope": "openid profile email membership:USA.Premium",
  "auth_time": 1702270800,
  "exp": 1702271100,
  "iat": 1702270800,
  "jti": "d903dcdf-8c73-45e3-bf44-51bf7c395e06",
  "username": "mytestuser"

Your application can then use the newly-minted, custom scope and claim to authorize users and provide them with a personalized experience.

Considerations and best practices

There are four general considerations and best practices that you can follow:

  1. Some claims and scopes aren’t customizable. For example, you can’t customize claims such as auth_time, iss, and sub, or scopes such as aws.cognito.signin.user.admin. For the full list of excluded claims and scopes, see the Excluded claims and scopes.
  2. Work backwards from authorization. When you customize access tokens, you should start with your existing authorization schema and then decide whether to customize the scopes or claims, or both. Standard OAuth based authorization scenarios, such as Amazon API Gateway authorizers, typically use custom scopes to provide access. However, if you have complex or fine-grained authorization requirements, then you should consider using both scopes and custom claims to pass additional contextual data to the application or to a policy-based access control service such as Amazon Verified Permission.
  3. Establish governance in token customization. You should have a consistent company engineering policy to provide nomenclature guidance for scopes and claims. A syntax standard promotes globally unique variables and avoids a name collision across different application teams. For example, Application X at AnyCompany can choose to name their scope as ac.appx.claim_name, where ac represents AnyCompany as a global identifier and appx.claim_name represents Application X’s custom claim.
  4. Be aware of limits. Because tokens are passed through various networks and systems, you need to be aware of potential token size limitations in your systems. You should keep scope and claim names as short as possible, while still being descriptive.


In this post, you learned how to integrate a pre token generation Lambda trigger with your Amazon Cognito user pool to customize access tokens. You can use the access token customization feature to provide differentiated services to your end users based on claims and OAuth scopes. For more information, see pre token generation Lambda trigger in the Amazon Cognito Developer Guide.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Edward Sun

Edward Sun

Edward is a Security Specialist Solutions Architect focused on identity and access management. He loves helping customers throughout their cloud transformation journey with architecture design, security best practices, migration, and cost optimizations. Outside of work, Edward enjoys hiking, golfing, and cheering for his alma mater, the Georgia Bulldogs.

Enable metric-based and scheduled scaling for Amazon Managed Service for Apache Flink

Post Syndicated from Francisco Morillo original https://aws.amazon.com/blogs/big-data/enable-metric-based-and-scheduled-scaling-for-amazon-managed-service-for-apache-flink/

Thousands of developers use Apache Flink to build streaming applications to transform and analyze data in real time. Apache Flink is an open source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications. Monitoring and scaling your applications is critical to keep your applications running successfully in a production environment.

Amazon Managed Service for Apache Flink is a fully managed service that reduces the complexity of building and managing Apache Flink applications. Amazon Managed Service for Apache Flink manages the underlying Apache Flink components that provide durable application state, metrics, logs, and more.

In this post, we show a simplified way to automatically scale up and down the number of KPUs (Kinesis Processing Units; 1 KPU is 1 vCPU and 4 GB of memory) of your Apache Flink applications with Amazon Managed Service for Apache Flink. We show you how to scale by using metrics such as CPU, memory, backpressure, or any custom metric of your choice. Additionally, we show how to perform scheduled scaling, allowing you to adjust your application’s capacity at specific times, particularly when dealing with predictable workloads. We also share an AWS CloudFormation utility to help you implement auto scaling quickly with your Amazon Managed Service for Apache Flink applications.

Metric-based scaling

This section describes how to implement a scaling solution for Amazon Managed Service for Apache Flink based on Amazon CloudWatch metrics. Amazon Managed Service for Apache Flink comes with an auto scaling option out of the box that scales out when container CPU utilization is above 75% for 15 minutes. This works well for many use cases; however, for some applications, you may need to scale based on a different metric, or trigger the scaling action at a certain point in time or by a different factor. You can customize your scaling policies and save costs by right-sizing your Amazon Managed Apache Flink applications the deploying this solution.

To perform metric-based scaling, we use CloudWatch alarms, Amazon EventBridge, AWS Step Functions, and AWS Lambda. You can choose from metrics coming from the source such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK), or metrics from the Amazon Managed Service for Apache Flink application. You can find these components in the CloudFormation template in the GitHub repo.

The following diagram shows how to scale an Amazon Managed Service for Apache Flink application in response to a CloudWatch alarm.

This solution uses the metric selected and creates two CloudWatch alarms that, depending on the threshold you use, trigger a rule in EventBridge to start running a Step Functions state machine. The following diagram illustrates the state machine workflow.

Note: Amazon Kinesis Data Analytics was renamed to Amazon Managed Service for Apache Flink August 2023

The Step Functions workflow consists of the following steps:

  1. The state machine describes the Amazon Managed Service for Apache Flink application, which will provide information related to the current number of KPUs in the application, as well if the application is being updated or is it running.
  2. The state machine invokes a Lambda function that, depending on which alarm was triggered, will scale the application up or down, following the parameters set in the CloudFormation template. When scaling the application, it will use the increase factor (either add/subtract or multiple/divide based on that factor) defined in the CloudFormation template. You can have different factors for scaling in or out. If you want to take a more cautious approach to scaling, you can use add/subtract and use an increase factor for scaling in/out of 1.
  3. If the application has reached the maximum or minimum number of KPUs set in the parameters of the CloudFormation template, the workflow stops. Keep in mind that Amazon Managed Service for Apache Flink applications have a default maximum of 64 KPUs (you can request to increase this limit). Do not specify a maximum value above 64 KPUs if you have not requested to increase the quota, because the scaling solution will get stuck by failing to update.
  4. If the workflow continues, because the allocated KPUs haven’t reached the maximum or minimum values, the workflow will wait for a period of time you specify, and then describe the application and see if it has finished updating.
  5. The workflow will continue to wait until the application has finished updating. When the application is updated, the workflow will wait for a period of time you specify in the CloudFormation template, to allow the metric to fall within the threshold and have the CloudWatch rule change from ALARM state to OK.
  6. If the metric is still in ALARM state, the workflow will start again and continue to scale the application either up or down. If the metric is in OK state, the workflow will stop.

For applications that read from a Kinesis Data Streams source, you can use the metric millisBehindLatest. If using a Kafka source, you can use records lag max for scaling events. These metrics capture how far behind your application is from the head of the stream. You can also use a custom metric that you have registered in your Apache Flink applications.

The sample CloudFormation template allows you to select one of the following metrics:

  • Amazon Managed Service for Apache Flink application metrics – Requires an application name:
    • ContainerCPUUtilization – Overall percentage of CPU utilization across task manager containers in the Flink application cluster.
    • ContainerMemoryUtilization – Overall percentage of memory utilization across task manager containers in the Flink application cluster.
    • BusyTimeMsPerSecond – Time in milliseconds the application is busy (neither idle nor back pressured) per second.
    • BackPressuredTimeMsPerSecond – Time in milliseconds the application is back pressured per second.
    • LastCheckpointDuration – Time in milliseconds it took to complete the last checkpoint.
  • Kinesis Data Streams metrics – Requires the data stream name:
    • MillisBehindLatest – The number of milliseconds the consumer is behind the head of the stream, indicating how far behind the current time the consumer is.
    • IncomingRecords – The number of records successfully put to the Kinesis data stream over the specified time period. If no records are coming, this metric will be null and you won’t be able to scale down.
  • Amazon MSK metrics – Requires the cluster name, topic name, and consumer group name):
    • MaxOffsetLag – The maximum offset lag across all partitions in a topic.
    • SumOffsetLag – The aggregated offset lag for all the partitions in a topic.
    • EstimatedMaxTimeLag – The time estimate (in seconds) to drain MaxOffsetLag.
  • Custom metrics – Metrics you can define as part of your Apache Flink applications. Most common metrics are counters (continuously increase) or gauges (can be updated with last value). For this solution, you need to add the kinesisAnalytics dimension to the metric group. You also need to provide the custom metric name as a parameter in the CloudFormation template. If you need to use more dimensions in your custom metric, you need to modify the CloudWatch alarm so it’s able to use your specific metric. For more information on custom metrics, see Using Custom Metrics with Amazon Managed Service for Apache Flink.

The CloudFormation template deploys the resources as well as the auto scaling code. You only need to specify the name of the Amazon Managed Service for Apache Flink application, the metric to which you want to scale your application in or out, and the thresholds for triggering an alarm. The solution by default will use the average aggregation for metrics and a period duration of 60 seconds for each data point. You can configure the evaluation periods and data points to alarm when defining the CloudFormation template.

Scheduled scaling

This section describes how to implement a scaling solution for Amazon Managed Service for Apache Flink based on a schedule. To perform scheduled scaling, we use EventBridge and Lambda, as illustrated in the following figure.

These components are available in the CloudFormation template in the GitHub repo.

The EventBridge scheduler is triggered based on the parameters set when deploying the CloudFormation template. You define the KPU of the applications when running at peak times, as well as the KPU for non-peak times. The application runs with those KPU parameters depending on the time of day.

As with the previous example for metric-based scaling, the CloudFormation template deploys the resources and scaling code required. You only need to specify the name of the Amazon Managed Service for Apache Flink application and the schedule for the scaler to modify the application to the set number of KPUs.

Considerations for scaling Flink applications using metric-based or scheduled scaling

Be aware of the following when considering these solutions:

  • When scaling Amazon Managed Service for Apache Flink applications in or out, you can choose to either increase the overall application parallelism or modify the parallelism per KPU. The latter allows you to set the number of parallel tasks that can be scheduled per KPU. This sample only updates the overall parallelism, not the parallelism per KPU.
  • If SnapshotsEnabled is set to true in ApplicationSnapshotConfiguration, Amazon Managed Service for Apache Flink will automatically pause the application, take a snapshot, and then restore the application with the updated configuration whenever it is updated or scaled. This process may result in downtime for the application, depending on the state size, but there will be no data loss. When using metric-based scaling, you have to choose a minimum and a maximum threshold of KPU the application can have. Depending on by how much you perform the scaling, if the new desired KPU is bigger or lower than your thresholds, the solution will update the KPUs to be equal to your thresholds.
  • When using metric-based scaling, you also have to choose a cooling down period. This is the amount of time you want your application to wait after being updated, to see if the metric has gone from ALARM status to OK status. This value depends on how long are you willing to wait before another scaling event to occur.
  • With the metric-based scaling solution, you are limited to choosing the metrics that are listed in the CloudFormation template. However, you can modify the alarms to use any available metric in CloudWatch.
  • If your application is required to run without interruptions for periods of time, we recommend using scheduled scaling, to limit scaling to non-critical times.


In this post, we covered how you can enable custom scaling for Amazon Managed Service for Apache Flink applications using enhanced monitoring features from CloudWatch integrated with Step Functions and Lambda. We also showed how you can configure a schedule to scale an application using EventBridge. Both of these samples and many more can be found in the GitHub repo.

About the Authors

Deepthi Mohan is a Principal PMT on the Amazon Managed Service for Apache Flink team.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink.

Achieve high availability in Amazon OpenSearch Multi-AZ with Standby enabled domains: A deep dive into failovers

Post Syndicated from Anshu Agarwal original https://aws.amazon.com/blogs/big-data/achieve-high-availability-in-amazon-opensearch-multi-az-with-standby-enabled-domains-a-deep-dive-into-failovers/

Amazon OpenSearch Service recently introduced Multi-AZ with Standby, a deployment option designed to provide businesses with enhanced availability and consistent performance for critical workloads. With this feature, managed clusters can achieve 99.99% availability while remaining resilient to zonal infrastructure failures.

In this post, we explore how search and indexing works with Multi-AZ with Standby and delve into the underlying mechanisms that contribute to its reliability, simplicity, and fault tolerance.


Multi-AZ with Standby deploys OpenSearch Service domain instances across three Availability Zones, with two zones designated as active and one as standby. This configuration ensures consistent performance, even in the event of zonal failures, by maintaining the same capacity across all zones. Importantly, this standby zone follows a statically stable design, eliminating the need for capacity provisioning or data movement during failures.

During regular operations, the active zone handles coordinator traffic for both read and write requests, as well as shard query traffic. The standby zone, on the other hand, only receives replication traffic. OpenSearch Service utilizes a synchronous replication protocol for write requests. This enables the service to promptly promote a standby zone to active status in the event of a failure (mean time to failover <= 1 minute), known as a zonal failover. The previously active zone is then demoted to standby mode, and recovery operations commence to restore its healthy state.

Search traffic routing and failover to guarantee high availability

In an OpenSearch Service domain, a coordinator is any node that handles HTTP(S) requests, especially indexing and search requests. In a Multi-AZ with Standby domain, the data nodes in the active zone act as coordinators for search requests.

During the query phase of a search request, the coordinator determines the shards to be queried and sends a request to the data node hosting the shard copy. The query is run locally on each shard and matched documents are returned to the coordinator node. The coordinator node, which is responsible for sending the request to nodes containing shard copies, runs the process in two steps. First, it creates an iterator that defines the order in which nodes need to be queried for a shard copy so that traffic is uniformly distributed across shard copies. Subsequently, the request is sent to the relevant nodes.

In order to create an ordered list of nodes to be queried for a shard copy, the coordinator node uses various algorithms. These algorithms include round-robin selection, adaptive replica selection, preference-based shard routing, and weighted round-robin.

For Multi-AZ with Standby, the weighted round-robin algorithm is used for shard copy selection. In this approach, active zones are assigned a weight of 1, and the standby zone is assigned a weight of 0. This ensures that no read traffic is sent to data nodes in the standby Availability Zone.

The weights are stored in cluster state metadata as a JSON object:

"weighted_shard_routing": {
    "awareness": {
        "zone": {
            "us-east-1b": 0,
            "us-east-1d": 1,
            "us-east-1c": 1
     "_version": 3

As shown in the following screenshot, the us-east-1b Region has its zone status as StandBy, indicating that the data nodes in this Availability Zone are in standby state and don’t receive search or indexing requests from the load balancer.

Availability Zone status in AWS Console

To maintain steady-state operations, the standby Availability Zone is rotated every 30 minutes, ensuring all network parts are covered across Availability Zones. This proactive approach verifies the availability of read paths, further enhancing the system’s resilience during potential failures. The following diagram illustrates this architecture.

Steady State Operation

In the preceding diagram, Zone-C has a weighted round-robin weight set to zero. This ensures that the data nodes in the standby zone don’t receive any indexing or search traffic. When the coordinator queries data nodes for shard copies, it uses a weighted round-robin weight to decide on the order in which nodes to be queried. Because the weight is zero for the standby Availability Zone, coordinator requests are not sent.

In an OpenSearch Service cluster, the active and standby zones can be checked at any time using Availability Zone rotation metrics, as shown in the following screenshot.

Availability Zone rotation metrics

During zonal outages, the standby Availability Zone seamlessly switches to fail-open mode for search requests. This means that the shard query traffic is routed to all Availability Zones, even those in standby, when a healthy shard copy is unavailable in the active Availability Zone. This fail-open approach safeguards search requests from disruption during failures, ensuring continuous service. The following diagram illustrates this architecture.

Read Failover during Zonal Failure

In the preceding diagram, during the steady state, the shard query traffic is sent to the data node in the active Availability Zones (Zone-A and Zone-B). Due to node failures in Zone-A, the standby Availability Zone (Zone-C) fails open to take shard query traffic so that there isn’t any impact to the search requests. Eventually, Zone-A is detected as unhealthy and the read failover switches the standby to Zone-A.

How failover ensures high availability during write impairment

The OpenSearch Service replication model follows a primary backup model, characterized by its synchronous nature, where acknowledgement from all shard copies is necessary before a write request can be acknowledged to the user. One notable drawback of this replication model is its susceptibility to slowdowns in the event of any impairment in the write path. These systems rely on an active leader node to identify failures or delays and then broadcast this information to all nodes. The duration it takes to detect these issues (mean time to detect) and subsequently resolve them (mean time to repair) largely determines how long the system will operate in an impaired state. Additionally, any networking event that affects inter-zone communications can significantly impede write requests due to the synchronous nature of replication.

OpenSearch Service utilizes an internal node-to-node communication protocol for replicating write traffic and coordinating metadata updates through an elected leader. Consequently, putting the zone experiencing stress in standby wouldn’t effectively address the issue of write impairment.

Zonal write failover: Cutting off inter-zone replication traffic

For Multi-AZ with Standby, to mitigate potential performance issues caused during unforeseen events like zonal failures and networking events, zonal write failover is an effective approach. This approach involves graceful removal of nodes in the impacted zone from the cluster, effectively cutting off ingress and egress traffic between zones. By severing the inter-zone replication traffic, the impact of zonal failures can be contained within the affected zone. This provides a more predictable experience for customers and ensures that the system continues to operate reliably.

Graceful write failover

The orchestration of a write failover within OpenSearch Service is carried out by the elected leader node through a well-defined mechanism. This mechanism involves a consensus protocol for cluster state publication, ensuring unanimous agreement among all nodes to designate a single zone (at all times) for decommissioning. Importantly, metadata related to the affected zone is replicated across all nodes to ensure its persistence, even during a full restart in the event of an outage.

Furthermore, the leader node ensures a smooth and graceful transition by initially placing the nodes in the impacted zones on standby for a duration of 5 minutes before initiating I/O fencing. This deliberate approach prevents any new coordinator traffic or shard query traffic from being directed to the nodes within the impacted zone. This, in turn, allow these nodes to complete their ongoing tasks gracefully and gradually handle any inflight requests before being taken out of service. The following diagram illustrates this architecture.

Write Failover during Networking Event

In the process of implementing a write failover for a leader node, OpenSearch Service follows these key steps:

  • Leader abdication – If the leader node happens to be located in a zone scheduled for write failover, the system ensures that the leader node voluntarily steps down from its leadership role. This abdication is carried out in a controlled manner, and the entire process is handed over to another eligible node, which then takes charge of the actions required.
  • Prevent reelection of to-be-decommissioned leader – To prevent the reelection of a leader from a zone marked for write failover, when the eligible leader node initiates the write failover action, it takes measures to ensure that any to-be-decommissioned leader nodes do not participate in any further elections. This is achieved by excluding the to-be-decommissioned leader node from the voting configuration, effectively preventing it from voting during any critical phase of the cluster’s operation.

Metadata related to the write failover zone is stored within the cluster state, and this information is published to all nodes in the distributed OpenSearch Service cluster as follows:

"decommissionedAttribute": {
    "awareness": {
        "zone": "us-east-1c"
     "status": "successful",
     "requestID": "FLoyf5v9RVSsaAquRNKxIw"

The following screenshot depicts that during a networking slowdown in a zone, write failover helps recover availability.

Write Failover helps recovering availability

Zonal recovery after write failover

The process of zonal recommissioning plays a crucial role in the recovery phase following a zonal write failover. After the impacted zone has been restored and is considered stable, the nodes that were previously decommissioned will rejoin the cluster. This recommissioning typically occurs within a time frame of 2 minutes after the zone has been recommissioned.

This enables them to synchronize with their peer nodes and initiates the recovery process for replica shards, effectively restoring the cluster to its desired state.


The introduction of OpenSearch Service Multi-AZ with Standby provides businesses with a powerful solution to achieve high availability and consistent performance for critical workloads. With this deployment option, businesses can enhance their infrastructure’s resilience, simplify cluster configuration and management, and enforce best practices. With features like weighted round-robin shard copy selection, proactive failover mechanisms, and fail-open standby Availability Zones, OpenSearch Service Multi-AZ with Standby ensures a reliable and efficient search experience for demanding enterprise environments.

For more information about Multi-AZ with Standby, refer to Amazon OpenSearch Service Under the Hood: Multi-AZ with Standby.

About the Author

Anshu Agarwal
 is a Senior Software Engineer working on AWS OpenSearch at Amazon Web Services. She is passionate about solving problems related to building scalable and highly reliable systems.

Rishab Nahata
 is a Software Engineer working on OpenSearch at Amazon Web Services. He is fascinated about solving problems in distributed systems. He is active contributor to OpenSearch.

Bukhtawar Khan
is a Principal Engineer working on Amazon OpenSearch Service. He is interested in distributed and autonomous systems. He is an active contributor to OpenSearch.

Ranjith Ramachandra
is an Engineering Manager working on Amazon OpenSearch Service at Amazon Web Services.

Automate Cedar policy validation with AWS developer tools

Post Syndicated from Pontus Palmenäs original https://aws.amazon.com/blogs/security/automate-cedar-policy-validation-with-aws-developer-tools/

Cedar is an open-source language that you can use to authorize policies and make authorization decisions based on those policies. AWS security services including AWS Verified Access and Amazon Verified Permissions use Cedar to define policies. Cedar supports schema declaration for the structure of entity types in those policies and policy validation with that schema.

In this post, we show you how to use developer tools on AWS to implement a build pipeline that validates the Cedar policy files against a schema and runs a suite of tests to isolate the Cedar policy logic. As part of the walkthrough, you will introduce a subtle policy error that impacts permissions to observe how the pipeline tests catch the error. Detecting errors earlier in the development lifecycle is often referred to as shifting left. When you shift security left, you can help prevent undetected security issues during the application build phase.


This post extends a hypothetical photo sharing application from the Cedar policy language in action workshop. By using that app, users organize their photos into albums and share them with groups of users. Figure 1 shows the entities from the photo application.

Figure 1: Photo application entities

Figure 1: Photo application entities

For the purpose of this post, the important requirements are that user JohnDoe has view access to the album JaneVacation, which contains two photos that user JaneDoe owns:

  • Photo sunset.jpg has a contest label (indicating that the role PhotoJudge has view access)
  • Photo nightclub.jpg has a private label (indicating that only the owner has access)

Cedar policies separate application permissions from the code that retrieves and displays photos. The following Cedar policy explicitly permits the principal of user JohnDoe to take the action viewPhoto on resources in the album JaneVacation.

permit (
  principal == PhotoApp::User::"JohnDoe",
  action == PhotoApp::Action::"viewPhoto",
  resource in PhotoApp::Album::"JaneVacation"

The following Cedar policy forbids non-owners from accessing photos labeled as private, even if other policies permit access. In our example, this policy prevents John Doe from viewing the nightclub.jpg photo (denoted by an X in Figure 1).

forbid (
  resource in PhotoApp::Application::"PhotoApp"
when { resource.labels.contains("private") }
unless { resource.owner == principal };

A Cedar authorization request asks the question: Can this principal take this action on this resource in this context? The request also includes attribute and parent information for the entities. If an authorization request is made with the following test data, against the Cedar policies and entity data described earlier, the authorization result should be DENY.

  "principal": "PhotoApp::User::\"JohnDoe\"",
  "action": "PhotoApp::Action::\"viewPhoto\"",
  "resource": "PhotoApp::Photo::\"nightclub.jpg\"",
  "context": {}

The project test suite uses this and other test data to validate the expected behaviors when policies are modified. An error intentionally introduced into the preceding forbid policy lets the first policy satisfy the request and ALLOW access. That unexpected test result compared to the requirements fails the build.

Developer tools on AWS

With AWS developer tools, you can host code and build, test, and deploy applications and infrastructure. AWS CodeCommit hosts the Cedar policies and a test suite, AWS CodeBuild runs the tests, and AWS CodePipeline automatically runs the CodeBuild job when a CodeCommit repository state change event occurs.

In the following steps, you will create a pipeline, commit policies and tests, run a passing build, and observe how a policy error during validation fails a test case.


To follow along with this walkthrough, make sure to complete the following prerequisites:

Set up the local environment

The first step is to set up your local environment.

To set up the local environment

  1. Using Git, clone the GitHub repository for this post:
  2. git clone [email protected]:aws-samples/cedar-policy-validation-pipeline.git

  3. Before you commit this source code to a CodeCommit repository, run the test suite locally; this can help you shorten the feedback loop. To run the test suite locally, choose one of the following options:
  4. Option 1: Install Rust and compile the Cedar CLI binary

    1. Install Rust by using the rustup tool.
    2. curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

    3. Compile the Cedar CLI (version 2.4.2) binary by using cargo.
    4. cargo install [email protected]

    5. Run the cedar_testrunner.sh script, which tests authorize requests by using the Cedar CLI.
    6. cd policystore/tests && ./cedar_testrunner.sh

    Option 2: Run the CodeBuild agent

    1. Locally evaluate the buildspec.yml inside a CodeBuild container image by using the codebuild_build.sh script from aws-codebuild-docker-images with the following parameters:
    2. ./codebuild_build.sh -i public.ecr.aws/codebuild/amazonlinux2-x86_64-standard:5.0 -a .codebuild

Project structure

The policystore directory contains one Cedar policy for each .cedar file. The Cedar schema is defined in the cedarschema.json file. A tests subdirectory contains a cedarentities.json file that represents the application data; its subdirectories (for example, album JaneVacation) represent the test suites. The test suite directories contain individual tests inside their ALLOW and DENY subdirectories, each with one or more JSON files that contain the authorization request that Cedar will evaluate against the policy set. A README file in the tests directory provides a summary of the test cases in the suite.

The cedar_testrunner.sh script runs the Cedar CLI to perform a validate command for each .cedar file against the Cedar schema, outputting either PASS or ERROR. The script also performs an authorize command on each test file, outputting either PASS or FAIL depending on whether the results match the expected authorization decision.

Set up the CodePipeline

In this step, you use AWS CloudFormation to provision the services used in the pipeline.

To set up the pipeline

  1. Navigate to the directory of the cloned repository.

    cd cedar-policy-validation-pipeline

  2. Create a new CloudFormation stack from the template.
    aws cloudformation deploy \
    --template-file template.yml \
    --stack-name cedar-policy-validation \
    --capabilities CAPABILITY_NAMED_IAM

  3. Wait for the message Successfully created/updated stack.

Invoke CodePipeline

The next step is to commit the source code to a CodeCommit repository, and then configure and invoke CodePipeline.

To invoke CodePipeline

  1. Add an additional Git remote named codecommit to the repository that you previously cloned. The following command points the Git remote to the CodeCommit repository that CloudFormation created. The CedarPolicyRepoCloneUrl stack output is the HTTPS clone URL. Replace it with CedarPolicyRepoCloneGRCUrl to use the HTTPS (GRC) clone URL when you connect to CodeCommit with git-remote-codecommit.

    git remote add codecommit $(aws cloudformation describe-stacks --stack-name cedar-policy-validation --query 'Stacks[0].Outputs[?OutputKey==`CedarPolicyRepoCloneUrl`].OutputValue' --output text)

  2. Push the code to the CodeCommit repository. This starts a pipeline run.

    git push codecommit main

  3. Check the progress of the pipeline run.
    aws codepipeline get-pipeline-execution \
    --pipeline-name cedar-policy-validation \
    --pipeline-execution-id $(aws codepipeline list-pipeline-executions --pipeline-name cedar-policy-validation --query 'pipelineExecutionSummaries[0].pipelineExecutionId' --output text) \
    --query 'pipelineExecution.status' --output text

The build installs Rust in CodePipeline in your account and compiles the Cedar CLI. After approximately four minutes, the pipeline run status shows Succeeded.

Refactor some policies

This photo sharing application sample includes overlapping policies to simulate a refactoring workflow, where after changes are made, the test suite continues to pass. The DoePhotos.cedar and JaneVacation.cedar static policies are replaced by the logically equivalent viewPhoto.template.cedar policy template and two template-linked policies defined in cedartemplatelinks.json. After you delete the extra policies, the passing tests illustrate a successful refactor with the same expected application permissions.

To refactor policies

  1. Delete DoePhotos.cedar and JaneVacation.cedar.
  2. Commit the change to the repository.
    git add .
    git commit -m "Refactor some policies"
    git push codecommit main

  3. Check the pipeline progress. After about 20 seconds, the pipeline status shows Succeeded.

The second pipeline build runs quicker because the build specification is configured to cache a version of the Cedar CLI. Note that caching isn’t implemented in the local testing described in Option 2 of the local environment setup.

Break the build

After you confirm that you have a working pipeline that validates the Cedar policies, see what happens when you commit an invalid Cedar policy.

To break the build

  1. Using a text editor, open the file policystore/Photo-labels-private.cedar.
  2. In the when clause, change resource.labels to resource.label (removing the “s”). This policy syntax is valid, but no longer validates against the Cedar schema.
  3. Commit the change to the repository.
    git add .
    git commit -m "Break the build"
    git push codecommit main

  4. Sign in to the AWS Management Console and open the CodePipeline console.
  5. Wait for the Most recent execution field to show Failed.
  6. Select the pipeline and choose View in CodeBuild.
  7. Choose the Reports tab, and then choose the most recent report.
  8. Review the report summary, which shows details such as the total number of Passed and Failed/Error test case totals, and the pass rate, as shown in Figure 2.
  9. Figure 2: CodeBuild test report summary

    Figure 2: CodeBuild test report summary

  10. To get the error details, in the Details section, select the Test case called validate Photo-labels-private.cedar that has a Status of Error.
  11. Figure 3: CodeBuild test report test cases

    Figure 3: CodeBuild test report test cases

    That single policy change resulted in two test cases that didn’t pass. The detailed error message shown in Figure 4 is the output from the Cedar CLI. When the policy was validated against the schema, Cedar found the invalid attribute label on the entity type PhotoApp::Photo. The Failed message of unexpected ALLOW occurred because the label attribute typo prevented the forbid policy from matching and producing a DENY result. Each of these tests helps you avoid deploying invalid policies.

    Figure 4: CodeBuild test case error message

    Figure 4: CodeBuild test case error message

Clean up

To avoid ongoing costs and to clean up the resources that you deployed in your AWS account, complete the following steps:

To clean up the resources

  1. Open the Amazon S3 console, select the bucket that begins with the phrase cedar-policy-validation-codepipelinebucket, and Empty the bucket.
  2. Open the CloudFormation console, select the cedar-policy-validation stack, and then choose Delete.
  3. Open the CodeBuild console, choose Build History, filter by cedar-policy-validation, select all results, and then choose Delete builds.


In this post, you learned how to use AWS developer tools to implement a pipeline that automatically validates and tests when Cedar policies are updated and committed to a source code repository. Using this approach, you can detect invalid policies and potential application permission errors earlier in the development lifecycle and before deployment.

To learn more about the Cedar policy language, see the Cedar Policy Language Reference Guide or browse the source code at the cedar-policy organization on GitHub. For real-time validation of Cedar policies and schemas, install the Cedar policy language for Visual Studio Code extension.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Amazon Verified Permissions re:Post or contact AWS Support.

Pontus Palmenas

Pontus Palmenäs

Pontus is a Startup Solutions Architect based in Stockholm, Sweden, where he is helping customers in healthcare and life sciences and FinTech. He is passionate about all things security. Outside of work, he enjoys making electronic music in his home studio and spending quality time with his family.

Kevin Hakanson

Kevin Hakanson

Kevin is a Senior Solutions Architect for AWS World Wide Public Sector based in Minnesota. He works with EdTech and GovTech customers to ideate, design, validate, and launch products using cloud-native technologies and modern development practices. When not staring at a computer screen, he is probably staring at another screen, either watching TV or playing video games with his family.

Building a generative AI Marketing Portal on AWS

Post Syndicated from Tristan Nguyen original https://aws.amazon.com/blogs/messaging-and-targeting/building-a-generative-ai-marketing-portal-on-aws/


In the preceding entries of this series, we examined the transformative impact of Generative AI on marketing strategies in “Building Generative AI into Marketing Strategies: A Primer” and delved into the intricacies of Prompt Engineering to enhance the creation of marketing content with services such as Amazon Bedrock in “From Prompt Engineering to Auto Prompt Optimisation”. We also explored the potential of Large Language Models (LLMs) to refine prompts for more effective customer engagement.

Continuing this exploration, we will articulate how Amazon Bedrock, Amazon Personalize, and Amazon Pinpoint can be leveraged to construct a marketer portal that not only facilitates AI-driven content generation but also personalizes and distributes this content effectively. The aim is to provide a clear blueprint for deploying a system that crafts, personalizes, and distributes marketing content efficiently. This blog will guide you through the deployment process, underlining the real-world utility of these services in optimizing marketing workflows. Through use cases and a code demonstration, we’ll see these technologies in action, offering a hands-on perspective on enhancing your marketing pipeline with AI-driven solutions.

The Challenge with Content Generation in Marketing

Many companies struggle to streamline their marketing operations effectively, facing hurdles at various stages of the marketing operations pipeline. Below, we list the challenges at three main stages of the pipeline: content generation, content personalization, and content distribution.

Content Generation

Creating high-quality, engaging content is often easier said than done. Companies need to invest in skilled copywriters or content creators who understand not just the product but also the target audience. Even with the right talent, the process can be time-consuming and costly. Moreover, generating content at scale while maintaining quality and compliance to industry regulations is the key blocker for many companies considering adopting generative AI technologies in production environments.

Content Personalization

Once the content is created, the next hurdle is personalization. In today’s digital age, generic content rarely captures attention. Customers expect content tailored to their needs, preferences, and behaviors. However, personalizing content is not straightforward. It requires a deep understanding of customer data, which often resides in siloed databases, making it difficult to create a 360-degree view of the customer.

Content Distribution

Finally, even the most captivating, personalized content is ineffective if it doesn’t reach the right audience at the right time. Companies often grapple with choosing the appropriate channels for content distribution, be it email, social media, or mobile notifications. Additionally, ensuring that the content complies with various regulations and doesn’t end up in spam folders adds another layer of complexity to the distribution phase. Sending at scale requires paying attention to deliverability, security and reliability which often poses significant challenges to marketers.

By addressing these challenges, companies can significantly improve their marketing operations and empower their marketers to be more effective. But how can this be achieved efficiently and at scale? The answer lies in leveraging the power of Amazon Bedrock, Amazon Personalize, and Amazon Pinpoint, as we will explore in the following solution.

The Solution In Action

Before we dive into the details of the implementation, let’s take a look at the end result through the linked demo video.

Use Case 1: Banking/Financial Services Industry

You are a relationship manager working in the Consumer Banking department of a fictitious company called AnyCompany Bank. You are assigned a group of customers and would like to send out personalized and targeted communications to the channel of choice to every members of this group of customer.

Behind the scene, the marketer is utilizing Amazon Pinpoint to create the segment of customers they would like to target. The customers’ information and the marketer’s prompt are then fed into Amazon Bedrock to generate the marketing content, which is then sent to the customer via SMS and email using Amazon Pinpoint.

  • In the Prompt Iterator page, you can employ a process called “prompt engineering” to further optimize your prompt to maximize the effectiveness of your marketing campaigns. Please refer to this blog on the process behind engineering the prompt as well as how to apply an additional LLM model for auto-prompting. To get started, simply copy the sample banking prompt which has gone through the prompt engineering process in this page.
  • Next, you can either upload your customer group by uploading a .csv file (through “Importing a Segment”) or specify a customer group using pre-defined filter criteria based on your current customer database using Amazon Pinpoint.


E.g.: The screenshot shows a sample filtered segment named ManagementOrRetired that only filters to customers who are management or retirees.

  • Once done, you can log into the marketer portal and choose the relevant segment that you’ve just created within the Amazon Pinpoint console.


  • You can then preview the customers and their information stored in your Amazon Pinpoint’s customer database. Once satisfied, we’re ready to start generating content for those customers!
  • Click on 1:1 Content Generator tab, your content is automatically generated for your first customer. Here, you can cycle through your customers one by one, and depending on the customer’s preferred language and channel, an email or SMS in the preferred language is automatically generated for them.
    • Generated SMS in English


    • A negative example showing proper prompt-engineering at work to moderate content. This happens if we try to insert data that does not make sense for the marketing content generator to output. In this case, the marketing generator refuses to output (justifiably) an advertisement for a 6-year-old on a secured instalment loan.


  • Finally, we choose to send the generated content via Amazon Pinpoint by clicking on “Send with Amazon Pinpoint”. In the back end, Amazon Pinpoint will orchestrate the sending of the email/SMS through the appropriate channels.
    • Alternatively, if the auto-generated content still did not meet your needs and you want to generate another draft, you can Disagree and try again.

Use Case 2: Travel & Hospitality

You are a marketing executive that’s working for an online air ticketing agency. You’ve been tasked to promote a specific flight from Singapore to Hong Kong for AnyCompany airline. You’d first like to identify which customers would be prime candidates to promote this flight leg to and then send out hyper-personalized message to them.

Behind the scene, instead of using Amazon Pinpoint to manually define the segment, the marketer in this case is leveraging AIML capabilities of Amazon Personalize to define the best group of customers to recommend the specific flight leg to them. Similar to the above use case, the customers’ information and LLM prompt are fed into the Amazon Bedrock, which generates the marketing content that is eventually sent out via Amazon Pinpoint.

  • Similar to the above use case, you’d need to go through a prompt engineering process to ensure that the content the LLM model is generating will be relevant and safe for use. To get started quickly, go to the Prompt Iterator page, you can use the sample airlines prompt and iterate from there.
  • Your company offers many different flight legs, aggregated from many different carriers. You first filter down to the flight leg that you want to promote using the Filters on the left. In this case, we are filtering for flights originating from Singapore (SRCCity) and going to Hong Kong (DSTCity), operated by AnyCompany Airlines.


  • Now, let’s choose the number of customers that you’d like to generate. Once satisfied, you choose to start the batch segmentation job.
  • In the background, Amazon Personalize generates a group of customers that are most likely to be interested in this flight leg based on past interactions with similar flight itineraries.
  • Once the segmentation job is finished as shown, you can fetch the recommended group of customers and start generating content for them immediately, similar to the first use case.

Setup instructions

The setup instructions and deployment details can be found in the GitHub link.


In this blog, we’ve explored the transformative potential of integrating Amazon Bedrock, Amazon Personalize, and Amazon Pinpoint to address the common challenges in marketing operations. By automating the content generation with Amazon Bedrock, personalizing at scale with Amazon Personalize, and ensuring precise content distribution with Amazon Pinpoint, companies can not only streamline their marketing processes but also elevate the customer experience.

The benefits are clear: time-saving through automation, increased operational efficiency, and enhanced customer satisfaction through personalized engagement. This integrated solution empowers marketers to focus on strategy and creativity, leaving the heavy lifting to AWS’s robust AI and ML services.

For those ready to take the next step, we’ve provided a comprehensive guide and resources to implement this solution. By following the setup instructions and leveraging the provided prompts as a starting point, you can deploy this solution and begin customizing the marketer portal to your business’ needs.

Call to Action

Don’t let the challenges of content generation, personalization, and distribution hold back your marketing potential. Deploy the Generative AI Marketer Portal today, adapt it to your specific needs, and watch as your marketing operations transform. For a hands-on start and to see this solution in action, visit the GitHub repository for detailed setup instructions.

Have a question? Share your experiences or leave your questions in the comment section.

About the Authors

Tristan (Tri) Nguyen

Tristan (Tri) Nguyen

Tristan (Tri) Nguyen is an Amazon Pinpoint and Amazon Simple Email Service Specialist Solutions Architect at AWS. At work, he specializes in technical implementation of communications services in enterprise systems and architecture/solutions design. In his spare time, he enjoys chess, rock climbing, hiking and triathlon.

Philipp Kaindl

Philipp Kaindl

Philipp Kaindl is a Senior Artificial Intelligence and Machine Learning Solutions Architect at AWS. With a background in data science and
mechanical engineering his focus is on empowering customers to create lasting business impact with the help of AI. Outside of work, Philipp enjoys tinkering with 3D printers, sailing and hiking.

Bruno Giorgini

Bruno Giorgini

Bruno Giorgini is a Senior Solutions Architect specializing in Pinpoint and SES. With over two decades of experience in the IT industry, Bruno has been dedicated to assisting customers of all sizes in achieving their objectives. When he is not crafting innovative solutions for clients, Bruno enjoys spending quality time with his wife and son, exploring the scenic hiking trails around the SF Bay Area.

Architectural patterns for real-time analytics using Amazon Kinesis Data Streams, part 1

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/big-data/architectural-patterns-for-real-time-analytics-using-amazon-kinesis-data-streams-part-1/

We’re living in the age of real-time data and insights, driven by low-latency data streaming applications. Today, everyone expects a personalized experience in any application, and organizations are constantly innovating to increase their speed of business operation and decision making. The volume of time-sensitive data produced is increasing rapidly, with different formats of data being introduced across new businesses and customer use cases. Therefore, it is critical for organizations to embrace a low-latency, scalable, and reliable data streaming infrastructure to deliver real-time business applications and better customer experiences.

This is the first post to a blog series that offers common architectural patterns in building real-time data streaming infrastructures using Kinesis Data Streams for a wide range of use cases. It aims to provide a framework to create low-latency streaming applications on the AWS Cloud using Amazon Kinesis Data Streams and AWS purpose-built data analytics services.

In this post, we will review the common architectural patterns of two use cases: Time Series Data Analysis and Event Driven Microservices. In the subsequent post in our series, we will explore the architectural patterns in building streaming pipelines for real-time BI dashboards, contact center agent, ledger data, personalized real-time recommendation, log analytics, IoT data, Change Data Capture, and real-time marketing data. All these architecture patterns are integrated with Amazon Kinesis Data Streams.

Real-time streaming with Kinesis Data Streams

Amazon Kinesis Data Streams is a cloud-native, serverless streaming data service that makes it easy to capture, process, and store real-time data at any scale. With Kinesis Data Streams, you can collect and process hundreds of gigabytes of data per second from hundreds of thousands of sources, allowing you to easily write applications that process information in real-time. The collected data is available in milliseconds to allow real-time analytics use cases, such as real-time dashboards, real-time anomaly detection, and dynamic pricing. By default, the data within the Kinesis Data Stream is stored for 24 hours with an option to increase the data retention to 365 days. If customers want to process the same data in real-time with multiple applications, then they can use the Enhanced Fan-Out (EFO) feature. Prior to this feature, every application consuming data from the stream shared the 2MB/second/shard output. By configuring stream consumers to use enhanced fan-out, each data consumer receives dedicated 2MB/second pipe of read throughput per shard to further reduce the latency in data retrieval.

For high availability and durability, Kinesis Data Streams achieves high durability by synchronously replicating the streamed data across three Availability Zones in an AWS Region and gives you the option to retain data for up to 365 days. For security, Kinesis Data Streams provide server-side encryption so you can meet strict data management requirements by encrypting your data at rest and Amazon Virtual Private Cloud (VPC) interface endpoints to keep traffic between your Amazon VPC and Kinesis Data Streams private.

Kinesis Data Streams has native integrations with other AWS services such as AWS Glue and Amazon EventBridge to build real-time streaming applications on AWS. Refer to Amazon Kinesis Data Streams integrations for additional details.

Modern data streaming architecture with Kinesis Data Streams

A modern streaming data architecture with Kinesis Data Streams can be designed as a stack of five logical layers; each layer is composed of multiple purpose-built components that address specific requirements, as illustrated in the following diagram:

The architecture consists of the following key components:

  • Streaming sources – Your source of streaming data includes data sources like clickstream data, sensors, social media, Internet of Things (IoT) devices, log files generated by using your web and mobile applications, and mobile devices that generate semi-structured and unstructured data as continuous streams at high velocity.
  • Stream ingestion – The stream ingestion layer is responsible for ingesting data into the stream storage layer. It provides the ability to collect data from tens of thousands of data sources and ingest in real time. You can use the Kinesis SDK for ingesting streaming data through APIs, the Kinesis Producer Library for building high-performance and long-running streaming producers, or a Kinesis agent for collecting a set of files and ingesting them into Kinesis Data Streams. In addition, you can use many pre-build integrations such as AWS Database Migration Service (AWS DMS), Amazon DynamoDB, and AWS IoT Core to ingest data in a no-code fashion. You can also ingest data from third-party platforms such as Apache Spark and Apache Kafka Connect
  • Stream storage – Kinesis Data Streams offer two modes to support the data throughput: On-Demand and Provisioned. On-Demand mode, now the default choice, can elastically scale to absorb variable throughputs, so that customers do not need to worry about capacity management and pay by data throughput. The On-Demand mode automatically scales up 2x the stream capacity over its historic maximum data ingestion to provide sufficient capacity for unexpected spikes in data ingestion. Alternatively, customers who want granular control over stream resources can use the Provisioned mode and proactively scale up and down the number of Shards to meet their throughput requirements. Additionally, Kinesis Data Streams can store streaming data up to 24 hours by default, but can extend to 7 days or 365 days depending upon use cases. Multiple applications can consume the same stream.
  • Stream processing – The stream processing layer is responsible for transforming data into a consumable state through data validation, cleanup, normalization, transformation, and enrichment. The streaming records are read in the order they are produced, allowing for real-time analytics, building event-driven applications or streaming ETL (extract, transform, and load). You can use Amazon Managed Service for Apache Flink for complex stream data processing, AWS Lambda for stateless stream data processing, and AWS Glue & Amazon EMR for near-real-time compute. You can also build customized consumer applications with Kinesis Consumer Library, which will take care of many complex tasks associated with distributed computing.
  • Destination – The destination layer is like a purpose-built destination depending on your use case. You can stream data directly to Amazon Redshift for data warehousing and Amazon EventBridge for building event-driven applications. You can also use Amazon Kinesis Data Firehose for streaming integration where you can light stream processing with AWS Lambda, and then deliver processed streaming into destinations like Amazon S3 data lake, OpenSearch Service for operational analytics, a Redshift data warehouse, No-SQL databases like Amazon DynamoDB, and relational databases like Amazon RDS to consume real-time streams into business applications. The destination can be an event-driven application for real-time dashboards, automatic decisions based on processed streaming data, real-time altering, and more.

Real-time analytics architecture for time series

Time series data is a sequence of data points recorded over a time interval for measuring events that change over time. Examples are stock prices over time, webpage clickstreams, and device logs over time. Customers can use time series data to monitor changes over time, so that they can detect anomalies, identify patterns, and analyze how certain variables are influenced over time. Time series data is typically generated from multiple sources in high volumes, and it needs to be cost-effectively collected in near real time.

Typically, there are three primary goals that customers want to achieve in processing time-series data:

  • Gain insights real-time into system performance and detect anomalies
  • Understand end-user behavior to track trends and query/build visualizations from these insights
  • Have a durable storage solution to ingest and store both archival and frequently accessed data.

With Kinesis Data Streams, customers can continuously capture terabytes of time series data from thousands of sources for cleaning, enrichment, storage, analysis, and visualization.

The following architecture pattern illustrates how real time analytics can be achieved for Time Series data with Kinesis Data Streams:

Build a serverless streaming data pipeline for time series data

The workflow steps are as follows:

  1. Data Ingestion & Storage – Kinesis Data Streams can continuously capture and store terabytes of data from thousands of sources.
  2. Stream Processing – An application created with Amazon Managed Service for Apache Flink can read the records from the data stream to detect and clean any errors in the time series data and enrich the data with specific metadata to optimize operational analytics. Using a data stream in the middle provides the advantage of using the time series data in other processes and solutions at the same time. A Lambda function is then invoked with these events, and can perform time series calculations in memory.
  3. Destinations – After cleaning and enrichment, the processed time series data can be streamed to Amazon Timestream database for real-time dashboarding and analysis, or stored in databases such as DynamoDB for end-user query. The raw data can be streamed to Amazon S3 for archiving.
  4. Visualization & Gain insights – Customers can query, visualize, and create alerts using Amazon Managed Service for Grafana. Grafana supports data sources that are storage backends for time series data. To access your data from Timestream, you need to install the Timestream plugin for Grafana. End-users can query data from the DynamoDB table with Amazon API Gateway acting as a proxy.

Refer to Near Real-Time Processing with Amazon Kinesis, Amazon Timestream, and Grafana showcasing a serverless streaming pipeline to process and store device telemetry IoT data into a time series optimized data store such as Amazon Timestream.

Enriching & replaying data in real time for event-sourcing microservices

Microservices are an architectural and organizational approach to software development where software is composed of small independent services that communicate over well-defined APIs. When building event-driven microservices, customers want to achieve 1. high scalability to handle the volume of incoming events and 2. reliability of event processing and maintain system functionality in the face of failures.

Customers utilize microservice architecture patterns to accelerate innovation and time-to-market for new features, because it makes applications easier to scale and faster to develop. However, it is challenging to enrich and replay the data in a network call to another microservice because it can impact the reliability of the application and make it difficult to debug and trace errors. To solve this problem, event-sourcing is an effective design pattern that centralizes historic records of all state changes for enrichment and replay, and decouples read from write workloads. Customers can use Kinesis Data Streams as the centralized event store for event-sourcing microservices, because KDS can 1/ handle gigabytes of data throughput per second per stream and stream the data in milliseconds, to meet the requirement on high scalability and near real-time latency, 2/ integrate with Flink and S3 for data enrichment and achieving while being completely decoupled from the microservices, and 3/ allow retry and asynchronous read in a later time, because KDS retains the data record for a default of 24 hours, and optionally up to 365 days.

The following architectural pattern is a generic illustration of how Kinesis Data Streams can be used for Event-Sourcing Microservices:

The steps in the workflow are as follows:

  1. Data Ingestion and Storage – You can aggregate the input from your microservices to your Kinesis Data Streams for storage.
  2. Stream processing Apache Flink Stateful Functions simplifies building distributed stateful event-driven applications. It can receive the events from an input Kinesis data stream and route the resulting stream to an output data stream. You can create a stateful functions cluster with Apache Flink based on your application business logic.
  3. State snapshot in Amazon S3 – You can store the state snapshot in Amazon S3 for tracking.
  4. Output streams – The output streams can be consumed through Lambda remote functions through HTTP/gRPC protocol through API Gateway.
  5. Lambda remote functions – Lambda functions can act as microservices for various application and business logic to serve business applications and mobile apps.

To learn how other customers built their event-based microservices with Kinesis Data Streams, refer to the following:

Key considerations and best practices

The following are considerations and best practices to keep in mind:

  • Data discovery should be your first step in building modern data streaming applications. You must define the business value and then identify your streaming data sources and user personas to achieve the desired business outcomes.
  • Choose your streaming data ingestion tool based on your steaming data source. For example, you can use the Kinesis SDK for ingesting streaming data through APIs, the Kinesis Producer Library for building high-performance and long-running streaming producers, a Kinesis agent for collecting a set of files and ingesting them into Kinesis Data Streams, AWS DMS for CDC streaming use cases, and AWS IoT Core for ingesting IoT device data into Kinesis Data Streams. You can ingest streaming data directly into Amazon Redshift to build low-latency streaming applications. You can also use third-party libraries like Apache Spark and Apache Kafka to ingest streaming data into Kinesis Data Streams.
  • You need to choose your streaming data processing services based on your specific use case and business requirements. For example, you can use Amazon Kinesis Managed Service for Apache Flink for advanced streaming use cases with multiple streaming destinations and complex stateful stream processing or if you want to monitor business metrics in real time (such as every hour). Lambda is good for event-based and stateless processing. You can use Amazon EMR for streaming data processing to use your favorite open source big data frameworks. AWS Glue is good for near-real-time streaming data processing for use cases such as streaming ETL.
  • Kinesis Data Streams on-demand mode charges by usage and automatically scales up resource capacity, so it’s good for spiky streaming workloads and hands-free maintenance. Provisioned mode charges by capacity and requires proactive capacity management, so it’s good for predictable streaming workloads.
  • You can use the Kinesis Shared Calculator to calculate the number of shards needed for provisioned mode. You don’t need to be concerned about shards with on-demand mode.
  • When granting permissions, you decide who is getting what permissions to which Kinesis Data Streams resources. You enable specific actions that you want to allow on those resources. Therefore, you should grant only the permissions that are required to perform a task. You can also encrypt the data at rest by using a KMS customer managed key (CMK).
  • You can update the retention period via the Kinesis Data Streams console or by using the IncreaseStreamRetentionPeriod and the DecreaseStreamRetentionPeriod operations based on your specific use cases.
  • Kinesis Data Streams supports resharding. The recommended API for this function is UpdateShardCount, which allows you to modify the number of shards in your stream to adapt to changes in the rate of data flow through the stream. The resharding APIs (Split and Merge) are typically used to handle hot shards.


This post demonstrated various architectural patterns for building low-latency streaming applications with Kinesis Data Streams. You can build your own low-latency steaming applications with Kinesis Data Streams using the information in this post.

For detailed architectural patterns, refer to the following resources:

If you want to build a data vision and strategy, check out the AWS Data-Driven Everything (D2E) program.

About the Authors

Raghavarao Sodabathina is a Principal Solutions Architect at AWS, focusing on Data Analytics, AI/ML, and cloud security. He engages with customers to create innovative solutions that address customer business problems and to accelerate the adoption of AWS services. In his spare time, Raghavarao enjoys spending time with his family, reading books, and watching movies.

Hang Zuo is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.

Shwetha Radhakrishnan is a Solutions Architect for AWS with a focus in Data Analytics. She has been building solutions that drive cloud adoption and help organizations make data-driven decisions within the public sector. Outside of work, she loves dancing, spending time with friends and family, and traveling.

Brittany Ly is a Solutions Architect at AWS. She is focused on helping enterprise customers with their cloud adoption and modernization journey and has an interest in the security and analytics field. Outside of work, she loves to spend time with her dog and play pickleball.

Optimizing video encoding with FFmpeg using NVIDIA GPU-based Amazon EC2 instances

Post Syndicated from Macey Neff original https://aws.amazon.com/blogs/compute/optimizing-video-encoding-with-ffmpeg-using-nvidia-gpu-based-amazon-ec2-instances/

This post is written by Alejandro Gil, Solutions Architect and Joseba Echevarría, Solutions Architect. 


The purpose of this blog post is to compare video encoding performance between CPUs and Nvidia GPUs to determine the price/performance ratio in different scenarios while highlighting where it would be best to use a GPU.

Video encoding plays a critical role in modern media delivery, enabling efficient storage, delivery, and playback of high-quality video content across a wide range of devices and platforms.

Video encoding is frequently performed solely by the CPU because of its widespread availability and flexibility. Still, modern hardware includes specialized components designed specifically to obtain very high performance video encoding and decoding.

Nvidia GPUs, such as those found in the P and G Amazon EC2 instances, include this kind of built-in hardware in their NVENC (encoding) and NVDEC (decoding) accelerator engines, which can be used for real-time video encoding/decoding with minimal impact on the performance of the CPU or GPU.

NVIDIA NVDEC/NVENC architecture. Source https://developer.nvidia.com/video-codec-sdk

Figure 1: NVIDIA NVDEC/NVENC architecture. Source https://developer.nvidia.com/video-codec-sdk


Two main transcoding job types should be considered depending on the video delivery use case, 1) batch jobs for on demand video files and 2) streaming jobs for real-time, low latency use cases. In order to achieve optimal throughput and cost efficiency, it is a best practice to encode the videos in parallel using the same instance.

The utilized instance types in this benchmark can be found in figure 2 table (i.e g4dn and p3). For hardware comparison purposes, the p4d instance has been included in the table, showing the GPU specs and total number of NVDEC & NVENC cores in these EC2 instances. Based on the requirements, multiple GPU instances types are available in EC2.

Instance size GPUs GPU model NVDEC generation NVENC generation NVDEC cores/GPU NVENC cores/GPU
g4dn.xlarge 1 T4 4th 7th 2 1
p3.2xlarge 1 V100 3rd 6th 1 3
p4d.24xlarge 8 A100 4th N/A 5 0

Figure 2: GPU instances specifications


In order to determine which encoding strategy is the most convenient for each scenario, a benchmark will be conducted comparing CPU and GPU instances across different video settings. The results will be further presented using graphical representations of the performance indicators obtained.

The benchmark uses 3 input videos with different motion and detail levels (still, medium motion and high dynamic scene) in 4k resolution at 60 frames per second. The tests will show the average performance for encoding with FFmpeg 6.0 in batch (using Constant Rate Factor (CRF) mode) and streaming (using Constant Bit Rate (CBR)) with x264 and x265 codecs to five output resolutions (1080p, 720p, 480p, 360p and 160p).

The benchmark tests encoding the target videos into H.264 and H.265 using the x264 and x265 open-source libraries in FFmpeg 6.0 on the CPU and the NVENC accelerator when using the Nvidia GPU. The H.264 standard enjoys broad compatibility, with most consumer devices supporting accelerated decoding. The H.265 standard offers superior compression at a given level of quality than H.264 but hardware accelerated decoding is not as widely deployed. As a result, for most media delivery scenarios having more than one video format will be required in order to provide the best possible user experience.

Offline (batch) encoding

This test consists of a batch encoding with two different standard presets (ultrafast and medium for CPU-based encoding and p1 and medium presets for GPU-accelerated encoding) defined in the FFmpeg guide.

The following chart shows the relative cost of transcoding 1 million frames to the 5 different output resolutions in parallel for CPU-encoding EC2 instance (c6i.4xlarge) and two types of GPU-powered instances (g4dn.xlarge and p3.2xlarge). The results are normalized so that the cost of x264 ultrafast preset on c6i.4xlarge is equal to one.

Batch encoding performance for CPU and GPU instances.

Figure 3: Batch encoding performance for CPU and GPU instances.

The performance of batch encoding in the best GPU instance (g4dn.xlarge) shows around 73% better price/performance in x264 compared to the c6i.4xlarge and around 82% improvement in x265.

A relevent aspect to have in consideration is that the presets used are not exactly equivalent for each hardware because FFmpeg uses different operators depending on where the process runs (i.e CPU or GPU). As a consequence, the video outputs in each case have a noticeable difference between them. Generally, NVENC-based encoded videos (GPU) tend to have a higher quality in H.264, whereas CPU outputs present more encoding artifacts. The difference is more noticeable for lower quality cases (ultrafast/p1 presets or streaming use cases).

The following images compare the output quality for the medium motion video in the ultrafast/p1 and medium presets.

It is clearly seen in the following example, that the h264_nevenc (GPU) codec outperforms the libx264 codec (CPU) in terms of quality, showing less pixelation, especially in the ultrafast preset. For the medium preset, although the quality difference is less pronounced, the GPU output file is noticeably larger (refer to Figure 6 table).

Result comparison between GPU and CPU for h264, ultrafast

Figure 4: Result comparison between GPU and CPU for h264, ultrafast

Result comparison between GPU and CPU for h264, medium

Figure 5: Result comparison between GPU and CPU for h264, medium

The output file sizes mainly depend on the preset, codec and input video. The different configurations can be found in the following table.

Sizes for output batch encoded videos. Streaming not represented because the size is the same (fixed bitrate)

Figure 6: Sizes for output batch encoded videos. Streaming not represented because the size is the same (fixed bitrate)

Live stream encoding

For live streaming use cases, it is useful to measure how many streams a single instance can maintain transcoding to five output resolutions (1080p, 720p, 480p, 360p and 160p). The following results are the relative cost of each instance, which is the ratio of number of streams the instance was able to sustain divided by the cost per hour.

Streaming encoding performance for CPU and GPU instances.

Figure 6: Streaming encoding performance for CPU and GPU instances.

The previous results show that a GPU-based instance family like g4dn is ideal for streaming use cases, where they can sustain up to 4 parallel encodings from 4K to 1080p, 720p, 480p, 360p & 160p simultaneously. Notice that the GPU-based p5 family performance is not compensating the cost increase.

On the other hand, the CPU-based instances can sustain 1 parallel stream (at most). If you want to sustain the same number of parallel streams in Intel-based instances, you’d have to opt for a much larger instance (c6i.12xlarge can almost sustain 3 simultaneous streams, but it struggles to keep up with the more dynamic scenes when encoding with x265) with a much higher cost ($2.1888 hourly for c6i.12xlarge vs $0.587 for g4dn.xlarge).

The price/performance difference is around 68% better in GPU for x264 and 79% for x265.


The results show that for the tested scenarios there can be a price-performance gain when transcoding with GPU compared to CPU. Also, GPU-encoded videos tend to have an equal or higher perceived quality level to CPU-encoded counterparts and there is no significant performance penalty for encoding to the more advanced H.265 format, which can make GPU-based encoding pipelines an attractive option.

Still, CPU-encoders do a particularly good job with containing output file sizes for most of the cases we tested, producing smaller output file sizes even when the perceived quality is simmilar. This is an important aspect to have into account since it can have a big impact in cost. Depending on the amount of media files distributed and consumed by final users, the data transfer and storage cost will noticeably increase if GPUs are used. With this in mind, it is important to weight the compute costs with the data transfer and storage costs for your use case when chosing to use CPU or GPU-based video encoding.

One additional point to be considered is pipeline flexibility. Whereas the GPU encoding pipeline is rigid, CPU-based pipelines can be modified to the customer’s needs, including  additional FFmpeg filters to accommodate future needs as required.

The test did not include any specific quality measurements in the transcoded images, but it would be interesting to perform an analysis based on quantitative VMAF (or similar algorithm) metrics for the videos. We always recommend to make your own test to validate if the results obtained meet your requirements.

Benchmarking method

This blog post extends on the original work described in Optimized Video Encoding with FFmpeg on AWS Graviton Processors and the benchmarking process has been maintained in order to preserve consistency of the benchmark results. The original article analyzes in detail the price/performance advantages of AWS Graviton 3 compared to other processors.

Batch encoding workflow

Figure 7: Batch encoding workflow

Best Practices for Prompt Engineering with Amazon CodeWhisperer

Post Syndicated from Brendan Jenkins original https://aws.amazon.com/blogs/devops/best-practices-for-prompt-engineering-with-amazon-codewhisperer/

Generative AI coding tools are changing the way developers accomplish day-to-day development tasks. From generating functions to creating unit tests, these tools have helped customers accelerate software development. Amazon CodeWhisperer is an AI-powered productivity tools for the IDE and command line that helps improve developer productivity by providing code recommendations based on developers’ natural language comments and surrounding code. With CodeWhisperer, developers can simply write a comment that outlines a specific task in plain English, such as “create a lambda function to upload a file to S3.”

When writing these input prompts to CodeWhisperer like the natural language comments, one important concept is prompt engineering. Prompt engineering is the process of refining interactions with large language models (LLMs) in order to better refine the output of the model. In this case, we want to refine our prompts provided to CodeWhisperer to produce better code output.

In this post, we’ll explore how to take advantage of CodeWhisperer’s capabilities through effective prompt engineering in Python. A well-crafted prompt lets you tap into the tool’s full potential to boost your productivity and help generate the correct code for your use case. We’ll cover prompt engineering best practices like writing clear, specific prompts and providing helpful context and examples. We’ll also discuss how to iteratively refine prompts to produce better results.

Prompt Engineering with CodeWhisperer

We will demonstrate the following best practices when it comes to prompt engineering with CodeWhisperer.

  • Keep your prompt specific and concise
  • Additional context in prompts
  • Utilizing multiple comments
  • Context taken from comments and code
  • Generating unit tests with cross file context
  • Prompts with cross file context


The following prerequisites are required to experiment locally:

CodeWhisperer User Actions

Reference the following user actions documentation for CodeWhisperer user actions according to your IDE. In this documentation, you will see how to accept a recommendation, cycle through recommendation options, reject a recommendation, and manually trigger CodeWhisperer.

Keep prompts specific & concise

In this section, we will cover keeping your prompt specific and concise. When crafting prompts for CodeWhisperer, conciseness while maintaining objectives in your prompt is important.  Overly complex prompts lead to poor results. A good prompt contains just enough information to convey the request clearly and concisely. For example, if you prompt CodeWhisperer “create a function that eliminates duplicates lines in a text file”. This is an example of a specific and concise prompt. On the other hand, a prompt such as “create a function to look for lines of code that are seen multiple times throughout the file and delete them” may be unclear and overly wordy. In summary, focused, straightforward prompts helps CodeWhisperer understand exactly what you want and provide better outputs.

In this example, we would like to write a function in Python that will open a CSV file and store the contents into a dictionary. We will use the following simple and concise prompt that will guide CodeWhisperer to generate recommendations. Please use the left/right arrow key to cycle through the various recommendations before you hit tab to accept the recommendation.

Example 1:

Sample comment:

#load the csv file content in a dictionary

Sample solution:

#load the csv file content in a dictionary
import csv
def csv_to_dict(csv_file):
    with open(csv_file, 'r') as f:
        reader = csv.DictReader(f)
        return list(reader)

Simple and concise prompts are crucial in prompt engineering because they help CodeWhisperer understand the key information without confusion from extraneous details. Simplicity and brevity enable faster iteration and allow prompts to maximize impact within character limits.

Additional context in prompts

In this section, we will cover how additional context can aid in prompt engineering. While specific and concise prompts are crucial, some additional context can aid CodeWhisperer comprehension. Concrete examples also guide CodeWhisperer if it struggles to infer expectations from just a brief prompt.

In this example, we would like to add additional context to Example 1 where we stored the CSV file content into a dictionary. Now, we have additional requirements to store the csv file content in alphabetical order and return the list keys from the dictionary. Take a look at the sample prompt below. Judicious context helps CodeWhisperer to produce higher-quality, tailored results.

Example 2:

Sample comment:

#load the csv file content in a dictionary in alphabetical order and return the list of keys

Sample solution:

#load the csv file content in a dictionary in alphabetical order and return the list of keys
import csv
def csv_to_dict(file_name):
    def read_csv_file(file_name):
    with open(file_name, 'r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        csv_dict = {}
        for row in csv_reader:
            csv_dict[row['name']] = row
            return csv_dict

Providing additional context through background details and examples can be beneficial when crafting prompts for CodeWhisperer, as long as the extra information adds useful clarity rather than obscuring the core request. The right balance of brevity and pointed contextual signals can help CodeWhisperer generate more tailored, high-quality results.

Utilizing multiple comments

In this section, we will cover how multiple comments can be a useful technique in prompt engineering. When used strategically, multiple comments allow prompt engineers to offer more context without sacrificing brevity or cluttering the prompt.

Say we would like to open a CSV file and return the list of lines in alphabetical order, remove duplicate lines, and insert a period at the end of each line from the CSV file. Take a look at the sample CodeWhisperer prompt below. Notice how you can break up multiple requirements into separate comments.

Example 3:

Sample comment:

#open a csv file and return a list of lines in alphabetical order
#Remove duplicate lines
#Insert a period at the end of each line

Sample solution:

#open a csv file and return a list of lines in alphabetical order
#Remove duplicate lines
#Insert a period at the end of each line
def open_csv(filename):
    with open(filename) as f:
        lines = f.readlines()
        lines = list(set(lines))
        lines = sorted(lines)
        for i in range(len(lines)):
            lines[i] = lines[i].rstrip() + '.'
    return lines

Multiple comments allow prompt engineers to add extended context and guidance for CodeWhisperer while keeping prompts succinct.

Context taken from comments and code

In this section, we will cover how CodeWhisperer’s context goes beyond just your comment and also looks at the surrounding code, including other functions, imports, and more. This broader context helps guide CodeWhisperer towards implementing the use case you intend with your comment.

We will now see how additional code in our project affects the responses. This time around, we will import the Pandas library to see how it effects our recommendation as compared to the previous section.

Example 4:

Sample Comment:

import pandas as pd
#open a csv file and return a list of lines in alphabetical order
#Insert a period at the end of each line
#Replace duplicate lines with a single line

Sample solution:

import pandas as pd
#open a csv file and return a list of lines in alphabetical order
#Insert a period at the end of each line
#Replace duplicate lines with a single line
def open_csv(filename):
    df = pd.read_csv(filename)
    df = df.sort_values(by='line')
    df = df.drop_duplicates(subset='line')
    df['line'] = df['line'] + '.'
    return df['line'].tolist()

By seeing Pandas imported, CodeWhisperer understands our intent is likely to leverage it in the solution. This allows it to provide a more relevant recommendation using Pandas functions like read_csv(), sort_values(), and drop_duplicates().

Overall, surrounding code context gives CodeWhisperer additional clues for the implementation you have in mind with your high-level instructions.

Prompts with cross file context

In the previous section, we saw how we were able to utilize the surrounding code that CodeWhisperer takes in as context to generate a function for our use case. In this section, we will use that same concept to generate unit tests for the function we’ve built utilizing CodeWhisperers cross file context capability. This section will demonstrate how we can use cross file context with our prompts for use cases like test driven development.

In this example, we will prompt CodeWhisperer to write a comment referencing the open_csv function in order to write unit tests. In this case, there will be a new python file in the project directory for unit testing. We will assume the file we have been working out of is “example4.py”, and the new file is “unittest.py”. We will incorporate the following comments in “unittest.py”.

Example 5:

Sample Comment in unittest.py:

#create unit tests for the open_csv function from example4.py file

Sample solution in unittest.py:

#create unit tests for the open_csv function from example4.py file
class TestOpenCsv(unittest.TestCase):
   def test_open_csv(self):
       self.assertEqual(open_csv('example4.csv'), ['a.', 'b.', 'c.'])
       self.assertEqual(open_csv('example4.csv'), ['a.', 'b.', 'c.', 'd.'])

Notice how CodeWhisperer is using context from one file to generate code recommendation in another file. By specifying the open_csv function within the comment of the “unittest.py”, CodeWhisperer was able to analyze the function, understand its purpose and interface, and generate a set of basic unit tests to validate it. With our prompts, we were able to utilize CodeWhisperers cross file context to help us generate unit tests.

Chain of thought prompting

Chain of thought prompting is a prompt engineering technique that allows large language models (LLMs) to have more natural, contextual outputs by linking multiple prompts together to solve a complex problem. In regard to CodeWhisperer, we can use this technique to break a very complex coding task down into smaller steps, allowing CodeWhisperer to provide more accurate suggestions to the use case.

In this example, we will examine a code snippet generated by CodeWhisperer that contains multiple comments. Our intention is to prompt the user to input a filename, ensure it’s a csv and alphanumeric, then process it with the logging.info method. We will analyze the output using both the chain of thought approach and a single comment.

Example 6:

Chain of thought prompt approach:

Comment 1 – “Take a user’s input using the input() function and store it in a variable called filename”

import logging
# Take a users input using the input() function and store it in a variable called filename
filename = input("Enter the name of the file you want to read: ")

Comment 2 – create a function that will take a filename as an input

import logging
# Take a users input using the input() function and store it in a variable called filename
filename = input("Enter the name of the file you want to read: ")

#create a function that will take a filename as an input
def open_file(filename):

Comment 3 – validate the input using the isalnum() method and ensure the file ends in .csv then process the file using logging.info()

import logging
# Take a users input using the input() function and store it in a variable called filename
filename = input("Enter the name of the file you want to read: ")

#create a function that will take a filename as an input
def open_file(filename):
    #validate the input using the isalnum() method and ensure the file ends in .csv then process the file using logging.info() 
    if filename.isalnum() and filename.endswith('.csv'):
        lines = open_csv(filename)
        return lines
        print('Invalid file name')
        return None

Single prompt approach:

import logging
Using the input() function and store it in a variable called filename and create a function 
that will validate the input using the isalnum() method and ensure the file ends in .csv then process 
the file accordingly. 
def validate_file(filename):
    if filename.isalnum() and filename.endswith('.csv'):
        return True
        return False

When analyzing these side-by-side, we see that with the chain of thought prompt approach, we used multiple comments to allow CodeWhisperer to implement all our requirements including the user input, input validation, .csv verification, and logging as we broke it down into steps for CodeWhisperer to implement. On the other hand, in the case where we had a single comment implementing multiple requirements, it didn’t take all the requirements into account for this slightly more complex problem.

In conclusion, chain of thought prompting allows large language models like CodeWhisperer to produce more accurate code pertaining to the use case by breaking down complex problems into logical steps. Guiding the model through comments and prompts helps it focus on each part of the task sequentially. This results in code that is more accurate to the desired functionality compared to a single broad prompt.


Effective prompt engineering is key to getting the most out of powerful AI coding assistants like Amazon CodeWhisperer. Following prompt best practices, we’ve covered like using clear language, providing context, and iteratively refining prompts can help CodeWhisperer generate high-quality code tailored to your specific needs. Analyzing all the code options CodeWhisperer provides you flexibility to select the optimal approach.

About the authors:

Brendan Jenkins

Brendan Jenkins is a Solutions Architect at Amazon Web Services (AWS) working with Enterprise AWS customers providing them with technical guidance and helping achieve their business goals. He has an area of specialization in DevOps and Machine Learning technology.

Riya Dani

Riya Dani is a Solutions Architect at Amazon Web Services (AWS), responsible for helping Enterprise customers on their journey in the cloud. She has a passion for learning and holds a Bachelor’s and Master’s degree from Virginia Tech in Computer Science with focus in Deep Learning. In her free time, she enjoys staying active and reading.

Amazon OpenSearch Serverless now supports automated time-based data deletion 

Post Syndicated from Satish Nandi original https://aws.amazon.com/blogs/big-data/amazon-opensearch-serverless-now-supports-automated-time-based-data-deletion/

We recently announced a new enhancement to OpenSearch Serverless for managing data retention of Time Series collections and Indexes. OpenSearch Serverless for Amazon OpenSearch Service makes it straightforward to run search and analytics workloads without having to think about infrastructure management. With the new automated time-based data deletion feature, you can specify how long they want to retain data and OpenSearch Serverless automatically manages the lifecycle of the data based on this configuration.

To analyze time series data such as application logs and events in OpenSearch, you must create and ingest data into indexes. Typically, these logs are generated continuously and ingested frequently, such as every few minutes, into OpenSearch. Large volumes of logs can consume a lot of the available resources such as storage in the clusters and therefore need to be managed efficiently to maximize optimum performance. You can manage the lifecycle of the indexed data by using automated tooling to create daily indexes. You can then use scripts to rotate the indexed data from the primary storage in clusters to a secondary remote storage to maintain performance and control costs, and then delete the aged data after a certain retention period.

The new automated time-based data deletion feature in OpenSearch Serverless minimizes the need to manually create and manage daily indexes or write data lifecycle scripts. You can now create a single index and OpenSearch Serverless will handle creating a timestamped collection of indexes under one logical grouping automatically. You only need to configure the desired data retention policies for your time series data collections. OpenSearch Serverless will then efficiently roll over indexes from primary storage to Amazon Simple Storage Service(Amazon S3) as they age, and automatically delete aged data per the configured retention policies, reducing the operational overhead and saving costs.

In this post we discuss the new data lifecycle polices and how to get started with these polices in OpenSearch Serverless

Solution Overview

Consider a use case where the fictitious  company Octank Broker collects logs from its web services and ingests them into OpenSearch Serverless for service availability analysis. The company is interested in tracking web access and root cause when failures are seen with error types 4xx and 5xx. Generally, the server issues are of interest within an immediate timeframe, say in a few days. After 30 days, these logs are no longer of interest.

Octank wants to retain their log data for 7 days. If the collections or indexes are configured for 7 days’ data retention, then after 7 days, OpenSearch Serverless deletes the data. The indexes are no longer available for search. Note: Document counts in search results might reflect data that is marked for deletion for a short time.

You can configure data retention by creating a data lifecycle policy. The retention time can be unlimited, or a you can provide a specific time length in Days and Hours with a minimum retention of 24 hours and a maximum of 10 years. If the retention time is unlimited, as the name suggests, no data is deleted.

To start using data lifecycle policies in OpenSearch Serverless, you can follow the steps outlined in this post.


This post assumes that you have already set up an OpenSearch Serverless collection. If not, refer to Log analytics the easy way with Amazon OpenSearch Serverless for instructions.

Create a data lifecycle policy

You can create a data lifecycle policy from the AWS Management Console, the AWS Command Line Interface (AWS CLI), AWS CloudFormation, AWS Cloud Development Kit (AWS CDK), and Terraform. To create a data lifecycle policy via the console, complete the following steps:

  • On the OpenSearch Service console, choose Data lifecycle policies under Serverless in the navigation pane.
  • Choose Create data lifecycle policy.
  • For Data lifecycle policy name, enter a name (for example, web-logs-policy).
  • Choose Add under Data lifecycle.
  • Under Source Collection, choose the collection to which you want to apply the policy (for example, web-logs-collection).
  • Under Indexes, enter the index or index patterns to apply the retention duration (for example, web-logs).
  • Under Data retention, disable Unlimited (to set up the specific retention for the index pattern you defined).
  • Enter the hours or days after which you want to delete data from Amazon S3.
  • Choose Create.

The following graphic gives a quick demonstration of creating the OpenSearch Serverless Data lifecycle policies via the preceding steps.

View the data lifecycle policy

After you have created the data lifecycle policy, you can view the policy by completing the following steps:

  • On the OpenSearch Service console, choose Data lifecycle policies under Serverless in the navigation pane.
  • Select the policy you want to view (for example, web-logs-policy).
  • Choose the hyperlink under Policy name.

This page will show you the details such as the index pattern and its retention period for a specific index and collection. The following graphic gives a quick demonstration of viewing the OpenSearch Serverless data lifecycle policies via the preceding steps.

Update the data lifecycle policy

After you have created the data lifecycle policy, you can modify and update it to add more rules. For example, you can add another index pattern or add a new collection with a new index pattern to set up the retention. The following example shows the steps to add another rule in the policy for syslog index under syslogs-collection.

  • On the OpenSearch Service console, choose Data lifecycle policies under Serverless in the navigation pane.
  • Select the policy you want to edit (for example, web-logs-policy), then choose Edit.
  • Choose Add under Data lifecycle.
  • Under Source Collection, choose the collection you are going to use for setting up the data lifecycle policy (for example, syslogs-collection).
  • Under Indexes, enter index or index patterns you are going to set retention for (for example, syslogs).
  • Under Data retention, disable Unlimited (to set up specific retention for the index pattern you defined).
  • Enter the hours or days after which you want to delete data from Amazon S3.
  • Choose Save.

The following graphic gives a quick demonstration of updating existing data lifecycle policies via the preceding steps.

Delete the data lifecycle policy

Delete the existing data lifecycle policy with the following steps:

  • On the OpenSearch Service console, choose Data lifecycle policies under Serverless in the navigation pane.
  • Select the policy you want to edit (for example, web-logs-policy).
  • Choose Delete.

Data lifecycle policy rules

In a data lifecycle policy, you specify a series of rules. The data lifecycle policy lets you manage the retention period of data associated to indexes or collections that match these rules. These rules define the retention period for data in an index or group of indexes. Each rule consists of a resource type (index), a retention period, and a list of resources (indexes) that the retention period applies to.

You define the retention period with one of the following formats:

  • “MinIndexRetention”: “24h” – OpenSearch Serverless retains the index data for a specified period in hours or days. You can set this period to be from 24 hours (24h) to 3,650 days (3650d).
  • “NoMinIndexRetention”: true – OpenSearch Serverless retains the index data indefinitely.

When data lifecycle policy rules overlap, within or across policies, the rule with a more specific resource name or pattern for an index overrides a rule with a more general resource name or pattern for any indexes that are common to both rules. For example, in the following policy, two rules apply to the index index/sales/logstash. In this situation, the second rule takes precedence because index/sales/log* is the longest match to index/sales/logstash. Therefore, OpenSearch Serverless sets no retention period for the index.


Data lifecycle policies provide a consistent and straightforward way to manage indexes in OpenSearch Serverless. With data lifecycle policies, you can automate data management and avoid human errors. Deleting non-relevant data without manual intervention reduces your operational load, saves storage costs, and helps keep the system performant for search.

About the authors

Prashant Agrawal is a Senior Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Satish Nandi is a Senior Product Manager with Amazon OpenSearch Service. He is focused on OpenSearch Serverless and has years of experience in networking, security and ML/AI. He holds a Bachelor degree in Computer Science and an MBA in Entrepreneurship. In his free time, he likes to fly airplanes, hang gliders and ride his motorcycle.

How Transfer Family can help you build a secure, compliant managed file transfer solution

Post Syndicated from John Jamail original https://aws.amazon.com/blogs/security/how-transfer-family-can-help-you-build-a-secure-compliant-managed-file-transfer-solution/

Building and maintaining a secure, compliant managed file transfer (MFT) solution to securely send and receive files inside and outside of your organization can be challenging. Working with a competent, vigilant, and diligent MFT vendor to help you protect the security of your file transfers can help you address this challenge. In this blog post, I will share how AWS Transfer Family can help you in that process, and I’ll cover five ways to use the security features of Transfer Family to get the most out of this service. AWS Transfer Family is a fully managed service for file transfers over SFTP, AS2, FTPS, and FTP for Amazon Simple Storage Service (Amazon S3) and Amazon Elastic File System (Amazon EFS).

Benefits of building your MFT on top of Transfer Family

As outlined in the AWS Shared Responsibility Model, security and compliance are a shared responsibility between you and Transfer Family. This shared model can help relieve your operational burden because AWS operates, manages, and controls the components from the application, host operating system, and virtualization layer down to the physical security of the facilities in which the service operates. You are responsible for the management and configuration of your Transfer Family server and the associated applications outside of Transfer Family.

AWS follows industry best practices, such as automated patch management and continuous third-party penetration testing, to enhance the security of Transfer Family. This third-party validation and the compliance of Transfer Family with various regulatory regimes (such as SOC, PCI, HIPAA, and FedRAMP) integrates with your organization’s larger secure, compliant architecture.

One example of a customer who benefited from using Transfer Family is Regeneron. Due to their needs for regulatory compliance and security, and their desire for a scalable architecture, they moved their file transfer solution to Transfer Family. Through this move, they achieved their goal of a secure, compliant architecture and lowered their overall costs by 90%. They were also able to automate their malware scanning process for the intake of files. For more information on their success story, see How Regeneron built a secure and scalable file transfer service using AWS Transfer Family. There are many other documented success stories from customers, including Liberty Mutual, Discover, and OpenGamma.

Steps you can take to improve your security posture with Transfer Family

Although many of the security improvements that Transfer Family makes don’t require action on your part to use, you do need to take action on a few for compatibility reasons. In this section, I share five steps that you should take to adopt a secure, compliant architecture on Transfer Family.

  • Use strong encryption for data in transit — The first step in building a secure, compliant MFT service is to use strong encryption for data in transit. To help with this, Transfer Family now offers a strong set of available ciphers, including post-quantum ciphers that have been designed to resist decryption from future, fault-tolerant quantum computers that are still several years from production. Transfer Family will offer this capability by default for newly created servers after January 31, 2024. Existing customers can select this capability today by choosing the latest Transfer Family security policy. We review the choice of the default security policy for Transfer Family periodically to help ensure the best security posture for customers. For information about how to check what security policy you’re using and how to update it, see Security policies for AWS Transfer Family.
  • Duplicate your server’s host key — You need to make sure that a threat actor can’t impersonate your server by duplicating your server’s host key. Your server’s host key is a vital component of your secure, compliant architecture to help prevent man-in-the-middle style events where a threat actor can impersonate your server and convince your users to provide sensitive login information and data. To help prevent this possibility, we recommend that Transfer Family SFTP servers use at least a 4,096-bit RSA, ED25519, or ECDSA host key. As part of our shared responsibility model to help you build a secure global infrastructure, Transfer Family will increase its default host key size to 4,096 bits for newly created servers after January 31, 2024. To make key rotation as simple as possible for those with weaker keys, Transfer Family supports the use of multiple host keys of multiple types on a single server. However, you should deprecate the weaker keys as soon as possible because your server is only as secure as its weakest key. To learn what keys you’re using and how to rotate them, see Key management.

The next three steps apply if you use the custom authentication option in Transfer Family, which helps you use your existing identity providers to lift and shift workflows onto Transfer Family.

  • Require both a password and a key — To increase your security posture, you can require the use of both a password and key to help protect your clients from password scanners and a threat actor that might have stolen their key. For details on how to view and configure this, see Create an SFTP-enabled server.
  • Use Base64 encoding for passwords — The next step to improve your security posture is to use or update your custom authentication templates to use Base64 encoding for your passwords. This allows for a wider variety of characters and makes it possible to create more complex passwords. In this way, you can be more inclusive of a global audience that might prefer to use different character sets for their passwords. A more diverse character set for your passwords also makes your passwords more difficult for a threat actor to guess and compromise. The example templates for Transfer Family make use of Base64 encoding for passwords. For more details on how to check and update your templates to password encoding to use Base64, see Authenticating using an API Gateway method.
  • Set your API Gateway method’s authorizationType property to AWS_IAM — The final recommended step is to make sure that you set your API Gateway method’s authorizationType property to AWS_IAM to require that the caller submit the user’s credentials to be authenticated. With IAM authorization, you sign your requests with a signing key derived from your secret access key, instead of your secret access key itself, helping to ensure that authorization requests to your identity provider use AWS Signature Version 4. This provides an extra layer of protection for your secret access key. For details on how to set up AWS_IAM authorization, see Control access to an API with IAM permissions.


Transfer Family offers many benefits to help you build a secure, compliant MFT solution. By following the steps in this post, you can get the most out of Transfer Family to help protect your file transfers. As the requirements for a secure, compliant architecture for file transfers evolve and threats become more sophisticated, Transfer Family will continue to offer optimized solutions and provide actionable advice on how you can use them. For more information, see Security in AWS Transfer Family.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

John Jamail

John Jamail

John is the Head of Engineering for AWS Transfer Family. Prior to joining AWS, he spent eight years working in data security focused on security incident and event monitoring (SIEM), governance, risk, and compliance (GRC), and data loss prevention (DLP).

Run Kinesis Agent on Amazon ECS

Post Syndicated from Buddhike de Silva original https://aws.amazon.com/blogs/big-data/run-kinesis-agent-on-amazon-ecs/

Kinesis Agent is a standalone Java software application that offers a straightforward way to collect and send data to Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose. The agent continuously monitors a set of files and sends new data to the desired destination. The agent handles file rotation, checkpointing, and retry upon failures. It delivers all of your data in a reliable, timely, and simple manner. It also emits Amazon CloudWatch metrics to help you better monitor and troubleshoot the streaming process.

This post describes the steps to send data from a containerized application to Kinesis Data Firehose using Kinesis Agent. More specifically, we show how to run Kinesis Agent as a sidecar container for an application running in Amazon Elastic Container Service (Amazon ECS). After the data is in Kinesis Data Firehose, it can be sent to any supported destination, such as Amazon Simple Storage Service (Amazon S3).

In order to present the key points required for this setup, we assume that you are familiar with Amazon ECS and working with containers. We also avoid the implementation details and packaging process of our test data generation application, referred to as the producer.

Solution overview

As depicted in the following figure, we configure a Kinesis Agent container as a sidecar that can read files created by the producer container. In this instance, the producer and Kinesis Agent containers share data via a bind mount in Amazon ECS.

Solution design diagram


You should satisfy the following prerequisites for the successful completion of this task:

With these prerequisites in place, you can begin next step to package a Kinesis Agent and your desired agent configuration as a container in your local development machine.

Create a Kinesis Agent configuration file

We use the Kinesis Agent configuration file to configure the source and destination, among other data transfer settings. The following code uses the minimal configuration required to read the contents of files matching /var/log/producer/*.log and publish them to a Kinesis Data Firehose delivery stream called kinesis-agent-demo:

    "firehose.endpoint": "firehose.ap-southeast-2.amazonaws.com",
    "flows": [
            "deliveryStream": "kinesis-agent-demo",
            "filePattern": "/var/log/producer/*.log"

Create a container image for Kinesis Agent

To deploy Kinesis Agent as a sidecar in Amazon ECS, you first have to package it as a container image. The container must have Kinesis Agent, which and find binaries, and the Kinesis Agent configuration file that you prepared earlier. Its entry point must be configured using the start-aws-kinesis-agent script. This command is installed when you run the yum install aws-kinesis-agent step. The resulting Dockerfile should look as follows:

FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]

Run the docker build command to build this container:

docker build -t kinesis-agent .

After the image is built, it should be pushed to a container registry like Amazon ECR so that you can reference it in the next section.

Create an ECS task definition with Kinesis Agent and the application container

Now that you have Kinesis Agent packaged as a container image, you can use it in your ECS task definitions to run as sidecar. To do that, you create an ECS task definition with your application container (called producer) and Kinesis Agent container. All containers in a task definition are scheduled on the same container host and therefore can share resources such as bind mounts.

In the following sample container definition, we use a bind mount called logs_dir to share a directory between the producer container and kinesis-agent container.

You can use the following template as a starting point, but be sure to change taskRoleArn and executionRoleArn to valid IAM roles in your AWS account. In this instance, the IAM role used for taskRoleArn must have write permissions to Kinesis Data Firehose that you specified earlier in the agent.json file. Additionally, make sure that the ECR image paths and awslogs-region are modified as per your AWS account.

    "family": "kinesis-agent-demo",
    "taskRoleArn": "arn:aws:iam::111111111:role/kinesis-agent-demo-task-role",
    "executionRoleArn": "arn:aws:iam::111111111:role/kinesis-agent-test",
    "networkMode": "awsvpc",
    "containerDefinitions": [
            "name": "producer",
            "image": "111111111.dkr.ecr.ap-southeast-2.amazonaws.com/producer:latest",
            "cpu": 1024,
            "memory": 2048,
            "essential": true,
            "command": [
            "mountPoints": [
                    "sourceVolume": "logs_dir",
                    "containerPath": "/var/log/producer",
                    "readOnly": false
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "producer",
                    "awslogs-stream-prefix": "producer",
                    "awslogs-region": "ap-southeast-2"
            "name": "kinesis-agent",
            "image": "111111111.dkr.ecr.ap-southeast-2.amazonaws.com/kinesis-agent:latest",
            "cpu": 1024,
            "memory": 2048,
            "essential": true,
            "mountPoints": [
                    "sourceVolume": "logs_dir",
                    "containerPath": "/var/log/producer",
                    "readOnly": true
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "kinesis-agent",
                    "awslogs-stream-prefix": "kinesis-agent",
                    "awslogs-region": "ap-southeast-2"
    "volumes": [
            "name": "logs_dir"
    "requiresCompatibilities": [
    "cpu": "2048",
    "memory": "4096"

Register the task definition with the following command:

aws ecs register-task-definition --cli-input-json file://./task-definition.json

Run a new ECS task

Finally, you can run a new ECS task using the task definition you just created using the aws ecs run-task command. When the task is started, you should be able to see two containers running under that task on the Amazon ECS console.

Amazon ECS console screenshot


This post showed how straightforward it is to run Kinesis Agent in a containerized environment. Although we used Amazon ECS as our container orchestration service in this post, you can use a Kinesis Agent container in other environments such as Amazon Elastic Kubernetes Service (Amazon EKS).

To learn more about using Kinesis Agent, refer to Writing to Amazon Kinesis Data Streams Using Kinesis Agent. For more information about Amazon ECS, refer to the Amazon ECS Developer Guide.

About the Author

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Access AWS using a Google Cloud Platform native workload identity

Post Syndicated from Simran Singh original https://aws.amazon.com/blogs/security/access-aws-using-a-google-cloud-platform-native-workload-identity/

Organizations undergoing cloud migrations and business transformations often find themselves managing IT operations in hybrid or multicloud environments. This can make it more complex to safeguard workloads, applications, and data, and to securely handle identities and permissions across Amazon Web Services (AWS), hybrid, and multicloud setups.

In this post, we show you how to assume an AWS Identity and Access Management (IAM) role in your AWS accounts to securely issue temporary credentials for applications that run on the Google Cloud Platform (GCP). We also present best practices and key considerations in this authentication flow. Furthermore, this post provides references to supplementary GCP documentation that offer additional context and provide steps relevant to setup on GCP.

Access control across security realms

As your multicloud environment grows, managing access controls across providers becomes more complex. By implementing the right access controls from the beginning, you can help scale your cloud operations effectively without compromising security. When you deploy apps across multiple cloud providers, you should implement a homogenous and consistent authentication and authorization mechanism across both cloud environments, to help maintain a secure and cost-effective environment. In the following sections, you’ll learn how to enforce such objectives across AWS and workloads hosted on GCP, as shown in Figure 1.

Figure 1: Authentication flow between GCP and AWS

Figure 1: Authentication flow between GCP and AWS


To follow along with this walkthrough, complete the following prerequisites.

  1. Create a service account in GCP. Resources in GCP use service accounts to make API calls. When you create a GCP resource, such as a compute engine instance in GCP, a default service account gets created automatically. Although you can use this default service account in the solution described in this post, we recommend that you create a dedicated user-managed service account, because you can control what permissions to assign to the service account within GCP.

    To learn more about best practices for service accounts, see Best practices for using service accounts in the Google documentation. In this post, we use a GCP virtual machine (VM) instance for demonstration purposes. To attach service accounts to other GCP resources, see Attach service accounts to resources.

  2. Create a VM instance in GCP and attach the service account that you created in Step 1. Resources in GCP store their metadata information in a metadata server, and you can request an instance’s identity token from the server. You will use this identity token in the authentication flow later in this post.
  3. Install the AWS Command Line Interface (AWS CLI) on the GCP VM instance that you created in Step 2.
  4. Install jq and curl.

GCP VM identity authentication flow

Obtaining temporary AWS credentials for workloads that run on GCP is a multi-step process. In this flow, you use the identity token from the GCP compute engine metadata server to call the AssumeRoleWithWebIdentity API to request AWS temporary credentials. This flow gives your application greater flexibility to request credentials for an IAM role that you have configured with a sufficient trust policy, and the corresponding Amazon Resource Name (ARN) for the IAM role must be known to the application.

Define an IAM role on AWS

Because AWS already supports OpenID Connect (OIDC) federation, you can use the OIDC token provided in GCP as described in Step 2 of the Prerequisites, and you don’t need to create a separate OIDC provider in your AWS account. Instead, to create an IAM role for OIDC federation, follow the steps in Creating a role for web identity or OpenID Connect Federation (console). Using an OIDC principal without a condition can be overly permissive. To make sure that only the intended identity provider assumes the role, you need to provide a StringEquals condition in the trust policy for this IAM role. Add the condition keys accounts.google.com:aud, accounts.google.com:oaud, and accounts.google.com:sub to the role’s trust policy, as shown in the following.

    "Version": "2012-10-17",
    "Statement": [
            "Effect": "Allow",
            "Principal": {"Federated": "accounts.google.com"},
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringEquals": {
                    "accounts.google.com:aud": "<azp-value>",
                    "accounts.google.com:oaud": "<aud-value>",
                    "accounts.google.com:sub": "<sub-value>"

Make sure to replace the <placeholder values> with your values from the Google ID Token. The ID token issued for the service accounts has the azp (AUTHORIZED_PARTY) field set, so condition keys are mapped to the Google ID Token fields as follows:

  • accounts.google.com:oaud condition key matches the aud (AUDIENCE) field on the Google ID token.
  • accounts.google.com:aud condition key matches the azp (AUTHORIZED_PARTY) field on the Google ID token.
  • accounts.google.com:sub condition key matches the sub (SUBJECT) field on the Google ID token.

For more information about the Google aud and azp fields, see the Google Identity Platform OpenID Connect guide.

Authentication flow

The authentication flow for the scenario is shown in Figure 2.

Figure 2: Detailed authentication flow with AssumeRoleWithWebIdentity API

Figure 2: Detailed authentication flow with AssumeRoleWithWebIdentity API

The authentication flow has the following steps:

  1. On AWS, you can source external credentials by configuring the credential_process setting in the config file. For the syntax and operating system requirements, see Source credentials with an external process. For this post, we have created a custom profile TeamA-S3ReadOnlyAccess as follows in the config file:
    [profile TeamA-S3ReadOnlyAccess]
    credential_process = /opt/bin/credentials.sh

    To use different settings, you can create and reference additional profiles.

  2. Specify a program or a script that credential_process will invoke. For this post, credential_process invokes the script /opt/bin/credentials.sh which has the following code. Make sure to replace <111122223333> with your own account ID.
    jwt_token=$(curl -sH "Metadata-Flavor: Google" "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience=${AUDIENCE}&format=full&licenses=FALSE")
    jwt_sub=$(jq -R 'split(".") | .[1] | @base64d | fromjson' <<< "$jwt_token" | jq -r '.sub')
    credentials=$(aws sts assume-role-with-web-identity --role-arn $ROLE_ARN --role-session-name $jwt_sub --web-identity-token $jwt_token | jq '.Credentials' | jq '.Version=1')
    echo $credentials

    The script performs the following steps:

    1. Google generates a new unique instance identity token in the JSON Web Token (JWT) format.
      jwt_token=$(curl -sH "Metadata-Flavor: Google" "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience=${AUDIENCE}&format=full&licenses=FALSE")

      The payload of the token includes several details about the instance and the audience URI, as shown in the following.

         "iss": "[TOKEN_ISSUER]",
         "iat": [ISSUED_TIME],
         "exp": [EXPIRED_TIME],
         "aud": "[AUDIENCE]",
         "sub": "[SUBJECT]",
         "azp": "[AUTHORIZED_PARTY]",
         "google": {
          "compute_engine": {
            "project_id": "[PROJECT_ID]",
            "project_number": [PROJECT_NUMBER],
            "zone": "[ZONE]",
            "instance_id": "[INSTANCE_ID]",
            "instance_name": "[INSTANCE_NAME]",
            "instance_creation_timestamp": [CREATION_TIMESTAMP],
            "instance_confidentiality": [INSTANCE_CONFIDENTIALITY],
            "license_id": [

      The IAM trust policy uses the aud (AUDIENCE), azp (AUTHORIZED_PARTY) and sub (SUBJECT) values from the JWT token to help ensure that the IAM role defined in the section Define an IAM role in AWS can be assumed only by the intended GCP service account.

    2. The script invokes the AssumeRoleWithWebIdentity API call, passing in the identity token from the previous step and specifying which IAM role to assume. The script uses the Identity subject claim as the session name, which can facilitate auditing or forensic operations on this AssumeRoleWithWebIdentity API call. AWS verifies the authenticity of the token before returning temporary credentials. In addition, you can verify the token in your credential program by using the process described at Obtaining the instance identity token.

      The script then returns the temporary credentials to the credential_process as the JSON output on STDOUT; we used jq to parse the output in the desired JSON format.

      jwt_sub=$(jq -R 'split(".") | .[1] | @base64d | fromjson' <<< "$jwt_token" | jq -r '.sub')
      credentials=$(aws sts assume-role-with-web-identity --role-arn $ROLE_ARN --role-session-name $jwt_sub --web-identity-token $jwt_token | jq '.Credentials' | jq '.Version=1')
      echo $credentials

    The following is an example of temporary credentials returned by the credential_process script:

      "Version": 1,
      "AccessKeyId": "AKIAIOSFODNN7EXAMPLE",
      "SecretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
      "SessionToken": "FwoGZXIvYXdzEBUaDOSY+1zJwXi29+/reyLSASRJwSogY/Kx7NomtkCoSJyipWuu6sbDIwFEYtZqg9knuQQyJa9fP68/LCv4jH/efuo1WbMpjh4RZpbVCOQx/zggZTyk2H5sFvpVRUoCO4dc7eqftMhdKtcq67vAUljmcDkC9l0Fei5tJBvVpQ7jzsYeduX/5VM6uReJaSMeOXnIJnQZce6PI3GBiLfaX7Co4o216oS8yLNusTK1rrrwrY2g5e3Zuh1oXp/Q8niFy2FSLN62QHfniDWGO8rCEV9ZnZX0xc4ZN68wBc1N24wKgT+xfCjamcCnBjJYHI2rEtJdkE6bRQc2WAUtccsQk5u83vWae+SpB9ycE/dzfXurqcjCP0urAp4k9aFZFsRIGfLAI1cOABX6CzF30qrcEBnEXAMPLESESSIONTOKEN==",
      "Expiration": "2023-08-31T04:45:30Z"

Note that AWS SDKs store the returned AWS credentials in memory when they call credential_process. AWS SDKs keep track of the credential expiration and generate new AWS session credentials through the credential process. In contrast, the AWS CLI doesn’t cache external process credentials; instead, the AWS CLI calls the credential_process for every CLI request, which creates a new role session and could result in slight delays when you run commands.

Test access in the AWS CLI

After you configure the config file for the credential_process, verify your setup by running the following command.

aws sts get-caller-identity --profile TeamA-S3ReadOnlyAccess

The output will look similar to the following.

   "UserId":"AIDACKCEVSQ6C2EXAMPLE:[Identity subject claim]",
   "Arn":"arn:aws:iam::111122223333:role/RoleForAccessFromGCPTeamA:[Identity subject claim]"

Amazon CloudTrail logs the AssumeRoleWithWebIdentity API call, as shown in Figure 3. The log captures the audience in the identity token as well as the IAM role that is being assumed. It also captures the session name with a reference to the Identity subject claim, which can help simplify auditing or forensic operations on this AssumeRoleWithWebIdentity API call.

Figure 3: CloudTrail event for AssumeRoleWithWebIdentity API call from GCP VM

Figure 3: CloudTrail event for AssumeRoleWithWebIdentity API call from GCP VM

Test access in the AWS SDK

The next step is to test access in the AWS SDK. The following Python program shows how you can refer to the custom profile configured for the credential process.

import boto3

session = boto3.Session(profile_name='TeamA-S3ReadOnlyAccess')
client = session.client('s3')

response = client.list_buckets()
for _bucket in response['Buckets']:

Before you run this program, run pip install boto3. Create an IAM role that has the AmazonS3ReadOnlyAccess policy attached to it. This program prints the names of the existing S3 buckets in your account. For example, if your AWS account has two S3 buckets named DOC-EXAMPLE-BUCKET1 and DOC-EXAMPLE-BUCKET2, then the output of the preceding program shows the following:


If you don’t have an existing S3 bucket, then create an S3 bucket before you run the preceding program.

The list_bucket API call is also logged in CloudTrail, capturing the identity and source of the calling application, as shown in Figure 4.

Figure 4: CloudTrail event for S3 API call made with federated identity session

Figure 4: CloudTrail event for S3 API call made with federated identity session

Clean up

If you don’t need to further use the resources that you created for this walkthrough, delete them to avoid future charges for the deployed resources:

  • Delete the VM instance and service account created in GCP.
  • Delete the resources that you provisioned on AWS to test the solution.


In this post, you learned how to exchange the identity token of a virtual machine running on a GCP compute engine to assume a role on AWS, so that you can seamlessly and securely access AWS resources from GCP hosted workloads.

We walked you through the steps required to set up the credential process and shared best practices to consider in this authentication flow. You can also apply the same pattern to workloads deployed on GCP functions or Google Kubernetes Engine (GKE) when they request access to AWS resources.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Simran Singh

Simran Singh

Simran is a Senior Solutions Architect at AWS. In this role, he assists large enterprise customers in meeting their key business objectives on AWS. His areas of expertise include artificial intelligence/machine learning, security, and improving the experience of developers building on AWS. He has also earned a coveted golden jacket for achieving all currently offered AWS certifications.

Rashmi Iyer

Rashmi Iyer

Rashmi is a Solutions Architect at AWS supporting financial services enterprises. She helps customers build secure, resilient, and scalable architectures on AWS while adhering to architectural best practices. Before joining AWS, Rashmi worked for over a decade to architect and design complex telecom solutions in the packet core domain.

Accelerate analytics on Amazon OpenSearch Service with AWS Glue through its native connector

Post Syndicated from Basheer Sheriff original https://aws.amazon.com/blogs/big-data/accelerate-analytics-on-amazon-opensearch-service-with-aws-glue-through-its-native-connector/

As the volume and complexity of analytics workloads continue to grow, customers are looking for more efficient and cost-effective ways to ingest and analyse data. Data is stored from online systems such as the databases, CRMs, and marketing systems to data stores such as data lakes on Amazon Simple Storage Service (Amazon S3), data warehouses in Amazon Redshift, and purpose-built stores such as Amazon OpenSearch Service, Amazon Neptune, and Amazon Timestream.

OpenSearch Service is used for multiple purposes, such as observability, search analytics, consolidation, cost savings, compliance, and integration. OpenSearch Service also has vector database capabilities that let you implement semantic search and Retrieval Augmented Generation (RAG) with large language models (LLMs) to build recommendation and media search engines. Previously, to integrate with OpenSearch Service, you could use open source clients for specific programming languages such as Java, Python, or JavaScript or use REST APIs provided by OpenSearch Service.

Movement of data across data lakes, data warehouses, and purpose-built stores is achieved by extract, transform, and load (ETL) processes using data integration services such as AWS Glue. AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides both visual and code-based interfaces to make data integration effortless. Using a native AWS Glue connector increases agility, simplifies data movement, and improves data quality.

In this post, we explore the AWS Glue native connector to OpenSearch Service and discover how it eliminates the need to build and maintain custom code or third-party tools to integrate with OpenSearch Service. This accelerates analytics pipelines and search use cases, providing instant access to your data in OpenSearch Service. You can now use data stored in OpenSearch Service indexes as a source or target within the AWS Glue Studio no-code, drag-and-drop visual interface or directly in an AWS Glue ETL job script. When combined with AWS Glue ETL capabilities, this new connector simplifies the creation of ETL pipelines, enabling ETL developers to save time building and maintaining data pipelines.

Solution overview

The new native OpenSearch Service connector is a powerful tool that can help organizations unlock the full potential of their data. It enables you to efficiently read and write data from OpenSearch Service without needing to install or manage OpenSearch Service connector libraries.

In this post, we demonstrate exporting the New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset into OpenSearch Service using the AWS Glue native connector. The following diagram illustrates the solution architecture.

By the end of this post, your visual ETL job will resemble the following screenshot.


To follow along with this post, you need a running OpenSearch Service domain. For setup instructions, refer to Getting started with Amazon OpenSearch Service. Ensure it is public, for simplicity, and note the primary user and password for later use.

Note that as of this writing, the AWS Glue OpenSearch Service connector doesn’t support Amazon OpenSearch Serverless, so you need to set up a provisioned domain.

Create an S3 bucket

We use an AWS CloudFormation template to create an S3 bucket to store the sample data. Complete the following steps:

  1. Choose Launch Stack.
  2. On the Specify stack details page, enter a name for the stack.
  3. Choose Next.
  4. On the Configure stack options page, choose Next.
  5. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Submit.

The stack takes about 2 minutes to deploy.

Create an index in the OpenSearch Service domain

To create an index in the OpenSearch service domain, complete the following steps:

  1. On the OpenSearch Service console, choose Domains in the navigation pane.
  2. Open the domain you created as a prerequisite.
  3. Choose the link under OpenSearch Dashboards URL.
  4. On the navigation menu, choose Dev Tools.
  5. Enter the following code to create the index:
PUT /yellow-taxi-index
  "mappings": {
    "properties": {
      "VendorID": {
        "type": "integer"
      "tpep_pickup_datetime": {
        "type": "date",
        "format": "epoch_millis"
      "tpep_dropoff_datetime": {
        "type": "date",
        "format": "epoch_millis"
      "passenger_count": {
        "type": "integer"
      "trip_distance": {
        "type": "float"
      "RatecodeID": {
        "type": "integer"
      "store_and_fwd_flag": {
        "type": "keyword"
      "PULocationID": {
        "type": "integer"
      "DOLocationID": {
        "type": "integer"
      "payment_type": {
        "type": "integer"
      "fare_amount": {
        "type": "float"
      "extra": {
        "type": "float"
      "mta_tax": {
        "type": "float"
      "tip_amount": {
        "type": "float"
      "tolls_amount": {
        "type": "float"
      "improvement_surcharge": {
        "type": "float"
      "total_amount": {
        "type": "float"
      "congestion_surcharge": {
        "type": "float"
      "airport_fee": {
        "type": "integer"

Create a secret for OpenSearch Service credentials

In this post, we use basic authentication and store our authentication credentials securely using AWS Secrets Manager. Complete the following steps to create a Secrets Manager secret:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. For Key/value pairs, enter the user name opensearch.net.http.auth.user and the password opensearch.net.http.auth.pass.
  5. Choose Next.
  6. Complete the remaining steps to create your secret.

Create an IAM role for the AWS Glue job

Complete the following steps to configure an AWS Identity and Access Management (IAM) role for the AWS Glue job:

  1. On the IAM console, create a new role.
  2. Attach the AWS managed policy GlueServiceRole.
  3. Attach the following policy to the role. Replace each ARN with the corresponding ARN of the OpenSearch Service domain, Secrets Manager secret, and S3 bucket.
    "Version": "2012-10-17",
    "Statement": [
            "Sid": "OpenSearchPolicy",
            "Effect": "Allow",
            "Action": [
            "Resource": [
            "Sid": "GetDescribeSecret",
            "Effect": "Allow",
            "Action": [
            "Resource": "arn:aws:secretsmanager:<region>:<aws-account-id>:secret:<secret-name>"
            "Sid": "S3Policy",
            "Effect": "Allow",
            "Action": [
            "Resource": [

Create an AWS Glue connection

Before you can use the OpenSearch Service connector, you need to create an AWS Glue connection for connecting to OpenSearch Service. Complete the following steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create connection.
  3. For Name, enter opensearch-connection.
  4. For Connection type, choose Amazon OpenSearch.
  5. For Domain endpoint, enter the domain endpoint of OpenSearch Service.
  6. For Port, enter HTTPS port 443.
  7. For Resource, enter yellow-taxi-index.

In this context, resource means the index of OpenSearch Service where the data is read from or written to.

  1. Select Wan only enabled.
  2. For AWS Secret, choose the secret you created earlier.
  3. Optionally, if you’re connecting to an OpenSearch Service domain in a VPC, specify a VPC, subnet, and security group to run AWS Glue jobs inside the VPC. For security groups, a self-referencing inbound rule is required. For more information, see Setting up networking for development for AWS Glue.
  4. Choose Create connection.

Create an ETL job using AWS Glue Studio

Complete the following steps to create your AWS Glue ETL job:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Create job and Visual ETL.
  3. On the AWS Glue Studio console, change the job name to opensearch-etl.
  4. Choose Amazon S3 for the data source and Amazon OpenSearch for the data target.

Between the source and target, you can optionally insert transform nodes. In this solution, we create a job that has only source and target nodes for simplicity.

  1. In the Data source properties section, specify the S3 bucket where the sample data is located, and choose Parquet as the data format.
  2. In the Data sink properties section, specify the connection you created in the previous section (opensearch-connection).
  3. Choose the Job details tab, and in the Basic properties section, specify the IAM role you created earlier.
  4. Choose Save to save your job, and choose Run to run the job.
  5. Navigate to the Runs tab to check the status of the job. When it is successful, the run status should be Succeeded.
  6. After the job runs successfully, navigate to OpenSearch Dashboards, and log in to the dashboard.
  7. Choose Dashboards Management on the navigation menu.
  8. Choose Index patterns, and choose Create index pattern.
  9. Enter yellow-taxi-index for Index pattern name.
  10. Choose tpep_pickup_datetime for Time.
  11. Choose Create index pattern. This index pattern will be used to visualize the index.
  12. Choose Discover on the navigation menu, and choose yellow-taxi-index.

You have now created an index in OpenSearch Service and loaded data into it from Amazon S3 in just a few steps using the AWS Glue OpenSearch Service native connector.

Clean up

To avoid incurring charges, clean up the resources in your AWS account by completing the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. From the list of jobs, select the job opensearch-etl, and on the Actions menu, choose Delete.
  3. On the AWS Glue console, choose Data connections in the navigation pane.
  4. Select opensearch-connection from the list of connectors, and on the Actions menu, choose Delete.
  5. On the IAM console, choose Roles in the navigation page.
  6. Select the role you created for the AWS Glue job and delete it.
  7. On the CloudFormation console, choose Stacks in the navigation pane.
  8. Select the stack you created for the S3 bucket and sample data and delete it.
  9. On the Secrets Manager console, choose Secrets in the navigation pane.
  10. Select the secret you created, and on the Actions menu, choose Delete.
  11. Reduce the waiting period to 7 days and schedule the deletion.


The integration of AWS Glue with OpenSearch Service adds the powerful ability to perform data transformation when integrating with OpenSearch Service for analytics use cases. This enables organizations to streamline data integration and analytics with OpenSearch Service. The serverless nature of AWS Glue means no infrastructure management, and you pay only for the resources consumed while your jobs are running. As organizations increasingly rely on data for decision-making, this native Spark connector provides an efficient, cost-effective, and agile solution to swiftly meet data analytics needs.

About the authors

Basheer Sheriff is a Senior Solutions Architect at AWS. He loves to help customers solve interesting problems leveraging new technology. He is based in Melbourne, Australia, and likes to play sports such as football and cricket.

Shunsuke Goto is a Prototyping Engineer working at AWS. He works closely with customers to build their prototypes and also helps customers build analytics systems.

How to implement client certificate revocation list checks at scale with API Gateway

Post Syndicated from Arthur Mnev original https://aws.amazon.com/blogs/security/how-to-implement-client-certificate-revocation-list-checks-at-scale-with-api-gateway/

ityAs you design your Amazon API Gateway applications to rely on mutual certificate authentication (mTLS), you need to consider how your application will verify the revocation status of a client certificate. In your design, you should account for the performance and availability of your verification mechanism to make sure that your application endpoints perform reliably.

In this blog post, I demonstrate an architecture that will help you on your journey to implement custom revocation checks against your certificate revocation list (CRL) for API Gateway. You will also learn advanced Amazon Simple Storage Service (Amazon S3) and AWS Lambda techniques to achieve higher performance and scalability.

Choosing the right certificate verification method

One of your first considerations is whether to use a CRL or the Online Certificate Status Protocol (OCSP), if your certificate authority (CA) offers this option. For an in-depth analysis of these two options, see my earlier blog post, Choosing the right certificate revocation method in ACM Private CA. In that post, I demonstrated that OCSP is a good choice when your application can tolerate high latency or a failure for certificate verification due to TLS service-to-OCSP connectivity. When you rely on mutual TLS authentication in a high-rate transactional environment, increased latency or OCSP reachability failures may affect your application. We strongly recommend that you validate the revocation status of your mutual TLS certificates. Verifying your client certificate status against the CRL is the correct approach for certificate verification if you require reliability and lower, predictable latency. A potential exception to this approach is the use case of AWS Certificate Manager Private Certificate Authority (AWS Private CA) with an OCSP responder hosted on AWS CloudFront.

With an AWS Private CA OCSP responder hosted on CloudFront, you can reduce the risks of network and latency challenges by relying on communication between AWS native services. While this post focuses on the solution that targets CRLs originating from any CA, if you use AWS Private CA with an OCSP responder, you should consider generating an OCSP request in your Lambda authorizer.

Mutual authentication with API Gateway

API Gateway mutual TLS authentication (mTLS) requires you to define a root of trust that will contain your certificate authority public key. During the mutual TLS authentication process, API Gateway performs the undifferentiated heavy lifting by offloading the certificate authentication and negotiation process. During the authentication process, API Gateway validates that your certificate is trusted, has valid dates, and uses a supported algorithm. Additionally, you can refer to the API Gateway documentation and related blog post for details about the mutual TLS authentication process on API Gateway.

Implementing mTLS certificate verification for API Gateway

In the remainder of this blog post, I’ll describe the architecture for a scalable implementation of a client certificate verification mechanism against a CRL on your API Gateway.

The certificate CRL verification process presented here relies on a custom Lambda authorizer that validates the certificate revocation status against the CRL. The Lambda authorizer caches CRL data to optimize the query time for subsequent requests and allows you to define custom business logic that could go beyond CRL verification. For example, you could include other, just-in-time authorization decisions as a part of your evaluation logic.

Implementation mechanisms

This section describes the implementation mechanisms that help you create a high-performing extension to the API Gateway mutual TLS authentication process.

Data repository for your certificate revocation list

API Gateway mutual TLS configuration uses Amazon S3 as a repository for your root of trust. The design for this sample implementation extends the use of S3 buckets to store your CRL and the public key for the certificate authority that signed the CRL.

We strongly recommend that you maintain an updated CRL and verify its signature before data processing. This process is automatic if you use AWS Private CA, because AWS Private CA will update your CRL automatically on revocation. AWS Private CA also allows you to retrieve the CA’s public key by using an API call.

Certificate validation

My sample implementation architecture uses the API Gateway Lambda authorizer to validate the serial number of the client certificate used in the mutual TLS authentication session against the list of serial numbers present in the CRL you publish to the S3 bucket. In the process, the API Gateway custom authorizer will read the client certificate serial number, read and validate the CRL’s digital signature, search for the client’s certificate serial number within the CRL, and return the authorization policy based on the findings.

Optimizing for performance

The mechanisms that enable a predictable, low-latency performance are CRL preprocessing and caching. Your CRL is an ASN.1 data structure that requires a relatively high computing time for processing. Preprocessing your CRL into a simple-to-parse data structure reduces the computational cost you would otherwise incur for every validation; caching the CRL will help you reduce the validation latency and improve predictability further.

Performance optimizations

The process of parsing and validating CRLs is computationally expensive. In the case of large CRL files, parsing the CRL in the Lambda authorizer on every request can result in high latency and timeouts. To improve latency and reduce compute costs, this solution optimizes for performance by preprocessing the CRL and implementing function-level caching.

Preprocessing and generation of a cached CRL file

The first optimization happens when S3 receives a new CRL object. As shown in Figure 1, the S3 PutObject event invokes a preprocessing Lambda that validates the signature of your uploaded CRL and decodes its ASN.1 format. The output of the preprocessing Lambda function is the list of the revoked certificate serial numbers from the CRL, in a data structure that is simpler to read by your programming language of choice, and that won’t require extensive parsing by your Lambda authorizer. The asynchronous approach mitigates the impact of CRL processing on your API Gateway workload.

Figure 1: Sample implementation flow of the pre-processing component

Figure 1: Sample implementation flow of the pre-processing component

Client certificate lookup in a CRL

The optimization happens as part of your Lambda authorizer that retrieves the preprocessed CRL data generated from the first step and searches through the data structure for your client certificate serial number. If the Lambda authorizer finds your client’s certificate serial number in the CRL, the authorization request fails, and the Lambda authorizer generates a “Deny” policy. Searching through a read-optimized data structure prepared by your preprocessing step is the second optimization that reduces the lookup time and the compute requirements.

Function-level caching

Because of the preprocessing, the Lambda authorizer code no longer needs to perform the expensive operation of decoding the ASN.1 data structures of the original CRL; however, network transfer latency will remain and may impact your application.

To improve performance, and as a third optimization, the Lambda service retains the runtime environment for a recently-run function for a non-deterministic period of time. If the function is invoked again during this time period, the Lambda function doesn’t have to initialize and can start running immediately. This is called a warm start. Function-level caching takes advantage of this warm start to hold the CRL data structure in memory persistently between function invocations so the Lambda function doesn’t have to download the preprocessed CRL data structure from S3 on every request.

The duration of the Lambda container’s warm state depends on multiple factors, such as usage patterns and parallel requests processed by your function. If, in your case, API use is infrequent or its usage pattern is spiky, pre-provisioned concurrency is another technique that can further reduce your Lambda startup times and the duration of your warm cache. Although provisioned concurrency does have additional costs, I recommend you evaluate its benefits for your specific environment. You can also check out the blog dedicated to this topic, Scheduling AWS Lambda Provisioned Concurrency for recurring peak usage.

To validate that the Lambda authorizer has the latest copy of the CRL data structure, the S3 ETag value is used to determine if the object has changed. The preprocessed CRL object’s ETag value is stored as a Lambda global variable, so its value is retained between invocations in the same runtime environment. When API Gateway invokes the Lambda authorizer, the function checks for existing global preprocessed CRL data structure and ETag variables. The process will only retrieve a read-optimized CRL when the ETag is absent, or its value differs from the ETag of the preprocessed CRL object in S3.

Figure 2 demonstrates this process flow.

Figure 2: Sample implementation flow for the Lambda authorizer component

Figure 2: Sample implementation flow for the Lambda authorizer component

In summary, you will have a Lambda container with a persistent in-memory lookup data structure for your CRL by doing the following:

  • Asynchronously start your preprocessing workflow by using the S3 PutObject event so you can generate and store your preprocessed CRL data structure in a separate S3 object.
  • Read the preprocessed CRL from S3 and its ETag value and store both values in global variables.
  • Compare the value of the ETag stored in your global variables to the current ETag value of the preprocessed CRL S3 object, to reduce unnecessary downloads if the current ETag value of your S3 object is the same as the previous value.
  • We recommend that you avoid using built-in API Gateway Lambda authorizer result caching, because the status of your certificate might change, and your authorization decision would rest on out-of-date verification results.
  • Consider setting a reserved concurrency for your CRL verification function so that API Gateway can invoke your function even if the overall capacity for your account in your AWS Region is exhausted.

The sample implementation flow diagram in Figure 3 demonstrates the overall architecture of the solution.

Figure 3: Sample implementation flow for the overall CRL verification architecture

Figure 3: Sample implementation flow for the overall CRL verification architecture

The workflow for the solution overall is as follows:

  1. An administrator publishes a CRL and its signing CA’s certificate to their non-public S3 bucket, which is accessible by the Lambda authorizer and preprocessor roles.
  2. An S3 event invokes the Lambda preprocessor to run upon CRL upload. The function retrieves the CRL from S3, validates its signature against the issuing certificate, and parses the CRL.
  3. The preprocessor Lambda stores the results in an S3 bucket with a name in the form <crlname>.cache.json.
  4. A TLS client requests an mTLS connection and supplies its certificate.
  5. API Gateway completes mTLS negotiation and invokes the Lambda authorizer.
  6. The Lambda authorizer function parses the client’s mTLS certificate, retrieves the cached CRL object, and searches the object for the serial number of the client’s certificate.
  7. The authorizer function returns a deny policy if the certificate is revoked or in error.
  8. API Gateway, if authorized, proceeds with the integrated function or denies the client’s request.


In this post, I presented a design for validating your API Gateway mutual TLS client certificates against a CRL, with support for extra-large certificate revocation files. This approach will help you align with the best security practices for validating client certificates and use advanced S3 access and Lambda caching techniques to minimize time and latency for validation.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Security, Identity, and Compliance re:Post or contact AWS Support.

Arthur Mnev

Arthur is a Senior Specialist Security Architect for AWS Industries. He spends his day working with customers and designing innovative approaches to help customers move forward with their initiatives, improve their security posture, and reduce security risks in their cloud journeys. Outside of work, Arthur enjoys being a father, skiing, scuba diving, and Krav Maga.

Rafael Cassolato de Meneses

Rafael Cassolato de Meneses

Rafael Cassolato is a Solutions Architect with 20+ years in IT, holding bachelor’s and master’s degrees in Computer Science and 10 AWS certifications. Specializing in migration and modernization, Rafael helps strategic AWS customers achieve their business goals and solve technical challenges by leveraging AWS’s cloud platform.

Build efficient ETL pipelines with AWS Step Functions distributed map and redrive feature

Post Syndicated from Sriharsh Adari original https://aws.amazon.com/blogs/big-data/build-efficient-etl-pipelines-with-aws-step-functions-distributed-map-and-redrive-feature/

AWS Step Functions is a fully managed visual workflow service that enables you to build complex data processing pipelines involving a diverse set of extract, transform, and load (ETL) technologies such as AWS Glue, Amazon EMR, and Amazon Redshift. You can visually build the workflow by wiring individual data pipeline tasks and configuring payloads, retries, and error handling with minimal code.

While Step Functions supports automatic retries and error handling when data pipeline tasks fail due to momentary or transient errors, there can be permanent failures such as incorrect permissions, invalid data, and business logic failure during the pipeline run. This requires you to identify the issue in the step, fix the issue and restart the workflow. Previously, to rerun the failed step, you needed to restart the entire workflow from the very beginning. This leads to delays in completing the workflow, especially if it’s a complex, long-running ETL pipeline. If the pipeline has many steps using map and parallel states, this also leads to increased cost due to increases in the state transition for running the pipeline from the beginning.

Step Functions now supports the ability for you to redrive your workflow from a failed, aborted, or timed-out state so you can complete workflows faster and at a lower cost, and spend more time delivering business value. Now you can recover from unhandled failures faster by redriving failed workflow runs, after downstream issues are resolved, using the same input provided to the failed state.

In this post, we show you an ETL pipeline job that exports data from Amazon Relational Database Service (Amazon RDS) tables using the Step Functions distributed map state. Then we simulate a failure and demonstrate how to use the new redrive feature to restart the failed task from the point of failure.

Solution overview

One of the common functionalities involved in data pipelines is extracting data from multiple data sources and exporting it to a data lake or synchronizing the data to another database. You can use the Step Functions distributed map state to run hundreds of such export or synchronization jobs in parallel. Distributed map can read millions of objects from Amazon Simple Storage Service (Amazon S3) or millions of records from a single S3 object, and distribute the records to downstream steps. Step Functions runs the steps within the distributed map as child workflows at a maximum parallelism of 10,000. A concurrency of 10,000 is well above the concurrency supported by many other AWS services such as AWS Glue, which has a soft limit of 1,000 job runs per job.

The sample data pipeline sources product catalog data from Amazon DynamoDB and customer order data from Amazon RDS for PostgreSQL database. The data is then cleansed, transformed, and uploaded to Amazon S3 for further processing. The data pipeline starts with an AWS Glue crawler to create the Data Catalog for the RDS database. Because starting an AWS Glue crawler is asynchronous, the pipeline has a wait loop to check if the crawler is complete. After the AWS Glue crawler is complete, the pipeline extracts data from the DynamoDB table and RDS tables. Because these two steps are independent, they are run as parallel steps: one using an AWS Lambda function to export, transform, and load the data from DynamoDB to an S3 bucket, and the other using a distributed map with AWS Glue job sync integration to do the same from the RDS tables to an S3 bucket. Note that AWS Identity and Access Management (IAM) permissions are required for invoking an AWS Glue job from Step Functions. For more information, refer to IAM Policies for invoking AWS Glue job from Step Functions.

The following diagram illustrates the Step Functions workflow.

There are multiple tables related to customers and order data in the RDS database. Amazon S3 hosts the metadata of all the tables as a .csv file. The pipeline uses the Step Functions distributed map to read the table metadata from Amazon S3, iterate on every single item, and call the downstream AWS Glue job in parallel to export the data. See the following code:

"States": {
            "Map": {
              "Type": "Map",
              "ItemProcessor": {
                "ProcessorConfig": {
                  "Mode": "DISTRIBUTED",
                  "ExecutionType": "STANDARD"
                "StartAt": "Export data for a table",
                "States": {
                  "Export data for a table": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::glue:startJobRun.sync",
                    "Parameters": {
                      "JobName": "ExportTableData",
                      "Arguments": {
                        "--dbtable.$": "$.tables"
                    "End": true
              "Label": "Map",
              "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {
                  "InputType": "CSV",
                  "CSVHeaderLocation": "FIRST_ROW"
                "Parameters": {
                  "Bucket": "123456789012-stepfunction-redrive",
                  "Key": "tables.csv"
              "ResultPath": null,
              "End": true


To deploy the solution, you need the following prerequisites:

Launch the CloudFormation template

Complete the following steps to deploy the solution resources using AWS CloudFormation:

  1. Choose Launch Stack to launch the CloudFormation stack:
  2. Enter a stack name.
  3. Select all the check boxes under Capabilities and transforms.
  4. Choose Create stack.

The CloudFormation template creates many resources, including the following:

  • The data pipeline described earlier as a Step Functions workflow
  • An S3 bucket to store the exported data and the metadata of the tables in Amazon RDS
  • A product catalog table in DynamoDB
  • An RDS for PostgreSQL database instance with pre-loaded tables
  • An AWS Glue crawler that crawls the RDS table and creates an AWS Glue Data Catalog
  • A parameterized AWS Glue job to export data from the RDS table to an S3 bucket
  • A Lambda function to export data from DynamoDB to an S3 bucket

Simulate the failure

Complete the following steps to test the solution:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose the workflow named ETL_Process.
  3. Run the workflow with default input.

Within a few seconds, the workflow fails at the distributed map state.

You can inspect the map run errors by accessing the Step Functions workflow execution events for map runs and child workflows. In this example, you can identity the exception is due to Glue.ConcurrentRunsExceededException from AWS Glue. The error indicates there are more concurrent requests to run an AWS Glue job than are configured. Distributed map reads the table metadata from Amazon S3 and invokes as many AWS Glue jobs as the number of rows in the .csv file, but AWS Glue job is set with the concurrency of 3 when it is created. This resulted in the child workflow failure, cascading the failure to the distributed map state and then the parallel state. The other step in the parallel state to fetch the DynamoDB table ran successfully. If any step in the parallel state fails, the whole state fails, as seen with the cascading failure.

Handle failures with distributed map

By default, when a state reports an error, Step Functions causes the workflow to fail. There are multiple ways you can handle this failure with distributed map state:

  • Step Functions enables you to catch errors, retry errors, and fail back to another state to handle errors gracefully. See the following code:
    Retry": [
                            "ErrorEquals": [
                              "Glue.ConcurrentRunsExceededException "
                            "BackoffRate": 20,
                            "IntervalSeconds": 10,
                            "MaxAttempts": 3,
                            "Comment": "Exception",
                            "JitterStrategy": "FULL"

  • Sometimes, businesses can tolerate failures. This is especially true when you are processing millions of items and you expect data quality issues in the dataset. By default, when an iteration of map state fails, all other iterations are aborted. With distributed map, you can specify the maximum number of, or percentage of, failed items as a failure threshold. If the failure is within the tolerable level, the distributed map doesn’t fail.
  • The distributed map state allows you to control the concurrency of the child workflows. You can set the concurrency to map it to the AWS Glue job concurrency. Remember, this concurrency is applicable only at the workflow execution level—not across workflow executions.
  • You can redrive the failed state from the point of failure after fixing the root cause of the error.

Redrive the failed state

The root cause of the issue in the sample solution is the AWS Glue job concurrency. To address this by redriving the failed state, complete the following steps:

  1. On the AWS Glue console, navigate to the job named ExportsTableData.
  2. On the Job details tab, under Advanced properties, update Maximum concurrency to 5.

With the launch of redrive feature, You can use redrive to restart executions of standard workflows that didn’t complete successfully in the last 14 days. These include failed, aborted, or timed-out runs. You can only redrive a failed workflow from the step where it failed using the same input as the last non-successful state. You can’t redrive a failed workflow using a state machine definition that is different from the initial workflow execution. After the failed state is redriven successfully, Step Functions runs all the downstream tasks automatically. To learn more about how distributed map redrive works, refer to Redriving Map Runs.

Because the distributed map runs the steps inside the map as child workflows, the workflow IAM execution role needs permission to redrive the map run to restart the distributed map state:

  "Version": "2012-10-17",
  "Statement": [
      "Effect": "Allow",
      "Action": [
      "Resource": "arn:aws:states:us-east-2:123456789012:execution:myStateMachine/myMapRunLabel:*"

You can redrive a workflow from its failed step programmatically, via the AWS Command Line Interface (AWS CLI) or AWS SDK, or using the Step Functions console, which provides a visual operator experience.

  1. On the Step Functions console, navigate to the failed workflow you want to redrive.
  2. On the Details tab, choose Redrive from failure.

The pipeline now runs successfully because there is enough concurrency to run the AWS Glue jobs.

To redrive a workflow programmatically from its point of failure, call the new Redrive Execution API action. The same workflow starts from the last non-successful state and uses the same input as the last non-successful state from the initial failed workflow. The state to redrive from the workflow definition and the previous input are immutable.

Note the following regarding different types of child workflows:

  • Redrive for express child workflows – For failed child workflows that are express workflows within a distributed map, the redrive capability ensures a seamless restart from the beginning of the child workflow. This allows you to resolve issues that are specific to individual iterations without restarting the entire map.
  • Redrive for standard child workflows – For failed child workflows within a distributed map that are standard workflows, the redrive feature functions the same way as with standalone standard workflows. You can restart the failed state within each map iteration from its point of failure, skipping unnecessary steps that have already successfully run.

You can use Step Functions status change notifications with Amazon EventBridge for failure notifications such as sending an email on failure.

Clean up

To clean up your resources, delete the CloudFormation stack via the AWS CloudFormation console.


In this post, we showed you how to use the Step Functions redrive feature to redrive a failed step within a distributed map by restarting the failed step from the point of failure. The distributed map state allows you to write workflows that coordinate large-scale parallel workloads within your serverless applications. Step Functions runs the steps within the distributed map as child workflows at a maximum parallelism of 10,000, which is well above the concurrency supported by many AWS services.

To learn more about distributed map, refer to Step Functions – Distributed Map. To learn more about redriving workflows, refer to Redriving executions.

About the Authors

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing Tennis.

Joe Morotti is a Senior Solutions Architect at Amazon Web Services (AWS), working with Enterprise customers across the Midwest US to develop innovative solutions on AWS. He has held a wide range of technical roles and enjoys showing customers the art of the possible. He has attained seven AWS certification and has a passion for AI/ML and the contact center space. In his free time, he enjoys spending quality time with his family exploring new places and overanalyzing his sports team’s performance.

Uma Ramadoss is a specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping customers design and operate event-driven cloud-native applications and modern business workflows using services like Lambda, EventBridge, Step Functions, and Amazon MWAA.

Automatically detect Personally Identifiable Information in Amazon Redshift using AWS Glue

Post Syndicated from Manikanta Gona original https://aws.amazon.com/blogs/big-data/automatically-detect-personally-identifiable-information-in-amazon-redshift-using-aws-glue/

With the exponential growth of data, companies are handling huge volumes and a wide variety of data including personally identifiable information (PII). PII is a legal term pertaining to information that can identify, contact, or locate a single person. Identifying and protecting sensitive data at scale has become increasingly complex, expensive, and time-consuming. Organizations have to adhere to data privacy, compliance, and regulatory requirements such as GDPR and CCPA, and it’s important to identify and protect PII to maintain compliance. You need to identify sensitive data, including PII such as name, Social Security Number (SSN), address, email, driver’s license, and more. Even after identification, it’s cumbersome to implement redaction, masking, or encryption of sensitive data at scale.

Many companies identify and label PII through manual, time-consuming, and error-prone reviews of their databases, data warehouses and data lakes, thereby rendering their sensitive data unprotected and vulnerable to regulatory penalties and breach incidents.

In this post, we provide an automated solution to detect PII data in Amazon Redshift using AWS Glue.

Solution overview

With this solution, we detect PII in data on our Redshift data warehouse so that the we take and protect the data. We use the following services:

  • Amazon Redshift is a cloud data warehousing service that uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes, using AWS-designed hardware and machine learning (ML) to deliver the best price/performance at any scale. For our solution, we use Amazon Redshift to store the data.
  • AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, and combine data for analytics, ML, and application development. We use AWS Glue to discover the PII data that is stored in Amazon Redshift.
  • Amazon Simple Storage Services (Amazon S3) is a storage service offering industry-leading scalability, data availability, security, and performance.

The following diagram illustrates our solution architecture.

The solution includes the following high-level steps:

  1. Set up the infrastructure using an AWS CloudFormation template.
  2. Load data from Amazon S3 to the Redshift data warehouse.
  3. Run an AWS Glue crawler to populate the AWS Glue Data Catalog with tables.
  4. Run an AWS Glue job to detect the PII data.
  5. Analyze the output using Amazon CloudWatch.


The resources created in this post assume that a VPC is in place along with a private subnet and both their identifiers. This ensures that we don’t substantially change the VPC and subnet configuration. Therefore, we want to set up our VPC endpoints based on the VPC and subnet we choose to expose it in.

Before you get started, create the following resources as prerequisites:

  • An existing VPC
  • A private subnet in that VPC
  • A VPC gateway S3 endpoint
  • A VPC STS gateway endpoint

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with a CloudFormation template, complete the following steps:

  1. Open the AWS CloudFormation console in your AWS account.
  2. Choose Launch Stack:
  3. Choose Next.
  4. Provide the following information:
    1. Stack name
    2. Amazon Redshift user name
    3. Amazon Redshift password
    4. VPC ID
    5. Subnet ID
    6. Availability Zones for the subnet ID
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.
  9. Note the values for S3BucketName and RedshiftRoleArn on the stack’s Outputs tab.

Load data from Amazon S3 to the Redshift Data warehouse

With the COPY command, we can load data from files located in one or more S3 buckets. We use the FROM clause to indicate how the COPY command locates the files in Amazon S3. You can provide the object path to the data files as part of the FROM clause, or you can provide the location of a manifest file that contains a list of S3 object paths. COPY from Amazon S3 uses an HTTPS connection.

For this post, we use a sample personal health dataset. Load the data with the following steps:

  1. On the Amazon S3 console, navigate to the S3 bucket created from the CloudFormation template and check the dataset.
  2. Connect to the Redshift data warehouse using the Query Editor v2 by establishing a connection with the database you creating using the CloudFormation stack along with the user name and password.

After you’re connected, you can use the following commands to create the table in the Redshift data warehouse and copy the data.

  1. Create a table with the following query:
    CREATE TABLE personal_health_identifiable_information (
        mpi char (10),
        firstName VARCHAR (30),
        lastName VARCHAR (30),
        email VARCHAR (75),
        gender CHAR (10),
        mobileNumber VARCHAR(20),
        clinicId VARCHAR(10),
        creditCardNumber VARCHAR(50),
        driverLicenseNumber VARCHAR(40),
        patientJobTitle VARCHAR(100),
        ssn VARCHAR(15),
        geo VARCHAR(250),
        mbi VARCHAR(50)    

  2. Load the data from the S3 bucket:
    COPY personal_health_identifiable_information
    FROM 's3://<S3BucketName>/personal_health_identifiable_information.csv'
    IAM_ROLE '<RedshiftRoleArn>'
    delimiter ','
    region '<aws region>'

Provide values for the following placeholders:

  • RedshiftRoleArn – Locate the ARN on the CloudFormation stack’s Outputs tab
  • S3BucketName – Replace with the bucket name from the CloudFormation stack
  • aws region – Change to the Region where you deployed the CloudFormation template
  1. To verify the data was loaded, run the following command:
    SELECT * FROM personal_health_identifiable_information LIMIT 10;

Run an AWS Glue crawler to populate the Data Catalog with tables

On the AWS Glue console, select the crawler that you deployed as part of the CloudFormation stack with the name crawler_pii_db, then choose Run crawler.

When the crawler is complete, the tables in the database with the name pii_db are populated in the AWS Glue Data Catalog, and the table schema looks like the following screenshot.

Run an AWS Glue job to detect PII data and mask the corresponding columns in Amazon Redshift

On the AWS Glue console, choose ETL Jobs in the navigation pane and locate the detect-pii-data job to understand its configuration. The basic and advanced properties are configured using the CloudFormation template.

The basic properties are as follows:

  • Type – Spark
  • Glue version – Glue 4.0
  • Language – Python

For demonstration purposes, the job bookmarks option is disabled, along with the auto scaling feature.

We also configure advanced properties regarding connections and job parameters.
To access data residing in Amazon Redshift, we created an AWS Glue connection that utilizes the JDBC connection.

We also provide custom parameters as key-value pairs. For this post, we sectionalize the PII into five different detection categories:

  • custom – Coordinates

If you’re trying this solution from other countries, you can specify the custom PII fields using the custom category, because this solution is created based on US regions.

For demonstration purposes, we use a single table and pass it as the following parameter:

--table_name: table_name

For this post, we name the table personal_health_identifiable_information.

You can customize these parameters based on the individual business use case.

Run the job and wait for the Success status.

The job has two goals. The first goal is to identify PII data-related columns in the Redshift table and produce a list of these column names. The second goal is the obfuscation of data in those specific columns of the target table. As a part of the second goal, it reads the table data, applies a user-defined masking function to those specific columns, and updates the data in the target table using a Redshift staging table (stage_personal_health_identifiable_information) for the upserts.

Alternatively, you can also use dynamic data masking (DDM) in Amazon Redshift to protect sensitive data in your data warehouse.

Analyze the output using CloudWatch

When the job is complete, let’s review the CloudWatch logs to understand how the AWS Glue job ran. We can navigate to the CloudWatch logs by choosing Output logs on the job details page on the AWS Glue console.

The job identified every column that contains PII data, including custom fields passed using the AWS Glue job sensitive data detection fields.

Clean up

To clean up the infrastructure and avoid additional charges, complete the following steps:

  1. Empty the S3 buckets.
  2. Delete the endpoints you created.
  3. Delete the CloudFormation stack via the AWS CloudFormation console to delete the remaining resources.


With this solution, you can automatically scan the data located in Redshift clusters using an AWS Glue job, identify PII, and take necessary actions. This could help your organization with security, compliance, governance, and data protection features, which contribute towards the data security and data governance.

About the Authors

Manikanta Gona is a Data and ML Engineer at AWS Professional Services. He joined AWS in 2021 with 6+ years of experience in IT. At AWS, he is focused on Data Lake implementations, and Search, Analytical workloads using Amazon OpenSearch Service. In his spare time, he love to garden, and go on hikes and biking with his husband.

Denys Novikov is a Senior Data Lake Architect with the Professional Services team at Amazon Web Services. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems for Enterprise customers.

Anjan Mukherjee is a Data Lake Architect at AWS, specializing in big data and analytics solutions. He helps customers build scalable, reliable, secure and high-performance applications on the AWS platform.