Tag Archives: Expert (400)

Deep dive on Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/deep-dive-on-amazon-msk-tiered-storage/

In the first post of the series, we described some core concepts of Apache Kafka cluster sizing, the best practices for optimizing the performance, and the cost of your Kafka workload.

This post explains how the underlying infrastructure affects Kafka performance when you use Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage. We delve deep into the core components of Amazon MSK tiered storage and address questions such as: How do reads and writes work in a tiered storage-enabled cluster?

In the subsequent post, we’ll discuss the latency impact, recommended metrics to monitor, and conclude with guidance on key considerations in a production tiered storage-enabled cluster.

How Amazon MSK tiered storage works

To understand the internal architecture of Amazon MSK tiered storage, let’s first discuss some fundamentals of Kafka topics and partitions, and how reads and writes work.

A logical stream of data in Kafka is referred to as a topic. A topic is broken down into partitions, which are physical entities used to distribute load across multiple server instances (brokers) that serve reads and writes.

A partition (also designated as a topic-partition because it’s relative to a given topic) can be replicated, which means there are several copies of the data in the group of brokers forming a cluster. Each copy is called a replica or a log. One of these replicas, called the leader, serves as the reference. It’s where the ingress traffic is accepted for the topic-partition.

A log is an append-only sequence of log segments. Log segments contain Kafka data records, which are added to the end of the log or the active segment.

Log segments are stored as regular files. On the file system, Kafka identifies the file of a log segment by putting the offset of the first data record it contains in its file name. The offset of a record is simply a monotonic index assigned to a record by Kafka when it’s appended to the log. The segment files of a log are stored in a directory dedicated to the associated topic-partition.

When Kafka reads data from an arbitrary offset, it first looks up the segment that contains that offset by using the segment file names, then finds the specific record location inside that file by using an offset index. Offset indexes are materialized in a dedicated file stored with the segment files in the topic-partition directory. There is also a time index (timeindex) that is used to seek by timestamp.
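The segment lookup described above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions, not Kafka's actual code (which runs inside the broker on the JVM): the function names are hypothetical, and the base offsets are given as a sorted list instead of being read from a directory listing.

```python
from bisect import bisect_right


def segment_file_name(base_offset: int) -> str:
    # Kafka names a segment file after the offset of its first record,
    # zero-padded to 20 digits.
    return f"{base_offset:020d}.log"


def find_segment(base_offsets: list[int], target_offset: int) -> int:
    """Return the base offset of the segment that would hold target_offset.

    base_offsets must be sorted in ascending order.
    """
    # Index of the last base offset that is <= target_offset.
    idx = bisect_right(base_offsets, target_offset) - 1
    if idx < 0:
        raise ValueError("offset is lower than the first segment's base offset")
    return base_offsets[idx]


# Hypothetical partition with two segments starting at offsets 0 and 35.
base_offsets = [0, 35]
print(segment_file_name(find_segment(base_offsets, 34)))  # 00000000000000000000.log
print(segment_file_name(find_segment(base_offsets, 35)))  # 00000000000000000035.log
```

Once the segment is known, the offset index narrows the search to a byte position inside that file; that second step is omitted here.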

For every partition, Kafka also stores a journal of leadership changes in a file called leader-epoch-checkpoint. This file maps each leader epoch to the start offset of that epoch. Whenever the Kafka controller elects a new leader for a partition, this data is updated and propagated to all brokers. A leader epoch is a 32-bit, monotonically increasing number representing a continuous period of leadership of a single partition. It’s marked on all the Kafka records. The following code shows the local storage layout of topic cars, partition 0, which contains two segments (0, 35):

$ ls /kafka-cluster/broker-1/data/cars-0/


Kafka manages the lifecycle of these segment files. It creates a new one when a new segment needs to be created, for instance, if the current segment reaches its configured max size. It deletes one when the target retention period of the data it contains is reached, or the total maximal size of the log is reached. Data is deleted from the tail of the logs and corresponds to the oldest data of the append-only log of the topic-partition.
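As a rough illustration of these deletion rules, the following Python sketch picks the segments that would be removed under a time-based and a size-based limit. It rests on assumptions we introduce for the example: the Segment class and function name are hypothetical, a negative limit means "no limit", the last segment in the list is the active one and is never deleted, and the size rule only removes a segment if the log stays at or above the size limit afterward.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int
    size_bytes: int
    age_ms: int  # time elapsed since the newest record in the segment


def segments_to_delete(segments, retention_ms, retention_bytes):
    """Return base offsets of segments to delete, oldest first.

    segments is ordered oldest to newest; the last one is the active segment.
    """
    closed = segments[:-1]
    # Bytes by which the whole log exceeds the size limit.
    excess = sum(s.size_bytes for s in segments) - retention_bytes
    doomed = []
    for seg in closed:
        expired = retention_ms >= 0 and seg.age_ms > retention_ms
        oversized = retention_bytes >= 0 and excess - seg.size_bytes >= 0
        if not (expired or oversized):
            break  # deletion always proceeds from the oldest segment forward
        doomed.append(seg.base_offset)
        excess -= seg.size_bytes
    return doomed


day_ms = 24 * 60 * 60 * 1000
log = [
    Segment(0, 100, 10 * day_ms),
    Segment(35, 100, 3 * day_ms),
    Segment(70, 100, 1 * day_ms),
    Segment(105, 100, 0),  # active segment
]
print(segments_to_delete(log, retention_ms=7 * day_ms, retention_bytes=-1))  # [0]
print(segments_to_delete(log, retention_ms=-1, retention_bytes=250))  # [0]
```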

KIP-405 or tiered storage in Apache Kafka

The ability to tier data, in other words, transfer data (log, index, timeindex, and leader-epoch-checkpoint) from a local file system to another storage system based on time and size-based retention policies, is a feature built in Apache Kafka as part of KIP-405.

KIP-405 isn’t in an official Kafka release yet. Amazon MSK internally implemented tiered storage functionality on top of official Kafka version 2.8.2 and exposes it through the AWS-specific Kafka version 2.8.2.tiered. With this feature, you can set separate retention settings for the local and remote tiers. Data in the local tier is retained until it has been copied to the remote tier, even after the local retention expires. Data in the remote tier is retained until the remote retention expires. KIP-405 proposes a pluggable architecture that allows you to plug in custom remote storage and metadata storage backends. The following diagram illustrates the broker’s three key components.

The components are as follows:

  • RemoteLogManager (RLM) – A new component corresponding to LogManager for the local tier. It delegates copy, fetch, and delete of completed and non-active partition segments to a pluggable RemoteStorageManager implementation and maintains the respective remote log segment metadata through a pluggable RemoteLogMetadataManager implementation.
  • RemoteStorageManager (RSM) – A pluggable interface that provides the lifecycle of remote log segments.
  • RemoteLogMetadataManager (RLMM) – A pluggable interface that provides the lifecycle of metadata about remote log segments.

How data is moved to the remote tier for a tiered storage-enabled topic

In a tiered storage-enabled topic, each completed segment for a topic-partition triggers the leader of the partition to copy the data to the remote storage tier. The completed log segment is removed from the local disks when Amazon MSK finishes moving that log segment to the remote tier and after it meets the local retention policy. This frees up local storage space.

Let’s consider a hypothetical scenario: you have a topic with one partition. Prior to enabling tiered storage for this topic, there are three log segments. One of the segments is active and receiving data, and the other two segments are complete.

After you enable tiered storage for this topic with two days of local retention and five days of overall retention, Amazon MSK copies log segments 1 and 2 to tiered storage. Amazon MSK also retains the primary storage copy of segments 1 and 2. The active segment 3 isn’t eligible to be copied to tiered storage yet. At this point in the timeline, none of the retention settings have been applied yet to any of the messages in segments 1 and 2.

After 2 days, the primary retention settings take effect for segments 1 and 2, which Amazon MSK copied to tiered storage. Segments 1 and 2 now expire from the local storage. Active segment 3 is eligible neither for expiration nor for copying to tiered storage because it’s an active segment.

After 5 days, the overall retention settings take effect, and Amazon MSK clears log segments 1 and 2 from tiered storage. Segment 3 is still eligible neither for expiration nor for copying to tiered storage because it’s active.

That’s how the data lifecycle works on a tiered storage-enabled cluster.
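The timeline above can be condensed into a small Python sketch that reports which tiers hold a segment at a given age. It is a simplified model under our own assumptions, not Amazon MSK code: the function name and parameters are hypothetical, ages are whole days counted from when retention starts to apply, and a limit counts as reached when the age equals it.

```python
def segment_tiers(is_active, copied_to_remote, age_days,
                  local_retention_days, total_retention_days):
    """Return the set of tiers ("local", "remote") that still hold a segment."""
    if is_active:
        # The active segment is never copied or expired.
        return {"local"}
    tiers = set()
    # The local copy is kept until the segment is copied AND local retention is met.
    if not copied_to_remote or age_days < local_retention_days:
        tiers.add("local")
    # The remote copy is kept until the overall retention is met.
    if copied_to_remote and age_days < total_retention_days:
        tiers.add("remote")
    return tiers


# Segments 1 and 2 from the scenario: 2 days local retention, 5 days overall.
for day in (0, 2, 5):
    print(day, sorted(segment_tiers(False, True, day, 2, 5)))
# 0 ['local', 'remote']
# 2 ['remote']
# 5 []
```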

Amazon MSK starts moving data to tiered storage as soon as a segment is closed. Local disk space is freed when Amazon MSK has finished moving that log segment to the remote tier and the segment meets the local retention policy.

How read works in a tiered storage-enabled topic

For any read request, ReplicaManager first tries to process the request by sending it to ReadFromLocalLog. If that call returns an offset out of range exception, ReplicaManager delegates the read to RemoteLogManager, which reads from tiered storage. On the read path, the RemoteStorageManager fetches the data in chunks from remote storage. As a result, your consumer experiences higher latency for the first few bytes, but as the system buffers the segment locally, latency becomes similar to reading from local storage. One advantage of this approach is that when multiple consumers read from the same segment, the data is served directly from the local buffer.

If your consumer is configured to read from the closest replica, a consumer from a different consumer group might read the same remote segment through a different broker. In that case, that consumer experiences the same latency behavior we described previously.
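The read path can be pictured with the following toy Python model. It is a sketch under heavy simplification, not broker code: the Broker class, its attributes, and the exception name are hypothetical, records are stored in dictionaries keyed by offset, and the local buffer caches individual records instead of segment chunks.

```python
class OffsetOutOfRange(Exception):
    """Raised when an offset is older than anything kept on local disk."""


class Broker:
    def __init__(self, local_start_offset, local_records, remote_records):
        self.local_start_offset = local_start_offset
        self.local_records = local_records    # offset -> record on local disk
        self.remote_records = remote_records  # offset -> record in the remote tier
        self.buffer = {}                      # locally buffered remote data
        self.remote_fetches = 0

    def read_from_local_log(self, offset):
        if offset < self.local_start_offset:
            raise OffsetOutOfRange(offset)
        return self.local_records[offset]

    def read_from_remote(self, offset):
        # Only the first reader pays the remote round trip.
        if offset not in self.buffer:
            self.remote_fetches += 1
            self.buffer[offset] = self.remote_records[offset]
        return self.buffer[offset]

    def fetch(self, offset):
        try:
            return self.read_from_local_log(offset)
        except OffsetOutOfRange:
            return self.read_from_remote(offset)


broker = Broker(local_start_offset=100,
                local_records={100: "recent"},
                remote_records={5: "old"})
print(broker.fetch(100))      # served from local storage
print(broker.fetch(5))        # first read goes to the remote tier
print(broker.fetch(5))        # second read is served from the local buffer
print(broker.remote_fetches)  # 1
```

Because the buffer lives on one broker, a consumer that reads the same remote data through a different broker starts with an empty buffer, which matches the closest-replica behavior described above.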


Conclusion

In this post, we discussed the core components of the Amazon MSK tiered storage feature and explained how the data lifecycle works in a cluster enabled with tiered storage. Stay tuned for our upcoming post, in which we delve into the best practices for sizing and running a tiered storage-enabled cluster in production.

We would love to hear how you’re building your real-time data streaming applications today. If you’re just getting started with Amazon MSK tiered storage, we recommend getting hands-on with the guidelines available in the tiered storage documentation.

If you have any questions or feedback, please leave them in the comments section.

About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for Amazon Managed Streaming for Apache Kafka (Amazon MSK). He led the teams that built the MSK Serverless and MSK tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and in reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Migrate from Google BigQuery to Amazon Redshift using AWS Glue and Custom Auto Loader Framework

Post Syndicated from Tahir Aziz original https://aws.amazon.com/blogs/big-data/migrate-from-google-bigquery-to-amazon-redshift-using-aws-glue-and-custom-auto-loader-framework/

Amazon Redshift is a widely used, fully managed, petabyte-scale cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytic workloads. Customers are looking for tools that make it easier to migrate from other data warehouses, such as Google BigQuery, to Amazon Redshift to take advantage of the service price-performance, ease of use, security, and reliability.

In this post, we show you how to use AWS native services to accelerate your migration from Google BigQuery to Amazon Redshift. We use AWS Glue, a fully managed, serverless, ETL (extract, transform, and load) service, and the Google BigQuery Connector for AWS Glue (for more information, refer to Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors). We also add automation and flexibility to simplify migration of multiple tables to Amazon Redshift using the Custom Auto Loader Framework.

Solution overview

The solution provides a scalable and managed data migration workflow to migrate data from Google BigQuery to Amazon Simple Storage Service (Amazon S3), and then from Amazon S3 to Amazon Redshift. This pre-built solution scales to load data in parallel using input parameters.

The following architecture diagram shows how the solution works. It starts with setting up the migration configuration to connect to Google BigQuery, then converts the database schemas, and finally migrates the data to Amazon Redshift.


The workflow contains the following steps:

  1. A configuration file is uploaded to an S3 bucket you have chosen for this solution. This JSON file contains the migration metadata, namely the following:
    • A list of Google BigQuery projects and datasets.
    • A list of all tables to be migrated for each project and dataset pair.
  2. An Amazon EventBridge rule triggers an AWS Step Functions state machine to start migrating the tables.
  3. The Step Functions state machine iterates on the tables to be migrated and runs an AWS Glue Python shell job to extract the metadata from Google BigQuery and store it in an Amazon DynamoDB table used for tracking the tables’ migration status.
  4. The state machine iterates on the metadata from this DynamoDB table to run the table migrations in parallel, up to the maximum number of migration jobs that can run without exceeding limits or quotas on Google BigQuery. It performs the following steps:
    • Runs the AWS Glue migration job for each table in parallel.
    • Tracks the run status in the DynamoDB table.
    • After the tables have been migrated, checks for errors and exits.
  5. The data exported from Google BigQuery is saved to Amazon S3. We use Amazon S3 (even though AWS Glue jobs can write directly to Amazon Redshift tables) for a few specific reasons:
    • We can decouple the data migration and the data load steps.
    • It offers more control on the load steps, with the ability to reload the data or pause the process.
    • It provides fine-grained monitoring of the Amazon Redshift load status.
  6. The Custom Auto Loader Framework automatically creates schemas and tables in the target database and continuously loads data from Amazon S3 to Amazon Redshift.
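The bounded parallelism in step 4 can be approximated locally with a thread pool. This sketch is only an analogy and not how the solution is built (the solution uses a Step Functions state machine and AWS Glue jobs): run_migration and fake_glue_job are hypothetical names, and a plain dictionary stands in for the DynamoDB tracking table.

```python
from concurrent.futures import ThreadPoolExecutor


def run_migration(tables, max_parallelism, migrate):
    """Run migrate(table) for every table, at most max_parallelism at a time.

    Returns a dict mapping each table to its final status.
    """
    status = {table: "PENDING" for table in tables}
    with ThreadPoolExecutor(max_workers=max_parallelism) as pool:
        futures = {table: pool.submit(migrate, table) for table in tables}
        for table, future in futures.items():
            try:
                future.result()  # wait for this table's job to finish
                status[table] = "SUCCEEDED"
            except Exception:
                status[table] = "FAILED"
    return status


def fake_glue_job(table):
    # Placeholder for starting a migration job and waiting for it to finish.
    if table == "broken_table":
        raise RuntimeError("simulated job failure")


result = run_migration(["sales", "orders", "broken_table"],
                       max_parallelism=2, migrate=fake_glue_job)
failed = [table for table, state in result.items() if state == "FAILED"]
print(result)
print("errors:", failed)
```

Capping max_parallelism plays the role of the limit that keeps the number of concurrent read streams within the source system's quotas.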

A few additional points to note:

  • If you have already created the target schema and tables in the Amazon Redshift database, you can configure the Custom Auto Loader Framework to not automatically detect and convert the schema.
  • If you want more control over converting the Google BigQuery schema, you can use the AWS Schema Conversion Tool (AWS SCT). For more information, refer to Migrate Google BigQuery to Amazon Redshift using AWS Schema Conversion tool (SCT).
  • As of this writing, neither the AWS SCT nor the Custom Auto Loader Framework supports the conversion of nested data types (record, array, and struct). Amazon Redshift supports semistructured data using the SUPER data type, so if your table uses such complex data types, you need to create the target tables manually.

To deploy the solution, there are two main steps:

  1. Deploy the solution stack using AWS CloudFormation.
  2. Deploy and configure Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift.


Prerequisites

Before getting started, make sure you have the following:

In this example, we named the file bq-mig-config.json.

    1. Configure your Google account.
    2. Create an IAM role for AWS Glue (and note down the name of the IAM role).
    3. Subscribe to and activate the Google BigQuery Connector for AWS Glue.

Deploy the solution using AWS CloudFormation

To deploy the solution stack using AWS CloudFormation, complete the following steps:

  1. Choose Launch Stack:

This template provisions the AWS resources in the us-east-1 Region. If you want to deploy to a different Region, download the template bigquery-cft.yaml and launch it manually: on the AWS CloudFormation console, choose Create stack with new resources and upload the template file you downloaded.

The list of provisioned resources is as follows:

    • An EventBridge rule to start the Step Functions state machine on the upload of the configuration file.
    • A Step Functions state machine that runs the migration logic. The following diagram illustrates the state machine.
      Diagram representing the state machine deployed by the solution stack.
    • An AWS Glue Python shell job used to extract the metadata from Google BigQuery. The metadata is stored in a DynamoDB table, with a calculated attribute to prioritize the migration job. By default, the connector creates one partition per 400 MB in the table being read (before filtering). As of this writing, the Google BigQuery Storage API has a maximum quota for parallel read streams, so we set the limit for worker nodes for tables larger than 400 GB. We also calculate the max number of jobs that can run in parallel based on those values.
    • An AWS Glue ETL job used to extract the data from each Google BigQuery table and save it in Amazon S3 in Parquet format.
    • A DynamoDB table (bq_to_s3_tracking) used to store the metadata for each table to be migrated (size of the table, S3 path used to store the migrated data, and the number of workers needed to migrate the table).
    • A DynamoDB table (bq_to_s3_maxstreams) used to store the maximum number of streams per state machine run. This helps us minimize job failures due to limits or quotas. Use the CloudFormation template to customize the name of the DynamoDB table. The prefix for the DynamoDB table is bq_to_s3.
    • The IAM roles needed by the state machine and AWS Glue jobs.
  2. Choose Next.

Screen capture showing the AWS CloudFormation Create stack page.

  3. For Stack name, enter a name.
  4. For Parameters, enter the parameters listed in the following table, then choose Create.
| CloudFormation Template Parameter | Allowed Values | Description |
|---|---|---|
| InputBucketName | S3 bucket name | The S3 bucket where the AWS Glue job stores the migrated data. The data will be actually stored in a folder named s3-redshift-loader-source, which is used by the Custom Auto Loader Framework. |
| InputConnectionName | AWS Glue connection name, the default is glue-bq-connector-24 | The name of the AWS Glue connection that is created using the Google BigQuery connector. |
| InputDynamoDBTablePrefix | DynamoDB table name prefix, the default is bq_to_s3 | The prefix that will be used when naming the two DynamoDB tables created by the solution. |
| InputGlueETLJob | AWS Glue ETL job name, the default is bq-migration-ETL | The name you want to give to the AWS Glue ETL job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path. |
| InputGlueMetaJob | AWS Glue Python shell job name, the default is bq-get-metadata | The name you want to give to AWS Glue Python shell job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path. |
| InputGlueS3Path | S3 path, the default is s3://aws-glue-scripts-${AWS::Account}-${AWS::Region}/admin/ | This is the S3 path in which the stack will copy the scripts for AWS Glue jobs. Remember to replace: ${AWS::Account} with the actual AWS account ID and ${AWS::Region} with the Region you plan to use, or provide your own bucket and prefix in a complete path. |
| InputMaxParallelism | Number of parallel migration jobs to run, the default is 30 | The maximum number of tables you want to migrate concurrently. |
| InputBQSecret | AWS Secrets Manager secret name | The name of the AWS Secrets Manager secret in which you stored the Google BigQuery credential. |
| InputBQProjectName | Google BigQuery project name | The name of your project in Google BigQuery in which you want to store temporary tables; you will need write permissions on the project. |
|  | Step Functions state machine name, the default is | The name of the Step Functions state machine. |
| SourceS3BucketName | S3 bucket name, the default is aws-blogs-artifacts-public | The S3 bucket where the artifacts for this post are stored. Do not change the default. |

Deploy and configure the Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift

The Custom Auto Loader Framework utility makes data ingestion to Amazon Redshift simpler and automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to Custom Auto Loader Framework.

To set up the Custom Auto Loader Framework, complete the following steps:

  1. Choose Launch Stack to deploy the CloudFormation stack in the us-east-1 Region:

  2. On the AWS CloudFormation console, choose Next.
  3. Provide the following parameters to help ensure the successful creation of resources. Make sure you have collected these values beforehand.
| Parameter Name | Allowed Values | Description |
|---|---|---|
| CopyCommandSchedule | cron(0/5 * ? * * *) | The EventBridge rules KickoffFileProcessingSchedule and QueueRSProcessingSchedule are triggered based on this schedule. The default is 5 minutes. |
| DatabaseName | dev | The Amazon Redshift database name. |
| DatabaseSchemaName | public | The Amazon Redshift schema name. |
| DatabaseUserName | demo | The Amazon Redshift user name who has access to run COPY commands on the Amazon Redshift database and schema. |
| RedshiftClusterIdentifier | democluster | The Amazon Redshift cluster name. |
| RedshiftIAMRoleARN | arn:aws:iam::7000000000:role/RedshiftDemoRole | The Amazon Redshift cluster attached role, which has access to the S3 bucket. This role is used in COPY commands. |
| SourceS3Bucket | Your-bucket-name | The S3 bucket where data is located. Use the same bucket you used to store the migrated data as indicated in the previous stack. |
| CopyCommandOptions | See the values after this table | The additional COPY command data format parameters. |
| InitiateSchemaDetection | Yes | The setting to dynamically detect the schema prior to file upload. |

For CopyCommandOptions, the example value is:

delimiter '|' gzip

Provide the additional COPY command data format parameters as follows:

delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'

The following screenshot shows an example of our parameters.

Screen capture showing the stack details page with the input parameters filled with example values.

  4. Choose Create.
  5. Monitor the progress of the stack creation and wait until it is complete.
  6. To verify the Custom Auto Loader Framework configuration, log in to the Amazon S3 console and navigate to the S3 bucket you provided as a value to the SourceS3Bucket parameter.

You should see that a new directory called s3-redshift-loader-source has been created.

Screen capture of the Amazon S3 console showing the folder you should be able to see in your S3 bucket.

Test the solution

To test the solution, complete the following steps:

  1. Create the configuration file based on the prerequisites. You can also download the demo file.
  2. To set up the S3 bucket, on the Amazon S3 console, navigate to the folder bq-mig-config in the bucket you provided in the stack.
  3. Upload the config file into it.
  4. To enable EventBridge notifications to the bucket, open the bucket on the console and on the Properties tab, locate Event notifications.
  5. In the Amazon EventBridge section, choose Edit.
  6. Select On, then choose Save changes.

  7. On the AWS Step Functions console, monitor the run of the state machine.
  8. Monitor the status of the loads in Amazon Redshift. For instructions, refer to Viewing Current Loads.
  9. Open the Amazon Redshift Query Editor V2 and query your data.

Pricing considerations

You might incur egress charges for migrating data out of Google BigQuery into Amazon S3. Review and calculate the cost for moving your data on your Google Cloud billing console. As of this writing, AWS Glue 3.0 or later charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. For more information, see AWS Glue Pricing. With auto scaling enabled, AWS Glue automatically adds and removes workers from the cluster depending on the parallelism at each stage or microbatch of the job run.
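To get a feel for the AWS Glue part of the bill, the pricing rule quoted above can be turned into a back-of-the-envelope calculation. This is a rough sketch, not a pricing tool: the function name is hypothetical, the DPU counts and runtimes are made-up examples, the number of DPUs is assumed constant for the whole run (so auto scaling is ignored), and the rate is the one stated in this post, which may have changed.

```python
def glue_job_cost(dpus, runtime_seconds, price_per_dpu_hour=0.44, minimum_seconds=60):
    """Estimate the cost in USD of one AWS Glue Spark ETL job run."""
    # Billing is per second, but never for less than the minimum duration.
    billed_seconds = max(runtime_seconds, minimum_seconds)
    return dpus * billed_seconds / 3600 * price_per_dpu_hour


# Hypothetical runs: 10 DPUs for 8 minutes, and 2 DPUs for 30 seconds.
print(f"{glue_job_cost(10, 8 * 60):.4f}")  # 0.5867
print(f"{glue_job_cost(2, 30):.4f}")       # 0.0147 (billed as 60 seconds)
```

Summing this estimate over all table migration jobs gives an order of magnitude for the AWS Glue cost; egress charges on the Google side come on top of it.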

Clean up

To avoid incurring future charges, clean up your resources:

  1. Delete the CloudFormation solution stack.
  2. Delete the CloudFormation Custom Auto Loader Framework stack.


Conclusion

In this post, we demonstrated how to build a scalable and automated data pipeline to migrate your data from Google BigQuery to Amazon Redshift. We also highlighted how the Custom Auto Loader Framework can automate the schema detection, create tables for your S3 files, and continuously load the files into your Amazon Redshift warehouse. With this approach, you can automate the migration of entire projects (even multiple projects at a time) in Google BigQuery to Amazon Redshift. This helps improve data migration times into Amazon Redshift significantly through the automatic table migration parallelization.

The auto-copy feature in Amazon Redshift simplifies automatic data loading from Amazon S3. With a simple SQL command, you can automate data ingestion from Amazon S3 to Amazon Redshift using the Amazon Redshift auto-copy preview feature.

For more information about the performance of the Google BigQuery Connector for AWS Glue, refer to Migrate terabytes of data quickly from Google Cloud to Amazon S3 with AWS Glue Connector for Google BigQuery and learn how to migrate a large amount of data (1.9 TB) into Amazon S3 quickly (about 8 minutes).

To learn more about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.

About the Authors

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for DB and Analytics. He has worked in the analytics space for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Manjula Nagineni is a Senior Solutions Architect with AWS based in New York. She works with major financial service institutions, architecting and modernizing their large-scale applications while adopting AWS Cloud services. She is passionate about designing big data workloads cloud-natively. She has over 20 years of IT experience in software development, analytics, and architecture across multiple domains such as finance, retail, and telecom.

Sohaib Katariwala is an Analytics Specialist Solutions Architect at AWS. He has over 12 years of experience helping organizations derive insights from their data.

Build event-driven data pipelines using AWS Controllers for Kubernetes and Amazon EMR on EKS

Post Syndicated from Victor Gu original https://aws.amazon.com/blogs/big-data/build-event-driven-data-pipelines-using-aws-controllers-for-kubernetes-and-amazon-emr-on-eks/

An event-driven architecture is a software design pattern in which decoupled applications can asynchronously publish and subscribe to events via an event broker. By promoting loose coupling between components of a system, an event-driven architecture leads to greater agility and can enable components in the system to scale independently and fail without impacting other services. AWS has many services to build solutions with an event-driven architecture, such as Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), and AWS Lambda.

Amazon Elastic Kubernetes Service (Amazon EKS) is becoming a popular choice among AWS customers to host long-running analytics and AI or machine learning (ML) workloads. By containerizing your data processing tasks, you can deploy them into Amazon EKS as Kubernetes jobs and use Kubernetes to manage the underlying compute resources. For big data processing, which requires distributed computing, you can use Spark on Amazon EKS. Amazon EMR on EKS, a managed Spark framework on Amazon EKS, enables you to run Spark jobs with the benefits of scalability, portability, extensibility, and speed. With EMR on EKS, the Spark jobs run using the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less than open-source Apache Spark.

Data processing workloads require workflow management to schedule jobs and manage dependencies between jobs, and they require monitoring to ensure that the transformed data is always accurate and up to date. One popular orchestration tool for managing workflows is Apache Airflow, which can be installed in Amazon EKS. Alternatively, you can use the AWS-managed version, Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Another option is to use AWS Step Functions, which is a serverless workflow service that integrates with EMR on EKS and EventBridge to build event-driven workflows.

In this post, we demonstrate how to build an event-driven data pipeline using AWS Controllers for Kubernetes (ACK) and EMR on EKS. We use ACK to provision and configure serverless AWS resources, such as EventBridge and Step Functions. Triggered by an EventBridge rule, Step Functions orchestrates jobs running in EMR on EKS. With ACK, you can use the Kubernetes API and configuration language to create and configure AWS resources the same way you create and configure a Kubernetes data processing job. Because most of the managed services are serverless, you can build and manage your entire data pipeline using the Kubernetes API with tools such as kubectl.

Solution overview

ACK lets you define and use AWS service resources directly from Kubernetes, using the Kubernetes Resource Model (KRM). The ACK project contains a series of service controllers, one for each AWS service API. With ACK, developers can stay in their familiar Kubernetes environment and take advantage of AWS services for their application-supporting infrastructure. In the post Microservices development using AWS controllers for Kubernetes (ACK) and Amazon EKS blueprints, we show how to use ACK for microservices development.

In this post, we show how to build an event-driven data pipeline using ACK controllers for EMR on EKS, Step Functions, EventBridge, and Amazon Simple Storage Service (Amazon S3). We provision an EKS cluster with ACK controllers using Terraform modules. We create the data pipeline with the following steps:

  1. Create the emr-data-team-a namespace and bind it with the virtual cluster my-ack-vc in Amazon EMR by using the ACK controller.
  2. Use the ACK controller for Amazon S3 to create an S3 bucket. Upload the sample Spark scripts and sample data to the S3 bucket.
  3. Use the ACK controller for Step Functions to create a Step Functions state machine as an EventBridge rule target based on Kubernetes resources defined in YAML manifests.
  4. Use the ACK controller for EventBridge to create an EventBridge rule for pattern matching and target routing.

The pipeline is triggered when a new script is uploaded. An S3 upload notification is sent to EventBridge and, if it matches the specified rule pattern, triggers the Step Functions state machine. Step Functions calls the EMR virtual cluster to run the Spark job, and all the Spark executors and driver are provisioned inside the emr-data-team-a namespace. The output is saved back to the S3 bucket, and the developer can check the result on the Amazon EMR console.
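To make the "matches the specified rule pattern" step more concrete, the following Python sketch mimics a small subset of EventBridge pattern matching (exact values and prefix matching only). It is an illustration, not the EventBridge implementation: the matcher functions are our own, and the bucket name and key prefix are hypothetical placeholders. The event fields follow the shape of the "Object Created" events that Amazon S3 sends to EventBridge.

```python
def leaf_matches(candidate, actual):
    # A candidate is either a literal value or a {"prefix": "..."} matcher.
    if isinstance(candidate, dict) and "prefix" in candidate:
        return isinstance(actual, str) and actual.startswith(candidate["prefix"])
    return candidate == actual


def matches(pattern, event):
    """Return True if every field in pattern is satisfied by event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the corresponding event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(leaf_matches(candidate, actual) for candidate in expected):
            return False
    return True


# Hypothetical rule: react to new objects under scripts/ in one bucket.
pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["my-example-bucket"]},
        "object": {"key": [{"prefix": "scripts/"}]},
    },
}
event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "my-example-bucket"},
        "object": {"key": "scripts/job.py", "size": 1024},
    },
}
print(matches(pattern, event))  # True
```

An upload under a different prefix, for example output/result.csv, would not match this pattern, so the state machine would not start.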

The following diagram illustrates this architecture.


Prerequisites

Ensure that you have the following tools installed locally:

Deploy the solution infrastructure

Because each ACK service controller requires different AWS Identity and Access Management (IAM) roles for managing AWS resources, it’s better to use an automation tool to install the required service controllers. For this post, we use Amazon EKS Blueprints for Terraform and the AWS EKS ACK Addons Terraform module to provision the following components:

  • A new VPC with three private subnets and three public subnets
  • An internet gateway for the public subnets and a NAT Gateway for the private subnets
  • An EKS cluster control plane with one managed node group
  • Amazon EKS-managed add-ons: VPC_CNI, CoreDNS, and Kube_Proxy
  • ACK controllers for EMR on EKS, Step Functions, EventBridge, and Amazon S3
  • IAM execution roles for EMR on EKS, Step Functions, and EventBridge

Let’s start by cloning the GitHub repo to your local desktop. The module eks_ack_addons in addon.tf installs the ACK controllers by using Helm charts from the Amazon ECR Public Gallery. See the following code:

cd examples/usecases/event-driven-pipeline
terraform init
terraform plan
terraform apply -auto-approve #defaults to us-west-2

The following screenshot shows an example of our output. emr_on_eks_role_arn is the ARN of the IAM role created for Amazon EMR running Spark jobs in the emr-data-team-a namespace in Amazon EKS. stepfunction_role_arn is the ARN of the IAM execution role for the Step Functions state machine. eventbridge_role_arn is the ARN of the IAM execution role for the EventBridge rule.

The following command updates kubeconfig on your local machine and allows you to interact with your EKS cluster using kubectl to validate the deployment:

aws eks --region $region update-kubeconfig --name event-driven-pipeline-demo

Test your access to the EKS cluster by listing the nodes:

kubectl get nodes
# Output should look like below
NAME                                        STATUS   ROLES    AGE     VERSION
ip-10-1-10-64.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-65.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-7.us-west-2.compute.internal     Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-73.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-11-96.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-12-197.us-west-2.compute.internal   Ready    <none>   19h     v1.24.9-eks-49d8fe8

Now we’re ready to set up the event-driven pipeline.

Create an EMR virtual cluster

Let’s start by creating a virtual cluster in Amazon EMR and linking it with a Kubernetes namespace in Amazon EKS. The virtual cluster then uses the linked namespace to run Spark workloads. We use the file emr-virtualcluster.yaml. See the following code:

apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
  name: my-ack-vc
spec:
  name: my-ack-vc
  containerProvider:
    id: event-driven-pipeline-demo  # your eks cluster name
    type_: EKS
    info:
      eksInfo:
        namespace: emr-data-team-a # namespace binding with EMR virtual cluster

Let’s apply the manifest by using the following kubectl command:

kubectl apply -f ack-yamls/emr-virtualcluster.yaml

You can navigate to the Virtual clusters page on the Amazon EMR console to see the cluster record.

Create an S3 bucket and upload data

Next, let’s create an S3 bucket for storing Spark pod templates and sample data. We use the s3.yaml file. See the following code:

apiVersion: s3.services.k8s.aws/v1alpha1
kind: Bucket
metadata:
  name: sparkjob-demo-bucket
spec:
  name: sparkjob-demo-bucket

kubectl apply -f ack-yamls/s3.yaml

If you don’t see the bucket, check the log from the ACK S3 controller pod for details. The most common cause is that a bucket with the same name already exists. In that case, change the bucket name in s3.yaml as well as in eventbridge.yaml and sfn.yaml, and update upload-inputdata.sh and upload-spark-scripts.sh with the new bucket name.

Run the following command to upload the input data and pod templates:

bash spark-scripts-data/upload-inputdata.sh

The sparkjob-demo-bucket S3 bucket is created with two folders: input and scripts.

Create a Step Functions state machine

The next step is to create a Step Functions state machine that calls the EMR virtual cluster to run a Spark job, which is a sample Python script to process the New York City Taxi Records dataset. You need to define the Spark script location and pod templates for the Spark driver and executor in the StateMachine object .yaml file. Let’s make the following changes (highlighted) in sfn.yaml first:

  • Replace the value for roleARN with stepfunctions_role_arn
  • Replace the value for ExecutionRoleArn with emr_on_eks_role_arn
  • Replace the value for VirtualClusterId with your virtual cluster ID
  • Optionally, replace sparkjob-demo-bucket with your bucket name

See the following code:

apiVersion: sfn.services.k8s.aws/v1alpha1
kind: StateMachine
metadata:
  name: run-spark-job-ack
spec:
  name: run-spark-job-ack
  roleARN: "arn:aws:iam::xxxxxxxxxxx:role/event-driven-pipeline-demo-sfn-execution-role"   # replace with your stepfunctions_role_arn
  tags:
  - key: owner
    value: sfn-ack
  definition: |
      "Comment": "A description of my state machine",
      "StartAt": "input-output-s3",
      "States": {
        "input-output-s3": {
          "Type": "Task",
          "Resource": "arn:aws:states:::emr-containers:startJobRun.sync",
          "Parameters": {
            "VirtualClusterId": "f0u3vt3y4q2r1ot11m7v809y6",  
            "ExecutionRoleArn": "arn:aws:iam::xxxxxxxxxxx:role/event-driven-pipeline-demo-emr-eks-data-team-a",
            "ReleaseLabel": "emr-6.7.0-latest",
            "JobDriver": {
              "SparkSubmitJobDriver": {
                "EntryPoint": "s3://sparkjob-demo-bucket/scripts/pyspark-taxi-trip.py",
                "EntryPointArguments": [
                "SparkSubmitParameters": "--conf spark.executor.instances=10"
            "ConfigurationOverrides": {
              "ApplicationConfiguration": [
                 "Classification": "spark-defaults",
                "Properties": {
                  "spark.driver.memory": "10g",
                  "spark.executor.memory": "10g",
                  "spark.local.dir" : "/data1,/data2"

You can get your virtual cluster ID from the Amazon EMR console or with the following command:

kubectl get virtualcluster -o jsonpath={.items..status.id}
# result:
f0u3vt3y4q2r1ot11m7v809y6  # VirtualClusterId

Then apply the manifest to create the Step Functions state machine:

kubectl apply -f ack-yamls/sfn.yaml
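
Because the state machine definition is embedded as a JSON string inside the YAML manifest, a syntax slip only surfaces when the controller tries to create the state machine. The following Python sketch builds a trimmed-down, single-state definition as a dictionary and serializes it, which can help validate the JSON before you paste it into sfn.yaml. The function name build_definition, the empty EntryPointArguments list, and the "End" field are assumptions made for illustration; the configuration overrides from the manifest are left out.

```python
import json


def build_definition(virtual_cluster_id: str, execution_role_arn: str, bucket: str) -> str:
    """Return an Amazon States Language document with one EMR on EKS task."""
    definition = {
        "Comment": "A description of my state machine",
        "StartAt": "input-output-s3",
        "States": {
            "input-output-s3": {
                "Type": "Task",
                "Resource": "arn:aws:states:::emr-containers:startJobRun.sync",
                "Parameters": {
                    "VirtualClusterId": virtual_cluster_id,
                    "ExecutionRoleArn": execution_role_arn,
                    "ReleaseLabel": "emr-6.7.0-latest",
                    "JobDriver": {
                        "SparkSubmitJobDriver": {
                            "EntryPoint": f"s3://{bucket}/scripts/pyspark-taxi-trip.py",
                            # Assumption: script arguments are omitted in this sketch.
                            "EntryPointArguments": [],
                            "SparkSubmitParameters": "--conf spark.executor.instances=10",
                        }
                    },
                },
                # Assumption: this is the only state, so it ends the execution.
                "End": True,
            }
        },
    }
    return json.dumps(definition, indent=2)


if __name__ == "__main__":
    print(build_definition(
        "f0u3vt3y4q2r1ot11m7v809y6",
        "arn:aws:iam::xxxxxxxxxxx:role/event-driven-pipeline-demo-emr-eks-data-team-a",
        "sparkjob-demo-bucket",
    ))
```

Round-tripping the output through json.loads is a cheap check that the document is well formed; it does not validate the definition against the Step Functions service.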

Create an EventBridge rule

The last step is to create an EventBridge rule, which is used as an event broker to receive event notifications from Amazon S3. Whenever a new file, such as a new Spark script, is created in the S3 bucket, the EventBridge rule will evaluate (filter) the event and invoke the Step Functions state machine if it matches the specified rule pattern, triggering the configured Spark job.

Let’s use the following command to get the ARN of the Step Functions state machine we created earlier:

kubectl get StateMachine -o jsonpath={.items..status.ackResourceMetadata.arn}
# result
arn: arn:aws:states:us-west-2:xxxxxxxxxx:stateMachine:run-spark-job-ack # sfn_arn

Then, update eventbridge.yaml with the following values:

  • Under targets, replace the value for roleARN with eventbridge_role_arn
  • Under targets, replace arn with your sfn_arn
  • Optionally, in eventPattern, replace sparkjob-demo-bucket with your bucket name

See the following code:

apiVersion: eventbridge.services.k8s.aws/v1alpha1
kind: Rule
metadata:
  name: eb-rule-ack
spec:
  name: eb-rule-ack
  description: "ACK EventBridge Filter Rule to sfn using event bus reference"
  eventPattern: |
    {
      "source": ["aws.s3"],
      "detail-type": ["Object Created"],
      "detail": {
        "bucket": {
          "name": ["sparkjob-demo-bucket"]
        },
        "object": {
          "key": [{
            "prefix": "scripts/"
          }]
        }
      }
    }
  targets:
    - arn: arn:aws:states:us-west-2:xxxxxxxxxx:stateMachine:run-spark-job-ack # replace with your sfn arn
      id: sfn-run-spark-job-target
      roleARN: arn:aws:iam::xxxxxxxxx:role/event-driven-pipeline-demo-eb-execution-role # replace with your eventbridge_role_arn
      retryPolicy:
        maximumRetryAttempts: 0 # no retries
  tags:
    - key: owner
      value: eb-ack

Applying the EventBridge configuration file creates an EventBridge rule that monitors the scripts folder in the S3 bucket sparkjob-demo-bucket:

kubectl apply -f ack-yamls/eventbridge.yaml

For simplicity, the dead-letter queue is not set and maximum retry attempts is set to 0. For production usage, set them based on your requirements. For more information, refer to Event retry policy and using dead-letter queues.
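
To make the filtering behavior of the rule concrete, the following Python sketch evaluates the same event pattern against a trimmed S3 "Object Created" event. It is a simplified illustration and not how EventBridge is implemented: it handles only exact-value lists and the prefix operator, while EventBridge supports more operators. The helper names matches, _leaf_matches, and s3_event are hypothetical, as is the trimmed event shape.

```python
def _leaf_matches(option, actual) -> bool:
    """Compare one allowed value (or a prefix filter) with the event value."""
    if isinstance(option, dict) and "prefix" in option:
        return isinstance(actual, str) and actual.startswith(option["prefix"])
    return option == actual


def matches(pattern: dict, event: dict) -> bool:
    """Return True when every field named in the pattern is satisfied by the event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object: recurse into the same key of the event.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(_leaf_matches(option, actual) for option in expected):
            # Leaf: a list of alternatives, and none of them matched.
            return False
    return True


PATTERN = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["sparkjob-demo-bucket"]},
        "object": {"key": [{"prefix": "scripts/"}]},
    },
}


def s3_event(key: str, bucket: str = "sparkjob-demo-bucket") -> dict:
    # Hypothetical, heavily trimmed event; real notifications carry more fields.
    return {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {"bucket": {"name": bucket}, "object": {"key": key}},
    }


if __name__ == "__main__":
    print(matches(PATTERN, s3_event("scripts/pyspark-taxi-trip.py")))
    print(matches(PATTERN, s3_event("input/sample-data.parquet")))
```

Under these assumptions, an upload under scripts/ matches the pattern, whereas an upload under input/ does not and therefore would not start the state machine.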

Test the data pipeline

To test the data pipeline, we trigger it by uploading a Spark script to the S3 bucket scripts folder using the following command:

bash spark-scripts-data/upload-spark-scripts.sh

The upload event triggers the EventBridge rule and then calls the Step Functions state machine. You can go to the State machines page on the Step Functions console and choose the job run-spark-job-ack to monitor its status.

For the Spark job details, on the Amazon EMR console, choose Virtual clusters in the navigation pane, and then choose my-ack-vc. You can review all the job run history for this virtual cluster. If you choose Spark UI in any row, you’re redirected to the Spark history server for more Spark driver and executor logs.

Clean up

To clean up the resources created in the post, use the following code:

aws s3 rm s3://sparkjob-demo-bucket --recursive # clean up data in S3
kubectl delete -f ack-yamls/. #Delete aws resources created by ACK
terraform destroy -target="module.eks_blueprints_kubernetes_addons" -target="module.eks_ack_addons" -auto-approve -var region=$region
terraform destroy -target="module.eks_blueprints" -auto-approve -var region=$region
terraform destroy -auto-approve -var region=$region


This post showed how to build an event-driven data pipeline purely with native Kubernetes API and tooling. The pipeline uses EMR on EKS as compute and uses serverless AWS resources Amazon S3, EventBridge, and Step Functions as storage and orchestration in an event-driven architecture. With EventBridge, AWS and custom events can be ingested, filtered, transformed, and reliably delivered (routed) to more than 20 AWS services and public APIs (webhooks), using human-readable configuration instead of writing undifferentiated code. EventBridge helps you decouple applications and achieve more efficient organizations using event-driven architectures, and has quickly become the event bus of choice for AWS customers for many use cases, such as auditing and monitoring, application integration, and IT automation.

By using ACK controllers to create and configure different AWS services, developers can perform all data plane operations without leaving the Kubernetes platform. Also, developers only need to maintain the EKS cluster because all the other components are serverless.

As a next step, clone the GitHub repository to your local machine and test the data pipeline in your own AWS account. You can modify the code in this post and customize it for your own needs by using different EventBridge rules or adding more steps in Step Functions.

About the authors

Victor Gu is a Containers and Serverless Architect at AWS. He works with AWS customers to design microservices and cloud native solutions using Amazon EKS/ECS and AWS serverless services. His specialties are Kubernetes, Spark on Kubernetes, MLOps and DevOps.

Michael Gasch is a Senior Product Manager for AWS EventBridge, driving innovations in event-driven architectures. Prior to AWS, Michael was a Staff Engineer at the VMware Office of the CTO, working on open-source projects, such as Kubernetes and Knative, and related distributed systems research.

Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting a variety of customer workloads.

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

Post Syndicated from Nith Govindasivan original https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/

In a data warehouse, a dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. For example, in a typical sales domain, customer, time, and product are dimensions, and sales transactions are facts. Attributes within a dimension can change over time: a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions. A slowly changing dimension (SCD) is a data warehousing concept for a dimension that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides the ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe modernize their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake, and the processing grows more complex still when the data source is semi-structured rather than a database. The key objective when handling Type 2 SCDs is to assign accurate start and end dates to the records so that changes can be tracked within the data lake, because this provides the point-in-time reporting capability for the consuming applications.

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly
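
To illustrate what point-in-time reporting means for a Type 2 SCD, the following Python sketch selects the version of a record that was in effect on a given date from its start and end dates. The function name as_of and the sample rows are hypothetical, and treating end_date as exclusive is an assumption of this sketch rather than something the solution prescribes.

```python
from datetime import date


def as_of(history: list[dict], when: date) -> list[dict]:
    """Return the row versions that were in effect on the given date."""
    return [
        row for row in history
        if row["start_date"] <= when
        # An open-ended row (end_date is None) is the current version.
        and (row["end_date"] is None or when < row["end_date"])
    ]


# Hypothetical history for one employee whose contractor status changed.
HISTORY = [
    {"emp_id": 12, "isContractor": True,
     "start_date": date(2023, 3, 1), "end_date": date(2023, 3, 2)},
    {"emp_id": 12, "isContractor": False,
     "start_date": date(2023, 3, 2), "end_date": None},
]

if __name__ == "__main__":
    for day in (date(2023, 3, 1), date(2023, 3, 5)):
        print(day, as_of(HISTORY, day))
```

The same predicate can be expressed as a filter on start_date and end_date in a query engine such as Athena.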

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor or not, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer
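
The idea of deriving changes from two full snapshots can be sketched in plain Python as follows. This is a minimal illustration under the assumption that emp_id identifies an employee across loads; it is not the AWS Glue job itself, and the function name diff_snapshots and the sample rows are hypothetical.

```python
def diff_snapshots(previous: list[dict], current: list[dict], key: str = "emp_id") -> dict:
    """Classify record IDs as inserted, updated, or deleted between two full snapshots."""
    prev_by_id = {row[key]: row for row in previous}
    curr_by_id = {row[key]: row for row in current}

    inserted = sorted(curr_by_id.keys() - prev_by_id.keys())
    deleted = sorted(prev_by_id.keys() - curr_by_id.keys())
    # Present in both loads, but at least one attribute differs.
    updated = sorted(
        k for k in curr_by_id.keys() & prev_by_id.keys()
        if curr_by_id[k] != prev_by_id[k]
    )
    return {"inserted": inserted, "updated": updated, "deleted": deleted}


# Hypothetical snapshots from two consecutive daily loads.
DAY_1 = [
    {"emp_id": 8, "first_name": "Teresa", "isContractor": False},
    {"emp_id": 12, "first_name": "Nicholas", "isContractor": True},
    {"emp_id": 25, "first_name": "John", "isContractor": False},
]
DAY_2 = [
    {"emp_id": 12, "first_name": "Nicholas", "isContractor": False},
    {"emp_id": 25, "first_name": "John", "isContractor": False},
    {"emp_id": 26, "first_name": "John-copied", "isContractor": True},
]

if __name__ == "__main__":
    print(diff_snapshots(DAY_1, DAY_2))
```

Comparing whole rows works for small in-memory samples; at data lake scale, comparing a hash of the tracked columns is a common alternative.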

The architecture is implemented as follows:

  • Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.


Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.
  2. Enter a stack name.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console and note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda function – SampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog database – deltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job – <CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test out the overall solution design and query historical records from the employee dataset. This post is designed to be implemented for a real customer use case, where you get full snapshot data on a daily basis. We test the following aspects of SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, and before you can start your initial data ingestion, the data source needs to be identified. To simplify that step, a Lambda function has been deployed in the CloudFormation stack you just deployed.

Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.

Run the AWS Glue job

Confirm that you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it ready, because you will modify it in later use cases to simulate the inserts, updates, and deletes. The sample dataset generated for you will be entirely different from what you see in the preceding example.

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. Name your crawler delta-lake-crawler, then choose Next.
  4. Select Not yet for data already mapped to AWS Glue tables.
  5. Choose Add a data source.
  6. On the Data source drop-down menu, choose Delta Lake.
  7. Enter the path to the Delta table.
  8. Select Create Native tables.
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. Choose the role that was created by the CloudFormation template, then choose Next.
  12. Choose the database that was created by the CloudFormation template, then choose Next.
  13. Choose Create crawler.
  14. Select your crawler and choose Run.

Query the data

After the crawler is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.

  2. Under Administration in the navigation pane, choose Workgroups.
  3. Choose Create workgroup.
  4. Provide a name for the workgroup, such as DeltaWorkgroup.
  5. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.
  6. Choose Create workgroup.
  7. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.
  8. Run the following query on the employee table:

SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the one from your CloudFormation outputs before running the preceding query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key, which is unique to each and every change and is used to track the changes. The emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

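from pyspark.sql.functions import col, concat_ws, sha2
# Hash the tracked columns into one key that changes whenever any value changes
df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))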
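
To see why a hash over the tracked columns works as a change key, the following plain-Python sketch mirrors the idea with hashlib. It is an approximation for illustration only: the helper names are hypothetical, booleans are rendered in lowercase to mimic Spark’s string cast (an assumption), null handling is not modeled, and no claim is made that the digests equal the emp_key values in your table.

```python
import hashlib

# Columns that feed the key, in the same order as the PySpark expression.
FIELDS = ["emp_id", "first_name", "last_name", "Address", "phone_number", "isContractor"]


def _to_text(value) -> str:
    # Assumption: render booleans in lowercase, as a Spark string cast would.
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def emp_key(record: dict) -> str:
    """Return a SHA-256 hex digest over the tracked columns joined with '||'."""
    joined = "||".join(_to_text(record[name]) for name in FIELDS)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


RECORD = {
    "emp_id": 12,
    "first_name": "Nicholas",
    "last_name": "Aguirre",
    "Address": "7474 Joyce Meadows\nLake Billy, WA 40750",
    "phone_number": "495.259.9738",
    "isContractor": True,
}

if __name__ == "__main__":
    changed = {**RECORD, "isContractor": False}
    print(emp_key(RECORD))
    print(emp_key(changed))
```

Any change to a tracked column produces a different digest, so comparing keys is enough to tell whether a record with the same emp_id has changed.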

Perform inserts, updates, and deletes

Before making changes to the dataset, let’s run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn’t make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  1. Upload the changed fake_emp_data.json file to the same source prefix.
  2. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  3. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total:

SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the one from your CloudFormation outputs before running the preceding query.

  4. Run another query in the Athena query editor and confirm that 4 records are returned with the values described after the query:

SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the one from your CloudFormation outputs before running the preceding query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may be different than what is provided here as an example.
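
The record states described above can be reproduced with a small in-memory simulation. The following Python sketch is not the AWS Glue job; it assumes a simplified three-column record, a hypothetical apply_snapshot function, and a hash over all columns as the change key. It closes changed and deleted rows and appends new versions, which is enough to follow how the row counts evolve from one load to the next.

```python
import hashlib
from datetime import date


def row_key(record: dict) -> str:
    """Hash every column so that any change yields a new key."""
    text = "||".join(str(record[name]) for name in sorted(record))
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def apply_snapshot(history: list[dict], snapshot: list[dict], today: date) -> list[dict]:
    """Apply one full snapshot to the history table, SCD Type 2 style."""
    incoming = {rec["emp_id"]: rec for rec in snapshot}
    current = {row["emp_id"]: row for row in history if row["isCurrent"]}

    for emp_id, row in current.items():
        rec = incoming.get(emp_id)
        if rec is None:
            # Missing from the source: close the row and flag it as deleted.
            row.update(isCurrent=False, end_date=today, delete_flag=True)
        elif row_key(rec) != row["emp_key"]:
            # Changed in the source: close the old version.
            row.update(isCurrent=False, end_date=today)

    for emp_id, rec in incoming.items():
        row = current.get(emp_id)
        if row is None or not row["isCurrent"]:
            # New, changed, or previously deleted record: add a current version.
            history.append({**rec, "emp_key": row_key(rec), "start_date": today,
                            "end_date": None, "isCurrent": True, "delete_flag": False})
    return history


def make_snapshot(ids) -> list[dict]:
    # Hypothetical sample rows; the real dataset has more columns.
    return [{"emp_id": i, "first_name": f"emp{i}", "isContractor": True} for i in ids]


if __name__ == "__main__":
    history: list[dict] = []
    day1 = make_snapshot(range(1, 26))
    apply_snapshot(history, day1, date(2023, 3, 1))
    print("initial load:", len(history))

    # Update emp_id 12, delete emp_id 8, insert emp_id 26.
    day2 = [dict(r) for r in day1 if r["emp_id"] != 8] + make_snapshot([26])
    for rec in day2:
        if rec["emp_id"] == 12:
            rec["isContractor"] = False
    apply_snapshot(history, day2, date(2023, 3, 2))
    print("after changes:", len(history))

    # Bring emp_id 8 back.
    apply_snapshot(history, day2 + make_snapshot([8]), date(2023, 3, 3))
    print("after re-insert:", len(history))
```

With 25 initial records, the simulation ends up with 27 rows after the update, delete, and insert, and 28 rows after the deleted employee is added back, in line with the counts this walkthrough asks you to confirm in Athena.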

  1. For the deletes, we check for the emp_id from the base table along with the new source file and inner join the emp_key.
  2. If the condition evaluates to true, we then check if the employee base table emp_key equals the new updates emp_key, and get the current, undeleted record (isCurrent=true and delete_flag=false).
  3. We merge the delete changes from the new file with the base table for all the matching delete condition rows and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()

  1. For both the updates and the inserts, we check whether the base table employee.emp_id equals the new changes’ emp_id and employee.emp_key equals the new changes’ emp_key, while retrieving only the current records.
  2. If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
  3. We merge the changes by updating the following:
    1. If the second condition evaluates to true:
      1. isCurrent=false
      2. end_date=current_date
    2. Or we insert the entire row as follows if the second condition evaluates to false:
      1. emp_id=new record’s emp_id
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. Address=new record’s Address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

# Assumes current_date has been imported from pyspark.sql.functions.
upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()
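
To make the close-out and insert behavior easier to follow, here is a minimal pure-Python sketch of the same SCD Type 2 idea applied to in-memory lists of dictionaries. It is not the Delta Lake API and it collapses the two merge passes into one loop; the function name apply_scd2 and the input shapes are assumptions made for illustration only.

```python
from datetime import date


def apply_scd2(base, changes, today):
    """Apply SCD Type 2 changes to `base` in place and return it.

    base:    list of dicts with emp_id, emp_key, isCurrent, delete_flag,
             start_date, end_date (plus any attribute columns).
    changes: list of dicts with emp_id, emp_key, delete_flag
             (plus any attribute columns).
    """
    for change in changes:
        # Look for the current row with the same business key and hash key.
        current = next(
            (row for row in base
             if row["emp_id"] == change["emp_id"]
             and row["emp_key"] == change["emp_key"]
             and row["isCurrent"]),
            None,
        )
        if current is not None:
            # Matched: close out the current row; deletes also set the flag.
            current["isCurrent"] = False
            current["end_date"] = today
            if change["delete_flag"]:
                current["delete_flag"] = True
        elif not change["delete_flag"]:
            # Not matched: insert the change as a new current row.
            new_row = dict(change)
            new_row.update(isCurrent=True, delete_flag=False,
                           start_date=today, end_date=None)
            base.append(new_row)
    return base
```

In this sketch, a delete followed by a later reinsert of the same emp_id and emp_key leaves two rows in base: the closed, deleted row and a new current row.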

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify our changed dataset from the previous step and make the following changes.

  1. Add the deleted emp_id=8 back to the dataset.

After making these changes, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Upload the changed employee dataset file to the same source prefix.
  2. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  3. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total with the following values:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Update the correct database name from the CloudFormation output before running the above query.

  1. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Update the correct database name from the CloudFormation output before running the above query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • delete_flag=true
    • end_date='2023-03-02'
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • delete_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may be different from the example values shown here. Also note that because this is the same deleted record, reinserted in the subsequent load without any changes, its emp_key does not change.
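
The reason an unchanged record keeps its emp_key can be illustrated with a small sketch. The 64-character hexadecimal example values above are consistent with a SHA-256 digest, so the sketch assumes the key is a SHA-256 hash over the record's attribute values; the function name, the column list, and the "||" separator are hypothetical and may not match the job's actual implementation.

```python
import hashlib


def compute_emp_key(record, columns):
    """Return a SHA-256 hex digest over the selected column values."""
    # Join the values with a separator so that adjacent fields cannot run together.
    joined = "||".join(str(record[col]) for col in columns)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Hypothetical list of attribute columns that feed the hash.
KEY_COLUMNS = ["emp_id", "first_name", "last_name", "Address",
               "phone_number", "isContractor"]
```

Because the digest depends only on the attribute values, reloading an identical record yields the same key, while any attribute change yields a different one.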

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Update the correct database name from the CloudFormation output before running the above query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Update the correct database name from the CloudFormation output before running the above query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.

Clean up

When you have finished experimenting with this solution, clean up your resources, to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.


Conclusion

In this post, we demonstrated how to use AWS Glue to identify changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) in a Delta Lake on Amazon S3 when source systems are unable to provide change data capture capability. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or other commonly used orchestrators your organization is familiar with. You can also extend this solution by adding partitions where appropriate. You can also maintain the delta table by compacting the small files.

About the authors

Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey by implementing big data and analytics solutions. Outside of work, Nith is an avid cricket fan who watches almost any cricket in his spare time, and he enjoys long drives and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we have a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the databases and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema will be registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating the schema. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to the dead-letter queue.

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{
    "Database Name": "sampledatabase",
    "Table Name": "movies",
    "Fields": [
        {
            "name": "movie_id",
            "type": "INTEGER"
        },
        {
            "name": "title",
            "type": "STRING"
        },
        {
            "name": "release_year",
            "type": "INTEGER"
        }
    ]
}


Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK installs the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community licensed Amazon S3 sink connector JARs. Both plugins also include the AWS Glue Schema Registry libraries for Avro serializers and deserializers. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then pick the custom plugin with name starting msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  1. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default.
  2. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  3. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  4. Choose Next.
  5. For Security, choose the defaults.
  6. Choose Next.
  7. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  8. Choose Next.
  9. Review the settings and choose Create connector.

After a few minutes, the connector changes to running status.
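
The configuration keys described in this section can be assembled as in the following sketch. It covers only the keys named in this post, so it is not a complete Debezium configuration; a working connector needs additional settings described in the Debezium documentation. The helper names, the tasks.max value, and the BACKWARD compatibility value format are assumptions for illustration, and all endpoint values are supplied by you from the CloudFormation outputs.

```python
# Converter class from the AWS Glue Schema Registry library.
AVRO_CONVERTER = (
    "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
)


def build_source_config(name, db_host, db_user, db_password,
                        bootstrap_servers, region,
                        key_registry, value_registry):
    """Return a partial Debezium MySQL connector configuration as a dict."""
    return {
        "name": name,
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",  # assumed value for this sketch
        "database.hostname": db_host,
        "database.user": db_user,
        "database.password": db_password,
        "database.history.kafka.bootstrap.servers": bootstrap_servers,
        "key.converter": AVRO_CONVERTER,
        "value.converter": AVRO_CONVERTER,
        "key.converter.region": region,
        "value.converter.region": region,
        "key.converter.registry.name": key_registry,
        "value.converter.registry.name": value_registry,
        "key.converter.compatibility": "BACKWARD",
        "value.converter.compatibility": "BACKWARD",
    }


def to_properties(config):
    """Render the dict as key=value lines for the Connector configuration box."""
    return "\n".join(f"{key}={value}" for key, value in config.items())
```

Keeping the configuration in a dictionary makes it easy to reuse the shared converter settings for the sink connector and to review the rendered key=value text before pasting it into the console.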

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin with name starting msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
  1. Next, we configure Connector capacity. We can choose Provisioned and leave other properties as default.
  2. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  3. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
  4. Choose Next.
  5. For Security, choose the defaults.
  6. Choose Next.
  7. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template msk-connector-logs.
  8. Choose Next.
  9. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and it has the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  1. When prompted for a password, enter the password from the CloudFormation template parameters.
  2. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  1. When prompted for a password, enter the password from the CloudFormation template parameters.
  2. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  3. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
      "name": "movie_id",
      "type": "int"
      "name": "title",
      "type": "string"
      "name": "release_year",
      "type": "int"
      "name": "__op",
      "type": [
      "default": null
      "name": "__source_ts_ms",
      "type": [
      "default": null
      "name": "__deleted",
      "type": [
      "default": null
  "connect.name": "db1.sampledatabase.movies.Value"

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

  1. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.
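
When browsing the bucket, it can help to know what object keys to expect. The following sketch builds a key in the layout that the Confluent S3 sink connector documents for its default partitioner. It assumes the connector uses that default partitioner, the default top-level directory named topics, and Snappy-compressed Parquet output; the connector configuration deployed by the CloudFormation template may differ, and the function name is hypothetical.

```python
def s3_object_key(topic, partition, start_offset,
                  topics_dir="topics", extension="snappy.parquet"):
    """Build an object key in the default-partitioner layout (assumed settings)."""
    # One "folder" per Kafka partition under the topic prefix.
    folder = f"{topics_dir}/{topic}/partition={partition}"
    # File name: topic, partition, and zero-padded starting offset.
    file_name = f"{topic}+{partition}+{start_offset:010d}.{extension}"
    return f"{folder}/{file_name}"
```

For example, s3_object_key("db1.sampledatabase.movies", 0, 0) yields a key under topics/db1.sampledatabase.movies/partition=0/.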

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  1. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  1. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  2. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the country column that you added:
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
      "name": "movie_id",
      "type": "int"
      "name": "title",
      "type": "string"
      "name": "release_year",
      "type": "int"
      "name": "COUNTRY",
      "type": "string"
      "name": "__op",
      "type": [
      "default": null
      "name": "__source_ts_ms",
      "type": [
      "default": null
      "name": "__deleted",
      "type": [
      "default": null
  "connect.name": "db1.sampledatabase.movies.Value"
  1. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.


Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.

About the Author

Kalyan Janaki is a Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Three ways to boost your email security and brand reputation with AWS

Post Syndicated from Michael Davie original https://aws.amazon.com/blogs/security/three-ways-to-boost-your-email-security-and-brand-reputation-with-aws/

If you own a domain that you use for email, you want to maintain the reputation and goodwill of your domain’s brand. Several industry-standard mechanisms can help prevent your domain from being used as part of a phishing attack. In this post, we’ll show you how to deploy three of these mechanisms, which visually authenticate emails sent from your domain to users and verify that emails are encrypted in transit. It can take as little as 15 minutes to deploy these mechanisms on Amazon Web Services (AWS), and the result can help to provide immediate and long-term improvements to your organization’s email security.

Phishing through email remains one of the most common ways that bad actors try to compromise computer systems. Incidents of phishing and related crimes far outnumber the incidents of other categories of internet crime, according to the most recent FBI Internet Crime Report. Phishing has consistently led to large annual financial losses in the US and globally.

Overview of BIMI, MTA-STS, and TLS reporting

An earlier post has covered how you can use Amazon Simple Email Service (Amazon SES) to send emails that align with best practices, including the IETF internet standards: Sender Policy Framework (SPF), DomainKeys Identified Mail (DKIM), and Domain-based Message Authentication, Reporting, and Conformance (DMARC). This post will show you how to build on this foundation and configure your domains to align with additional email security standards, including the following:

  • Brand Indicators for Message Identification (BIMI) – This standard allows you to associate a logo with your email domain, which some email clients will display to users in their inbox. Visit the BIMI Group’s Where is my BIMI Logo Displayed? webpage to see how logos are displayed in the user interfaces of BIMI-supporting mailbox providers; Figure 1 shows a mock-up of a typical layout that contains a logo.
  • Mail Transfer Agent Strict Transport Security (MTA-STS) – This standard helps ensure that email servers always use TLS encryption and certificate-based authentication when they send messages to your domain, to protect the confidentiality and integrity of email in transit.
  • SMTP TLS reporting – This reporting allows you to receive reports and monitor your domain’s TLS security posture, identify problems, and learn about attacks that might be occurring.

Figure 1: A mock-up of how BIMI enables branded logos to be displayed in email user interfaces

These three standards require your Domain Name System (DNS) to publish specific records, for example by using Amazon Route 53, that point to web pages that have additional information. You can host this information without having to maintain a web server by storing it in Amazon Simple Storage Service (Amazon S3) and delivering it through Amazon CloudFront, secured with a certificate provisioned from AWS Certificate Manager (ACM).

Note: This AWS solution works for DKIM, BIMI, and DMARC, regardless of what you use to serve the actual email for your domains, which services you use to send email, and where you host DNS. For purposes of clarity, this post assumes that you are using Route 53 for DNS. If you use a different DNS hosting provider, you will manually configure DNS records in your existing hosting provider.

Solution architecture

The architecture for this solution is depicted in Figure 2.

Figure 2: The architecture diagram showing how the solution components interact

The interaction points are as follows:

  1. The web content is stored in an S3 bucket, and CloudFront has access to this bucket through an origin access identity, a mechanism of AWS Identity and Access Management (IAM).
  2. As described in more detail in the BIMI section of this blog post, the Verified Mark Certificate is obtained from a BIMI-qualified certificate authority and stored in the S3 bucket.
  3. When an external email system receives a message claiming to be from your domain, it looks up BIMI records for your domain in DNS. As depicted in the diagram, a DNS request is sent to Route 53.
  4. To retrieve the BIMI logo image and Verified Mark Certificate, the external email system will make HTTPS requests to a URL published in the BIMI DNS record. In this solution, the URL points to the CloudFront distribution, which has a TLS certificate provisioned with ACM.

A few important warnings

Email is a complex system of interoperating technologies. It is also brittle: a typo or a missing DNS record can make the difference between whether an email is delivered or not. Pay close attention to your email server and the users of your email systems when implementing the solution in this blog post. The main indicator that something is wrong is the absence of email. Instead of seeing an error in your email server’s log, users will tell you that they’re expecting to receive an email from somewhere and it’s not arriving. Or they will tell you that they sent an email, and their recipient can’t find it.

The DNS uses a lot of caching and time-out values to improve its efficiency. That makes DNS records slow and a little unpredictable as they propagate across the internet. So keep in mind that as you monitor your systems, it can be hours or even more than a day before the DNS record changes have an effect that you can detect.

This solution uses AWS Cloud Development Kit (CDK) custom resources, which are supported by AWS Lambda functions that will be created as part of the deployment. These functions are configured to use CDK-selected runtimes, which will eventually pass out of support and require you to update them.


Prerequisites

You will need permission in an AWS account to create and configure the following resources:

  • An Amazon S3 bucket to store the files and access logs
  • A CloudFront distribution to publicly deliver the files from the S3 bucket
  • A TLS certificate in ACM
  • An origin access identity in IAM that CloudFront will use to access files in Amazon S3
  • Lambda functions, IAM roles, and IAM policies created by CDK custom resources

You might also want to enable these optional services:

  • Amazon Route 53 for setting the necessary DNS records. If your domain is hosted by another DNS provider, you will set these DNS records manually.
  • Amazon SES or an Amazon WorkMail organization with a single mailbox. You can configure either service with a subdomain so that the existing domain is not disrupted, or you can create new email addresses by using your existing email mailbox provider.

BIMI has some additional requirements:

  • BIMI requires an email domain to have implemented a strong DMARC policy so that recipients can be confident in the authenticity of the branded logos. Your email domain must have a DMARC policy of p=quarantine or p=reject. Additionally, the domain’s policy cannot have sp=none or pct<100.

    Note: Do not adjust the DMARC policy of your domain without careful testing, because this can disrupt mail delivery.

  • You must have your brand’s logo in Scalable Vector Graphics (SVG) format that conforms to the BIMI standard. For more information, see Creating BIMI SVG Logo Files on the BIMI Group website.
  • Purchase a Verified Mark Certificate (VMC) issued by a third-party certificate authority. This certificate attests that the logo, organization, and domain are associated with each other, based on a legal trademark registration. Many email hosting providers require this additional certificate before they will show your branded logo to their users. Others do not currently support BIMI, and others might have alternative mechanisms to determine whether to show your logo. For more information about purchasing a Verified Mark Certificate, see the BIMI Group website.

    Note: If you are not ready to purchase a VMC, you can deploy this solution and validate that BIMI is correctly configured for your domain, but your branded logo will not display to recipients at major email providers.
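
The DMARC requirement above can be checked mechanically before you deploy. The following is a minimal sketch that parses the text of a DMARC record and applies the three conditions listed in this section; it does not perform a DNS lookup, the function names are hypothetical, and it treats a missing pct tag as 100, which is the default defined for DMARC.

```python
def parse_dmarc(record):
    """Split a DMARC TXT record such as 'v=DMARC1; p=reject' into a tag dict."""
    tags = {}
    for part in record.split(";"):
        if "=" in part:
            key, value = part.split("=", 1)
            tags[key.strip().lower()] = value.strip()
    return tags


def dmarc_allows_bimi(record):
    """Return True if the policy meets the BIMI conditions described above."""
    tags = parse_dmarc(record)
    if tags.get("v") != "DMARC1":
        return False
    # The domain policy must be quarantine or reject.
    if tags.get("p", "").lower() not in ("quarantine", "reject"):
        return False
    # The subdomain policy must not be none.
    if tags.get("sp", "").lower() == "none":
        return False
    # The policy must apply to all mail (pct defaults to 100 when absent).
    try:
        pct = int(tags.get("pct", "100"))
    except ValueError:
        return False
    return pct == 100
```

You could pass it the TXT value published at _dmarc.<your-domain>, retrieved with whichever DNS tool you prefer.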

What gets deployed in this solution?

This solution deploys the DNS records and supporting files that are required to implement BIMI, MTA-STS, and SMTP TLS reporting for an email domain. We’ll look at the deployment in more detail in the following sections.


BIMI

BIMI is described by the Internet Engineering Task Force (IETF) as follows:

Brand Indicators for Message Identification (BIMI) permits Domain Owners to coordinate with Mail User Agents (MUAs) to display brand-specific Indicators next to properly authenticated messages. There are two aspects of BIMI coordination: a scalable mechanism for Domain Owners to publish their desired Indicators, and a mechanism for Mail Transfer Agents (MTAs) to verify the authenticity of the Indicator. This document specifies how Domain Owners communicate their desired Indicators through the BIMI Assertion Record in DNS and how that record is to be interpreted by MTAs and MUAs. MUAs and mail-receiving organizations are free to define their own policies for making use of BIMI data and for Indicator display as they see fit.

If your organization has a trademark-protected logo, you can set up BIMI to have that logo displayed to recipients in their email inboxes. This can have a positive impact on your brand and indicates to end users that your email is more trustworthy. The BIMI Group shows examples of how brand logos are displayed in user inboxes, as well as a list of known email service providers that support the display of BIMI logos.

As a domain owner, you can implement BIMI by publishing the relevant DNS records and hosting the relevant files. To have your logo displayed by most email hosting providers, you will need to purchase a Verified Mark Certificate from a BIMI-qualified certificate authority.

This solution will deploy a valid BIMI record in Route 53 (or tell you what to publish in the DNS if you’re not using Route 53) and will store your provided SVG logo and Verified Mark Certificate files in Amazon S3, to be delivered through CloudFront with a valid TLS certificate from ACM.

To support BIMI, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host:
    default._bimi.<your-domain>. The value of this record is: v=BIMI1; l=<url-of-your-logo>; a=<url-of-verified-mark-certificate>. The value of <your-domain> refers to the domain that is used in the From header of messages that your organization sends.
  2. The logo and optional Verified Mark Certificate are hosted publicly at the HTTPS locations defined by <url-of-your-logo> and <url-of-verified-mark-certificate>, respectively.
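
As a rough illustration of the record format described above, the following Python sketch assembles a BIMI TXT value and splits it back into its tags. The function names and the example.com URLs are hypothetical and are not part of the solution's code.

```python
def build_bimi_record(logo_url, vmc_url=None):
    """Assemble a BIMI assertion record value from a logo URL and an optional VMC URL."""
    parts = ["v=BIMI1", f"l={logo_url}"]
    if vmc_url:
        parts.append(f"a={vmc_url}")
    return "; ".join(parts)


def parse_tags(record):
    """Split a 'tag=value; tag=value' TXT record value into a dict."""
    tags = {}
    for field in record.split(";"):
        field = field.strip()
        if not field:
            continue
        # Split on the first '=' only, so URLs that contain '=' stay intact.
        name, _, value = field.partition("=")
        tags[name.strip()] = value.strip()
    return tags


# Hypothetical URLs, for illustration only.
record = build_bimi_record(
    "https://bimi-assets.example.com/logo.svg",
    "https://bimi-assets.example.com/vmc.pem",
)
print(record)
print(parse_tags(record))
```

The same parse_tags helper also works for the MTA-STS and TLS reporting TXT values, because they use the same semicolon-separated tag=value layout.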


MTA-STS

MTA-STS is described by the IETF in RFC 8461 as follows:

SMTP (Simple Mail Transfer Protocol) MTA Strict Transport Security (MTA-STS) is a mechanism enabling mail service providers to declare their ability to receive Transport Layer Security (TLS) secure SMTP connections and to specify whether sending SMTP servers should refuse to deliver to MX hosts that do not offer TLS with a trusted server certificate.

Put simply, MTA-STS helps ensure that email servers always use encryption and certificate-based authentication when sending email to your domains, so that message integrity and confidentiality are preserved while in transit across the internet. MTA-STS also helps to ensure that messages are only sent to authorized servers.

This solution will deploy a valid MTA-STS policy record in Route 53 (or tell you what value to publish in the DNS if you’re not using Route 53) and will create an MTA-STS policy document to be hosted on S3 and delivered through CloudFront with a valid TLS certificate from ACM.

To support MTA-STS, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _mta-sts.<your-domain>. The value of this record is: v=STSv1; id=<unique value used for cache invalidation>.
  2. The MTA-STS policy document is hosted at and obtained from the following location: https://mta-sts.<your-domain>/.well-known/mta-sts.txt.
  3. The value of <your-domain> in both cases is the domain that is used for routing inbound mail to your organization and is typically the same domain that is used in the From header of messages that your organization sends externally. Depending on the complexity of your organization, you might receive inbound mail for multiple domains, and you might choose to publish MTA-STS policies for each domain.
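
To make the two MTA-STS artifacts more concrete, here is a minimal Python sketch that renders the TXT record value and a policy document using the key names defined in RFC 8461 (version, mode, mx, max_age). The function names, the MX host, and the max_age value are assumptions chosen for illustration; the solution itself reads the policy from the file that you provide.

```python
import time


def build_mta_sts_record(policy_id):
    """Return the TXT value to publish at _mta-sts.<your-domain>."""
    return f"v=STSv1; id={policy_id}"


def build_mta_sts_policy(mode, mx_hosts, max_age_seconds):
    """Render the policy document served at /.well-known/mta-sts.txt."""
    if mode not in ("none", "testing", "enforce"):
        raise ValueError(f"unsupported mode: {mode}")
    lines = ["version: STSv1", f"mode: {mode}"]
    # One 'mx' line per mail exchange host that is allowed to receive mail.
    lines += [f"mx: {host}" for host in mx_hosts]
    lines.append(f"max_age: {max_age_seconds}")
    return "\n".join(lines) + "\n"


# A timestamp is one simple way to get an id that changes with every policy update.
policy_id = time.strftime("%Y%m%d%H%M%S")
print(build_mta_sts_record(policy_id))
# Hypothetical MX host and cache lifetime.
print(build_mta_sts_policy("testing", ["mail.example.com"], 86400))
```

Changing the id value whenever the policy document changes signals to sending servers that they should fetch the policy again.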

Is it ever bad to encrypt everything?

In the example MTA-STS policy file provided in the GitHub repository and explained later in this post, the MTA-STS policy mode is set to testing. This means that your email server is advertising its willingness to negotiate encrypted email connections, but it does not require TLS. Servers that want to send mail to you are allowed to connect and deliver mail even if there are problems in the TLS connection, as long as you’re in testing mode. You should expect reports when servers try to connect through TLS to your mail server and fail to do so.

Be fully prepared before you change the MTA-STS policy to enforce. After this policy is set to enforce, servers that follow the MTA-STS policy and that experience an enforceable TLS-related error when they try to connect to your mail server will not deliver mail to your mail server. This is a difficult situation to detect. You will simply stop receiving email from servers that comply with the policy. You might receive reports from them indicating what errors they encountered, but it is not guaranteed. Be sure that the email address you provide in SMTP TLS reporting (in the following section) is functional and monitored by people who can take action to fix issues. If you miss TLS failure reports, you probably won’t receive email. If the TLS certificate that you use on your email server expires, and your MTA-STS policy is set to enforce, this will become an urgent issue and will disrupt the flow of email until it is fixed.

SMTP TLS reporting

SMTP TLS reporting is described by the IETF in RFC 8460 as follows:

A number of protocols exist for establishing encrypted channels between SMTP Mail Transfer Agents (MTAs), including STARTTLS, DNS-Based Authentication of Named Entities (DANE) TLSA, and MTA Strict Transport Security (MTA-STS). These protocols can fail due to misconfiguration or active attack, leading to undelivered messages or delivery over unencrypted or unauthenticated channels. This document describes a reporting mechanism and format by which sending systems can share statistics and specific information about potential failures with recipient domains. Recipient domains can then use this information to both detect potential attacks and diagnose unintentional misconfigurations.

As you gain the security benefits of MTA-STS, SMTP TLS reporting will allow you to receive reports from other internet email providers. These reports contain information that is valuable when monitoring your TLS security posture, identifying problems, and learning about attacks that might be occurring.

This solution will deploy a valid SMTP TLS reporting record on Route 53 (or provide you with the value to publish in the DNS if you are not using Route 53).

To support SMTP TLS reporting, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _smtp._tls.<your-domain>. The value of this record is: v=TLSRPTv1; rua=mailto:<report-receiver-email-address>
  2. The value of <report-receiver-email-address> might be an address in your domain or in a third-party provider. Automated systems that process these reports must be capable of processing GZIP compressed files and parsing JSON.
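
As a sketch of what such automated processing might look like, the following Python snippet decompresses one GZIP-compressed report and totals the session counts. The field names follow the JSON report schema in RFC 8460; the function name and the sample report are made up for illustration and do not represent real traffic.

```python
import gzip
import json


def summarize_tls_report(gzipped_bytes):
    """Decompress one report and total its successful and failed session counts."""
    report = json.loads(gzip.decompress(gzipped_bytes).decode("utf-8"))
    successes = failures = 0
    for entry in report.get("policies", []):
        summary = entry.get("summary", {})
        successes += summary.get("total-successful-session-count", 0)
        failures += summary.get("total-failure-session-count", 0)
    return {
        "organization": report.get("organization-name"),
        "successes": successes,
        "failures": failures,
    }


# Made-up sample report, for illustration only.
sample = {
    "organization-name": "Example Sender",
    "report-id": "sample-report-1",
    "policies": [
        {
            "policy": {"policy-type": "sts", "policy-domain": "example.com"},
            "summary": {
                "total-successful-session-count": 120,
                "total-failure-session-count": 3,
            },
        }
    ],
}
payload = gzip.compress(json.dumps(sample).encode("utf-8"))
print(summarize_tls_report(payload))
```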

Deploy the solution with the AWS CDK

In this section, you’ll learn how to deploy the solution to create the previously described AWS resources in your account.

  1. Clone the following GitHub repository:

    git clone https://github.com/aws-samples/serverless-mail
    cd serverless-mail/email-security-records

  2. Edit CONFIG.py to reflect your desired settings, as follows:
    1. If no Verified Mark Certificate is provided, set VMC_FILENAME = None.
    2. If your DNS zone is not hosted on Route 53, or if you do not want this app to manage Route 53 DNS records, set ROUTE_53_HOSTED = False. In this case, you will need to set TLS_CERTIFICATE_ARN to the Amazon Resource Name (ARN) of a certificate hosted on ACM in us-east-1. This certificate is used by CloudFront and must support two subdomains: mta-sts and your configured BIMI_ASSET_SUBDOMAIN.
  3. Finalize the preparation, as follows:
    1. Place your BIMI logo and Verified Mark Certificate files in the assets folder.
    2. Create an MTA-STS policy file at assets/.well-known/mta-sts.txt to reflect your mail exchange (MX) servers and policy requirements. An example file is provided at assets/.well-known/mta-sts.txt.example
  4. Deploy the solution, as follows:
    1. Open a terminal in the email-security-records folder.
    2. (Recommended) Create and activate a virtual environment by running the following commands.
      python3 -m venv .venv
      source .venv/bin/activate
    3. Install the Python requirements in your environment with the following command.
      pip install -r requirements.txt
    4. Assume a role in the target account that has the permissions outlined in the Prerequisites section of this post.

      Using AWS CDK version 2.17.0 or later, deploy the bootstrap in the target account by running the following command. To learn more, see Bootstrapping in the AWS CDK Developer Guide.
      cdk bootstrap

    5. Run the following command to synthesize the CloudFormation template. Review the output of this command to verify what will be deployed.
      cdk synth
    6. Run the following command to deploy the CloudFormation template. You will be prompted to accept the IAM changes that will be applied to your account.
      cdk deploy

      Note: If you use Route 53, these records are created and activated in your DNS zones as soon as the AWS CDK finishes deploying. As the records propagate through the DNS, they will gradually start affecting the email in the affected domains.

    7. If you’re not using Route 53 and instead are using a third-party DNS provider, create the CNAME and TXT records as indicated. In this case, your email is not affected by this solution until you create the records in DNS.

Testing and troubleshooting

After you have deployed the CDK solution, you can test it to confirm that the DNS records and web resources are published correctly.


BIMI

  1. Query the BIMI DNS TXT record for your domain by using the dig or nslookup command in your terminal.

    dig +short TXT default._bimi.<your-domain.example>

    Verify the response. For example:

    "v=BIMI1; l=https://bimi-assets.<your-domain.example>/logo.svg"

  2. In your web browser, open the URL from that response (for example, https://bimi-assets.<your-domain.example>/logo.svg) to verify that the logo is available and that the HTTPS certificate is valid.
  3. The BIMI Group provides a tool to validate your BIMI configuration. This tool will also validate your VMC if you have purchased one.


MTA-STS

  1. Query the MTA-STS DNS TXT record for your domain.

    dig +short TXT _mta-sts.<your-domain.example>

    The value of this record is as follows:

    v=STSv1; id=<unique value used for cache invalidation>

  2. You can load the MTA-STS policy document using your web browser. For example, https://mta-sts.<your-domain.example>/.well-known/mta-sts.txt
  3. You can also use third-party tools to examine your MTA-STS configuration, such as MX Toolbox.
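
If you prefer to check the downloaded policy document with a script, a minimal Python sketch could parse the key: value lines and run a few basic checks. The function names and the sample text are hypothetical, and the checks cover only a small part of what RFC 8461 specifies.

```python
def parse_mta_sts_policy(text):
    """Parse 'key: value' lines; 'mx' may repeat, so it is collected into a list."""
    policy = {"mx": []}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        key, _, value = line.partition(":")
        key, value = key.strip(), value.strip()
        if key == "mx":
            policy["mx"].append(value)
        else:
            policy[key] = value
    return policy


def check_policy(policy):
    """Return a list of problems found; an empty list means the basic checks passed."""
    problems = []
    if policy.get("version") != "STSv1":
        problems.append("version must be STSv1")
    if policy.get("mode") not in ("none", "testing", "enforce"):
        problems.append("mode must be none, testing, or enforce")
    if policy.get("mode") in ("testing", "enforce") and not policy["mx"]:
        problems.append("at least one mx entry is required")
    if not policy.get("max_age", "").isdigit():
        problems.append("max_age must be a non-negative integer")
    return problems


# Hypothetical policy text, as it might be returned from the policy URL.
sample_text = "version: STSv1\nmode: testing\nmx: mail.example.com\nmax_age: 86400\n"
policy = parse_mta_sts_policy(sample_text)
print(policy)
print(check_policy(policy))
```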

TLS reporting

  1. Query the TLS reporting DNS TXT record for your domain.

    dig +short TXT _smtp._tls.<your-domain.example>

    Verify the response. For example:

    "v=TLSRPTv1; rua=mailto:<your email address>"

  2. You can also use third-party tools to examine your TLS reporting configuration, such as Easy DMARC.

Depending on which domains you communicate with on the internet, you will begin to see TLS reports arriving at the email address that you have defined in the TLS reporting DNS record. We recommend that you closely examine the TLS reports, and use automated analytical techniques over an extended period of time before changing the default testing value of your domain’s MTA-STS policy. Not every email provider will send TLS reports, but examining the reports in aggregate will give you a good perspective for making changes to your MTA-STS policy.
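
One possible form of such aggregate analysis is sketched below in Python: it takes reports that have already been decoded into dictionaries, computes the overall failure rate, and tallies failures by result type. The field names come from RFC 8460; the function name and the two sample reports are invented for illustration.

```python
from collections import Counter


def aggregate_failures(reports):
    """Compute the overall failure rate and tally failures by result type."""
    by_type = Counter()
    total = failed = 0
    for report in reports:
        for entry in report.get("policies", []):
            summary = entry.get("summary", {})
            ok = summary.get("total-successful-session-count", 0)
            bad = summary.get("total-failure-session-count", 0)
            total += ok + bad
            failed += bad
            for detail in entry.get("failure-details", []):
                result_type = detail.get("result-type", "unknown")
                by_type[result_type] += detail.get("failed-session-count", 0)
    rate = failed / total if total else 0.0
    return rate, dict(by_type)


# Invented sample data: one report with failures and one without.
reports = [
    {
        "policies": [
            {
                "summary": {
                    "total-successful-session-count": 97,
                    "total-failure-session-count": 3,
                },
                "failure-details": [
                    {"result-type": "certificate-expired", "failed-session-count": 3}
                ],
            }
        ]
    },
    {
        "policies": [
            {
                "summary": {
                    "total-successful-session-count": 100,
                    "total-failure-session-count": 0,
                }
            }
        ]
    },
]
rate, by_type = aggregate_failures(reports)
print(f"failure rate: {rate:.1%}", by_type)
```

A persistent non-zero count for any result type is a signal to investigate before you move the policy from testing to enforce.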


To remove the resources created by this solution:

  1. Open a terminal in the email-security-records folder.
  2. Assume a role in the target account with permission to delete resources.
  3. Run cdk destroy.

Note: The asset and log buckets are automatically emptied and deleted by the cdk destroy command.


When external systems send email to or receive email from your domains, they will now query your new DNS records and will look up your domain’s BIMI, MTA-STS, and TLS reporting information from your new CloudFront distribution. By adopting the email domain security mechanisms outlined in this post, you can improve the overall security posture of your email environment, as well as the perception of your brand.


Michael Davie


Michael is a Senior Industry Specialist with AWS Security Assurance. He works with our customers, their regulators, and AWS teams to help raise the bar on secure cloud adoption and usage. Michael has over 20 years of experience working in the defence, intelligence, and technology sectors in Canada and is a licensed professional engineer.

Jesse Thompson


Jesse is an Email Deliverability Manager with the Amazon Simple Email Service team. His background is in enterprise IT development and operations, with a focus on email abuse mitigation and encouragement of authenticity practices with open standard protocols. Jesse’s favorite activity outside of technology is recreational curling.

Control access to Amazon OpenSearch Service Dashboards with attribute-based role mappings

Post Syndicated from Stefan Appel original https://aws.amazon.com/blogs/big-data/control-access-to-amazon-opensearch-service-dashboards-with-attribute-based-role-mappings/

Federated users of Amazon OpenSearch Service often need access to OpenSearch Dashboards with roles based on their user profiles. OpenSearch Service fine-grained access control maps authenticated users to OpenSearch roles and then evaluates permissions to determine how to handle the user’s actions. However, when an enterprise-wide identity provider (IdP) manages the users, the mapping of users to OpenSearch Service roles often needs to happen dynamically based on IdP user attributes. One option to map users is to use OpenSearch Service SAML integration and pass user group information to OpenSearch Service. Another option is Amazon Cognito role-based access control, which supports rule-based or token-based mappings. But neither approach supports arbitrary role mapping logic, for example, when you need to interpret multivalued user attributes to identify a target role.

This post shows how you can implement custom role mappings with an Amazon Cognito pre-token generation AWS Lambda trigger. For our example, we use a multivalued attribute provided over OpenID Connect (OIDC) to Amazon Cognito. We show how you are in full control of the mapping logic and process of such a multivalued attribute for AWS Identity and Access Management (IAM) role lookups. Our approach is generic for OIDC-compatible IdPs. To make this post self-contained, we use the Okta IdP as an example to walk through the setup.

Overview of solution

The provided solution intercepts the OIDC-based login process to OpenSearch Dashboards with a pre-token generation Lambda function. The login to OpenSearch Dashboards with a third-party IdP and Amazon Cognito as an intermediary consists of several steps:

  1. First, the initial user request to OpenSearch Dashboards is redirected to Amazon Cognito.
  2. Amazon Cognito redirects the request to the IdP for authentication.
  3. After the user authenticates, the IdP sends the identity token (ID token) back to Amazon Cognito.
  4. Amazon Cognito invokes a Lambda function that modifies the obtained token. We use an Amazon DynamoDB table to perform role mapping lookups. The modified token now contains the IAM role mapping information.
  5. Amazon Cognito uses this role mapping information to map the user to the specified IAM role and provides the role credentials.
  6. OpenSearch Service maps the IAM role credentials to OpenSearch roles and applies fine-grained permission checks.

The following architecture outlines the login flow from a user’s perspective.

Scope of solution

On the backend, OpenSearch Dashboards integrates with an Amazon Cognito user pool and an Amazon Cognito identity pool during the authentication flow. The steps are as follows:

  1. Authenticate and get tokens.
  2. Look up the token attribute and IAM role mapping and overwrite the Amazon Cognito attribute.
  3. Exchange tokens for AWS credentials used by OpenSearch Dashboards.

The following architecture shows this backend perspective to the authentication process.

Backend authentication flow

In the remainder of this post, we walk through the configurations necessary for an authentication flow in which a Lambda function implements custom role mapping logic. We provide sample Lambda code for the mapping of multivalued OIDC attributes to IAM roles based on a DynamoDB lookup table with the following structure.

OIDC Attribute Value          | IAM Role
["attribute_a","attribute_b"] | arn:aws:iam::<aws-account-id>:role/<role-name-01>
["attribute_a","attribute_x"] | arn:aws:iam::<aws-account-id>:role/<role-name-02>

The high-level steps of the solution presented in this post are as follows:

  1. Configure Amazon Cognito authentication for OpenSearch Dashboards.
  2. Add IAM roles for mappings to OpenSearch Service roles.
  3. Configure the Okta IdP.
  4. Add a third-party OIDC IdP to the Amazon Cognito user pool.
  5. Map IAM roles to OpenSearch Service roles.
  6. Create the DynamoDB attribute-role mapping table.
  7. Deploy and configure the pre-token generation Lambda function.
  8. Configure the pre-token generation Lambda trigger.
  9. Test the login to OpenSearch Dashboards.


Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account with an OpenSearch Service domain.
  • A third-party IdP that supports OpenID Connect and adds a multivalued attribute in the authorization token. For this post, we use attributes_array as this attribute’s name and Okta as the IdP. You can create an Okta Developer Edition free account to test the setup.

Configure Amazon Cognito authentication for OpenSearch Dashboards

The modification of authentication tokens requires you to configure the OpenSearch Service domain to use Amazon Cognito for authentication. For instructions, refer to Configuring Amazon Cognito authentication for OpenSearch Dashboards.

The Lambda function implements custom role mappings by setting the cognito:preferred_role claim (for more information, refer to Role-based access control). For the correct interpretation of this claim, set the Amazon Cognito identity pool to Choose role from token. The Amazon Cognito identity pool then uses the value of the cognito:preferred_role claim to select the correct IAM role. The following screenshot shows the required settings in the Amazon Cognito identity pool that is created during the configuration of Amazon Cognito authentication for OpenSearch Service.

Cognito role mapping configuration

Add IAM roles for mappings to OpenSearch roles

IAM roles used for mappings to OpenSearch roles require a trust policy so that authenticated users can assume them. The trust policy needs to reference the Amazon Cognito identity pool created during the configuration of Amazon Cognito authentication for OpenSearch Service. Create at least one IAM role with a custom trust policy. For instructions, refer to Creating a role using custom trust policies. The IAM role doesn’t require the attachment of a permission policy. For a sample trust policy, refer to Role-based access control.
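
For orientation, the following Python sketch builds a trust policy of the kind commonly used with Amazon Cognito identity pools and prints it as JSON. Treat it as an illustration rather than the exact policy from the referenced documentation; the function name is hypothetical and the identity pool ID is a placeholder that you would replace with your own.

```python
import json


def cognito_trust_policy(identity_pool_id):
    """Build a trust policy that lets authenticated identities from one pool assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Federated": "cognito-identity.amazonaws.com"},
                "Action": "sts:AssumeRoleWithWebIdentity",
                "Condition": {
                    # Restrict the role to tokens issued for this identity pool.
                    "StringEquals": {
                        "cognito-identity.amazonaws.com:aud": identity_pool_id
                    },
                    # Only authenticated (not guest) identities may assume the role.
                    "ForAnyValue:StringLike": {
                        "cognito-identity.amazonaws.com:amr": "authenticated"
                    },
                },
            }
        ],
    }


# Placeholder pool ID; substitute the ID of your own identity pool.
trust_policy = cognito_trust_policy("us-east-1:00000000-0000-0000-0000-000000000000")
print(json.dumps(trust_policy, indent=2))
```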

Configure the Okta IdP

In this section, we describe the configuration steps to include a multivalued attributes_array attribute in the token provided by Okta. For more information, refer to Customize tokens returned from Okta with custom claims. We use the Okta UI to perform the configurations. Okta also provides an API that you can use to script and automate the setup.

The first step is adding the attributes_array attribute to the Okta user profile.

  1. Use Okta’s Profile Editor under Directory, Profile Editor.
  2. Select User (default) and then choose Add Attribute.
  3. Add an attribute with a display name and variable name attributes_array of type string array.

The following screenshot shows the Okta default user profile after the custom attribute has been added.

Okta user profile editor

  1. Next, add attributes_array attribute values to users using Okta’s user management interface under Directory, People.
  2. Select a user and choose Profile.
  3. Choose Edit and enter attribute values.

The following screenshot shows an example of attributes_array attribute values within a user profile.

Okta user attributes array

The next step is adding the attributes_array attribute to the ID token that is generated during the authentication process.

  1. On the Okta console, choose Security, API and select the default authorization server.
  2. Choose Claims and choose Add Claim to add the attributes_array attribute as part of the ID token.
  3. As the scope, enter openid and as the attribute value, enter user.attributes_array.

This references the previously created attribute in a user’s profile.

Add claim to ID token

  1. Next, create an application for the federation with Amazon Cognito. For instructions, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

The last step assigns the Okta application to Okta users.

  1. Navigate to Directory, People, select a user, and choose Assign Applications.
  2. Select the application you created in the previous step.

Add a third-party OIDC IdP to the Amazon Cognito user pool

We are implementing the role mapping based on the information provided in a multivalued OIDC attribute. The authentication token needs to include this attribute. If you followed the previously described Okta configuration, the attribute is automatically added to the ID token of a user. If you used another IdP, you might have to request the attribute explicitly. For this, add the attribute name to the Authorized scopes list of the IdP in Amazon Cognito.

For instructions on how to set up the federation between a third-party IdP and an Amazon Cognito user pool and how to request additional attributes, refer to Adding OIDC identity providers to a user pool. For a detailed walkthrough for Okta, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

After requesting the token via OIDC, you need to map the attribute to an Amazon Cognito user pool attribute. For instructions, refer to Specifying identity provider attribute mappings for your user pool. The following screenshot shows the resulting configuration on the Amazon Cognito console.

Amazon Cognito user pool attribute mapping

Map IAM roles to OpenSearch Service roles

Upon login, OpenSearch Service maps users to an OpenSearch Service role based on the IAM role ARN set in the cognito:preferred_role claim by the pre-token generation Lambda trigger. This requires a role mapping in OpenSearch Service. To add such role mappings to IAM backend roles, refer to Mapping roles to users. The following screenshot shows a role mapping on the OpenSearch Dashboards console.

Amazon OpenSearch Service role mappings

Create the attribute-role mapping table

For this solution, we use DynamoDB to store mappings of attribute value combinations to IAM roles. Create a table with a partition key named Key of type String; for instructions, refer to Create a table. You need the table name in the subsequent step to configure the Lambda function.

The next step is writing the mapping information into the table. A mapping entry consists of the following attributes:

  • Key – A string that contains the attribute values, sorted alphabetically and joined with commas
  • RoleArn – A string with the IAM role ARN to which the attribute value combination should be mapped

For details on how to add data to a DynamoDB table, refer to Write data to a table using the console or AWS CLI.

For example, if the previously configured OIDC attribute attributes_array contains three values, attribute_a, attribute_b, and attribute_c, the entry in the mapping table looks like table line 1 in the following screenshot.

Amazon DynamoDB table with attribute-role mappings
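
The key format can be sketched in a few lines of Python. The helper names and the role ARN below are hypothetical; the item layout uses DynamoDB's low-level attribute-value format with the Key and RoleArn attributes described above.

```python
def make_lookup_key(attribute_values):
    """Sort the attribute values and join them with commas, matching the table's Key format."""
    return ",".join(sorted(attribute_values))


def make_mapping_item(attribute_values, role_arn):
    """Build a table item in DynamoDB's low-level attribute-value format."""
    return {
        "Key": {"S": make_lookup_key(attribute_values)},
        "RoleArn": {"S": role_arn},
    }


# The order in which the IdP delivers the values does not matter.
key = make_lookup_key(["attribute_c", "attribute_a", "attribute_b"])
print(key)

# Hypothetical role ARN, for illustration only.
item = make_mapping_item(
    ["attribute_c", "attribute_a", "attribute_b"],
    "arn:aws:iam::123456789012:role/example-role",
)
print(item)
```

Because the values are sorted before they are joined, a single table entry covers every ordering of the same set of attribute values.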

Deploy and configure the pre-token generation Lambda function

A Lambda function implements the custom role mapping logic. The Lambda function receives an Amazon Cognito event as input and extracts attribute information out of it. It uses the attribute information for a lookup in a DynamoDB table and retrieves the value for cognito:preferred_role. Follow the steps in Getting started with Lambda to create a Node.js Lambda function and insert the following source code:

const AWS = require("aws-sdk");

// Configuration is read from the Lambda environment variables.
const tableName = process.env.TABLE_NAME;
const unauthorizedRoleArn = process.env.UNAUTHORIZED_ROLE;
const userAttributeArrayName = process.env.USER_POOL_ATTRIBUTE;
const dynamodbClient = new AWS.DynamoDB({apiVersion: "2012-08-10"});

exports.lambdaHandler = handlePreTokenGenerationEvent;

// Entry point: derive the lookup key, resolve the IAM role, and set it on the response.
async function handlePreTokenGenerationEvent(event, context) {
    var sortedAttributeList = getSortedAttributeList(event);
    var lookupKey = sortedAttributeList.join(',');
    var roleArn = await lookupIAMRoleArn(lookupKey);
    appendResponseWithPreferredRole(event, roleArn);
    return event;
}

// The multivalued attribute arrives as a JSON-encoded array; parse and sort it.
function getSortedAttributeList(event) {
    return JSON.parse(event['request']['userAttributes'][userAttributeArrayName]).sort();
}

// Look up the role ARN for the key; fall back to the unauthorized role if none is found.
async function lookupIAMRoleArn(key) {
    var params = {
        TableName: tableName,
        Key: {
            'Key': {S: key}
        },
        ProjectionExpression: 'RoleArn'
    };
    try {
        let item = await dynamodbClient.getItem(params).promise();
        return item['Item']['RoleArn']['S'];
    } catch (e) {
        return unauthorizedRoleArn;
    }
}

// Set the cognito:preferred_role claim through the group override details.
function appendResponseWithPreferredRole(event, roleArn) {
    event.response = {
        'claimsOverrideDetails': {
            'groupOverrideDetails': {
                'preferredRole': roleArn
            }
        }
    };
}

The Lambda function expects three environment variables. Refer to Using AWS Lambda environment variables for instructions to add the following entries:

  • TABLE_NAME – The name of the previously created DynamoDB table. This table is used for the lookups.
  • UNAUTHORIZED_ROLE – The ARN of the IAM role that is used when no mapping is found in the lookup table.
  • USER_POOL_ATTRIBUTE – The Amazon Cognito user pool attribute used for the IAM role lookup. In our example, this attribute is named custom:attributes_array.

The following screenshot shows the final configuration.

AWS Lambda function configuration

The Lambda function needs permissions to access the DynamoDB lookup table. Set permissions as follows: attach the following policy to the Lambda execution role (for instructions, refer to Lambda execution role) and provide the Region, AWS account number, and DynamoDB table name:

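    {
        "Version": "2012-10-17",
        "Statement": [{
            "Action": ["dynamodb:GetItem"],
            "Resource": ["arn:aws:dynamodb:<region>:<aws-account-id>:table/<table-name>"],
            "Effect": "Allow"
        }]
    }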

The configuration of the Lambda function is now complete.
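
Before you wire up the trigger, it can help to sanity-check the mapping logic offline. The following Python sketch mirrors the lookup flow of the function, with an in-memory dictionary standing in for the DynamoDB table. The function name, the sample event, and the role ARNs are hypothetical; the event shape is reduced to the fields that the function reads and writes.

```python
import json


def map_event_to_role(event, attribute_name, mapping, unauthorized_role_arn):
    """Derive the lookup key, resolve the role, and set the preferred role override."""
    raw = event["request"]["userAttributes"][attribute_name]
    # The attribute holds a JSON-encoded array; sort it and join with commas.
    key = ",".join(sorted(json.loads(raw)))
    # Fall back to the unauthorized role when no mapping exists for the key.
    role_arn = mapping.get(key, unauthorized_role_arn)
    event["response"] = {
        "claimsOverrideDetails": {
            "groupOverrideDetails": {"preferredRole": role_arn}
        }
    }
    return event


# Stand-in for the DynamoDB table, with a hypothetical role ARN.
mapping = {
    "attribute_a,attribute_b": "arn:aws:iam::123456789012:role/example-role-01",
}
fallback = "arn:aws:iam::123456789012:role/example-unauthorized"

# Reduced sample event; a real event carries many more fields.
event = {
    "request": {
        "userAttributes": {"custom:attributes_array": '["attribute_b", "attribute_a"]'}
    },
    "response": {},
}
result = map_event_to_role(event, "custom:attributes_array", mapping, fallback)
print(result["response"])
```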

Configure the pre-token generation Lambda trigger

As the final step, add a pre-token generation trigger to the Amazon Cognito user pool and reference the newly created Lambda function. For details, refer to Customizing user pool workflows with Lambda triggers. The following screenshot shows the configuration.

Amazon Cognito pre-token generation trigger configuration

This step completes the setup; Amazon Cognito now maps users to OpenSearch Service roles based on the values provided in an OIDC attribute.

Test the login to OpenSearch Dashboards

The following diagram shows an example login flow and the corresponding screenshots for an Okta user user1 with a user profile attribute attributes_array and value: ["attribute_a", "attribute_b", "attribute_c"].

Testing of solution

Clean up

To avoid incurring future charges, delete the OpenSearch Service domain, Amazon Cognito user pool and identity pool, Lambda function, and DynamoDB table created as part of this post.


In this post, we demonstrated how to set up a custom mapping to OpenSearch Service roles using values provided via an OIDC attribute. We dynamically set the cognito:preferred_role claim using an Amazon Cognito pre-token generation Lambda trigger and a DynamoDB table for lookup. The solution is capable of handling dynamic multivalued user attributes, and you can extend it with further application logic that goes beyond a simple lookup. The steps in this post are a proof of concept. If you plan to develop this into a production solution, we recommend implementing Okta and AWS security best practices.

The post highlights just one use case of how you can use Amazon Cognito support for Lambda triggers to implement custom authentication needs. If you’re interested in further details, refer to How to Use Cognito Pre-Token Generation trigger to Customize Claims In ID Tokens.

About the Authors

Stefan Appel is a Senior Solutions Architect at AWS. For more than 10 years, he has helped enterprise customers adopt cloud technologies. Before joining AWS, Stefan held positions in software architecture, product management, and IT operations departments. He began his career in research on event-based systems. In his spare time, he enjoys hiking and has walked the length of New Zealand following Te Araroa.

Modood Alvi is a Senior Solutions Architect at Amazon Web Services (AWS). Modood is passionate about digital transformation and is committed to helping large enterprise customers across the globe accelerate their adoption of and migration to the cloud. Modood brings more than a decade of experience in software development, having held various technical roles within companies like SAP and Porsche Digital. Modood earned his Diploma in Computer Science from the University of Stuttgart.

Build highly available streams with Amazon Kinesis Data Streams

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/build-highly-available-streams-with-amazon-kinesis-data-streams/

Many use cases are moving towards a real-time data strategy due to demand for real-time insights, low-latency response times, and the ability to adapt to the changing needs of end-users. For this type of workload, you can use Amazon Kinesis Data Streams to seamlessly provision, store, write, and read data in a streaming fashion. With Kinesis Data Streams, there are no servers to manage, and you can scale your stream to handle any additional throughput as it comes in.

Kinesis Data Streams offers 99.9% availability in a single AWS Region. For even higher availability, there are several strategies to explore within the streaming layer. This post compares and contrasts different strategies for creating a highly available Kinesis data stream in case of service interruptions, delays, or outages in the primary Region of operation.
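
To put the 99.9% figure into perspective, the following Python sketch converts an availability percentage into a downtime budget. This is plain arithmetic and not a statement about how any service agreement measures availability; the function name is hypothetical.

```python
def allowed_downtime_minutes(availability_percent, period_days):
    """Minutes of downtime permitted by an availability target over a period."""
    total_minutes = period_days * 24 * 60
    return total_minutes * (1 - availability_percent / 100)


monthly = allowed_downtime_minutes(99.9, 30)
yearly_hours = allowed_downtime_minutes(99.9, 365) / 60
print(f"{monthly:.1f} minutes per 30 days")   # 43.2 minutes per 30 days
print(f"{yearly_hours:.2f} hours per year")   # 8.76 hours per year
```

If a workload cannot tolerate a budget of this size, that is a signal to consider one of the multi-Region strategies discussed in this post.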

Considerations for high availability

Before we dive into our example use case, there are several considerations to keep in mind when designing a highly available Kinesis Data Streams workload that relate to the business need for a particular pipeline:

  • Recovery Time Objective (RTO) is defined by the organization. RTO is the maximum acceptable delay between the interruption of service and restoration of service. This determines what is considered an acceptable time window when service is unavailable.
  • Recovery Point Objective (RPO) is defined by the organization. RPO is the maximum acceptable amount of time since the last data recovery point. This determines what is considered an acceptable loss of data between the last recovery point and the interruption of service.

In a general sense, the lower your values for RPO and RTO, the more expensive the overall solution becomes. This is because the solution needs to account for minimizing both data loss and service unavailability by having multiple instances of the service up and running in multiple Regions. This is why a big piece of high availability is the replication of data flowing through a workload. In our case, the data is replicated across Regions of Kinesis Data Streams. Conversely, the higher the RPO and RTO values are, the more complexity you introduce into your failover mechanism. This is due to the fact that the cost savings you realize by not standing up multiple instances across multiple Regions are offset by the orchestration needed to spin up these instances in the event of an outage.

In this post, we are only covering failover of a Kinesis data stream. In use cases where higher availability is required across the entire data pipeline, having failover architectures for every component (Amazon API Gateway, AWS Lambda, Amazon DynamoDB) is strongly encouraged.

The simplest approach to high availability is to start a new instance of producers, consumers, and data streams in a new Region upon service unavailability detection. The benefit here is primarily cost, but your RPO and RTO values will be higher as a result.

We cover the following strategies for highly available Kinesis Data Streams:

  • Warm standby – An architecture in which there is active replication of data from the Kinesis data stream in Region A to Region B. Consumers of the data stream are running in both Regions at all times. Recommended for use cases that can’t withstand extended downtime past their replication lag.
  • Cold standby – Active replication of data from the data stream in Region A to Region B, but consumers of the data stream in Region B are spun up when an outage in Region A is detected. Recommended for use cases that can afford some downtime as infrastructure is spun up in secondary Region. In this scenario, RPO will be similar to the warm standby strategy; however, RTO will increase.

For high availability purposes, these use cases need to replicate the data across Regions in a way that allows consumers and producers of the data stream to fail over quickly upon detection of a service unavailability and utilize the secondary Region’s stream. Let’s take an example architecture to further explain these DR strategies. We use API Gateway and Lambda to publish stock ticker information to a Kinesis data stream. The data is then retrieved by another Lambda consumer to save durably into DynamoDB for querying, alerting, and reporting. The following diagram illustrates this architecture.

the primary architecture for the post--showcasing data coming from a mobile phone to API Gateway, then AWS Lambda, then Kinesis Data Streams, Lambda again and finally publishing to a DynamoDB Table

We use this architecture with an example use case requiring the streaming workload to be highly available in the event of a Region outage. The customer can withstand an RTO of 15 minutes during an outage, because they refresh end-users’ dashboards on a 15-minute interval. The customer is sensitive to downtime and data loss, because their data will be used for historical auditing purposes, operational metrics, and dashboards for end-users. Downtime for this customer means that data isn’t able to be persisted in their database from their streaming layer, and therefore unavailable to any consuming application. For this use case, data can be retried up to 5 minutes from our Lambda function before failing over to the new Region. Consumers are considered unavailable when the stream is unavailable, and can scale up in the secondary Region to account for any backlog of events.

How might we approach making a Kinesis data stream highly available for this use case?

Warm standby pattern

The following architecture diagram illustrates the warm standby high availability pattern for Kinesis Data Streams.

warm standby pattern showcasing data being replicated between a kinesis data stream in one region to another

image showcasing the warm standby failover--where data from first lambda begins replicating to secondary region KDA

The warm standby architectural pattern involves running a Kinesis data stream both in the primary and secondary Region, along with consumers and downstream destinations of the primary Region’s streaming layer being replicated as well. Sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. We dive into details of how to achieve this in the client failover section of this post. Data is replicated across Regions from the data stream in the primary Region to the secondary Region. This is done instead of having the sources publish to both Regions to avoid any consistency issues between the streams in the two Regions.

Although this architectural pattern gives very high availability, it’s also the most expensive option because we’re duplicating virtually the entire streaming layer across two Regions. For business use cases that can’t withstand extended data loss or downtime, this may be the best option. From an RTO perspective, this architectural pattern ensures there will be no downtime. There is some nuance in the RPO metric in that it depends heavily on the replication lag. In the event of the primary stream becoming unavailable, whatever data hasn’t yet been replicated may be unavailable in the secondary Region. This data won’t be considered lost, but may be unavailable for consumption until the primary stream becomes available again. This method can also result in events arriving out of order.

For business needs that can’t tolerate this level of record unavailability, consider retaining data on the producer for the purposes of publishing to an available stream when available, or rewinding against the source for the producer if possible so that data stuck in the primary Region can be resent to the secondary stream upon failover. We cover this consideration in the client failover section of this post.

Cold standby pattern

The following architecture diagram illustrates the cold standby high availability pattern for Kinesis Data Streams.

active passive pattern for kinesis data streams

The cold standby architectural pattern involves running a data stream both in the primary and secondary Region, and spinning up the downstream resources like a stream consumer and destination for streams when a service interruption is detected—passive mode. Just like the warm standby pattern, sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. Likewise, data is replicated across Regions from the data stream in the primary Region to the secondary Region.

The primary benefit this architectural pattern provides is cost efficiency. By not running consumers at all times, this effectively reduces your costs significantly compared to the warm standby pattern. However, this pattern may introduce some data unavailability for downstream systems while the secondary Region infrastructure is provisioned. Additionally, depending on replication lag, some records may be unavailable, as discussed in the warm standby pattern. It should be noted that depending on how long it takes to spin up resources, it may take some time for consumers to reprocess the data in the secondary Region, and latency can be introduced when failing over. Our implementation assumes a minimal replication lag and that downstream systems have the ability to reprocess a configurable amount of data to catch up to the tip of the stream. We discuss approaches to spinning these resources up in the client failover section, but one possible approach to this would be using an AWS CloudFormation template that spins these resources up on service unavailability detection.

For business needs that can tolerate some level of data unavailability and can accept interruptions while the new infrastructure in the secondary Region is spun up, this is an option to consider both from a cost perspective and an RPO/RTO perspective. The complexity of spinning up resources upon detecting service unavailability is offset by the lower cost of the overall solution.

Which pattern makes sense for our use case?

Let’s revisit the use case described earlier to identify which of the strategies best meets our needs. We can extract the pieces of information from the customer’s problem statement to identify that they need a high availability architecture that:

  • Can’t withstand extended amounts of data loss
  • Must resume operations within 15 minutes of service interruption identification

These criteria tell us that their RPO is close to zero, and their RTO is 15 minutes. From here, we can determine that the cold standby architecture with data replication provides us limited data loss, and the maximum downtime will be determined by the time it takes to provision consumers and downstream destinations in the secondary Region.

Let’s dive deeper into the implementation details of each of the core phases of high availability, including an implementation guide for our use case.

Launch AWS CloudFormation resources

If you want to follow along with our code samples, you can launch the following CloudFormation stack and follow the instructions in order to simulate the cold standby architecture referenced in this post.

Launch Stack

For purposes of the Kinesis Data Streams high availability setup demo, we use us-west-2 as the primary Region and us-east-2 as the failover Region. While deploying this solution in your own account, you can choose your own primary and failover Regions.

  1. Deploy the supplied CloudFormation template in failover Region us-east-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

  2. Deploy the supplied CloudFormation template in primary Region us-west-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

In steps 1 and 2, we deployed the following resources in the primary and failover Regions:

  1. KDS-HA-Stream – AWS::Kinesis::Stream (primary and failover Region)
  2. KDS-HA-ProducerLambda – AWS::Lambda::Function (primary Region)
  3. KDS-HA-ConsumerLambda – AWS::Lambda::Function (primary and failover Region)
  4. KDS-HA-ReplicationAgentLambda – AWS::Lambda::Function (primary Region)
  5. KDS-HA-FailoverLambda – AWS::Lambda::Function (primary Region)
  6. ticker-prices – AWS::DynamoDB::GlobalTable (primary and failover Region)

The KDS-HA-Stream Kinesis data stream is deployed in both Regions. KDS-HA-ReplicationAgentLambda, an enhanced fan-out consumer of the KDS-HA-Stream stream in the primary Region, is responsible for replicating messages to the data stream in the failover Region.

KDS-HA-ConsumerLambda is a Lambda function consuming messages out of the KDS-HA-Stream stream and persisting data into a DynamoDB table after preprocessing.

You can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-west-2 as its value because it’s the primary Region.


When deciding how to replicate data from a data stream in Region A to a data stream in Region B, there are several strategies that involve a consumer reading data off of the primary stream and sending that data cross-Region to the secondary data stream. This would act as a replicator service, responsible for copying the data between the two streams, maintaining a relatively low latency to replicate and ensuring data isn’t lost during this replication.

Because replication off of a shared throughput data stream could impact the flow of data in a production workload, we recommend using the enhanced fan-out feature of Kinesis Data Streams consumers to ensure replication doesn’t have an impact on consumption latency.

The replication strategy implemented in this post features asynchronous replication, meaning that the replication process doesn’t block any standard data flow in the primary stream. Synchronous replication would be a safer approach to guarantee replication and avoid data loss; however, this isn’t possible without a service-side implementation.

The following image shows a timeline of data flow for the cold standby architecture, with data being replicated as soon as it’s published.

Lambda replication

Lambda can treat a Kinesis data stream as an event source, which will funnel events from your data stream into a Lambda function. This Lambda function then receives and forwards these events across Regions to your data stream in a secondary Region. Lambda functions allow you to utilize best streaming practices such as retries of records that encounter errors, bisect on error functionality, and using the Lambda parallelization factor; using more instances of your Lambda function than you have available shards can help process records faster.
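
To make the effect of the parallelization factor concrete, the following minimal sketch computes the upper bound on concurrent batches for one event source mapping. The function name is hypothetical; the 1 to 10 range reflects the limit Lambda places on the parallelization factor per shard.

```python
def max_concurrent_batches(shard_count: int, parallelization_factor: int = 1) -> int:
    """Upper bound on batches processed at once for one Kinesis event source mapping."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    # Lambda accepts a parallelization factor between 1 and 10 per shard.
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization_factor must be between 1 and 10")
    return shard_count * parallelization_factor


# A 4-shard stream with a factor of 5 can be processed by up to 20 concurrent batches.
print(max_concurrent_batches(4, 5))
```

Records that share a partition key are still processed in order, so a higher factor helps most when your keys are well distributed.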

This Lambda function is at the crux of the architecture for high availability; it’s responsible solely for sending data across Regions, and it also has the best capability to monitor the replication progress. Important metrics to monitor for Lambda replication include IteratorAge, which indicates how old the last record in the batch was when it finished processing. A high IteratorAge value indicates that the Lambda function is falling behind and therefore is not keeping up with data ingestion for replication purposes. A high IteratorAge can lead to a higher RPO and the higher likelihood of data unavailability when a passive failover happens.

We use the following sample Lambda function in our CloudFormation template to replicate data across Regions:

import base64
import json
import os

import boto3


def lambda_handler(event, context):
    # Kinesis client for the failover Region, which hosts the replica stream.
    client = boto3.client("kinesis", region_name=os.environ["FAILOVER_REGION"])
    records = []

    for record in event["Records"]:
        # Lambda delivers each payload base64-encoded; decode it and keep the
        # original partition key when building the PutRecords entry.
        records.append(
            {
                "PartitionKey": record["kinesis"]["partitionKey"],
                "Data": base64.b64decode(record["kinesis"]["data"]).decode("utf-8"),
            }
        )

    response = client.put_records(Records=records, StreamName="KDS-HA-Stream")
    if response["FailedRecordCount"] > 0:
        # Raising an exception makes Lambda retry the batch.
        print("Failed replicating data: " + json.dumps(response))
        raise Exception("Failed replicating data!")

The Lambda replicator in the CloudFormation template is configured to read from the data stream in the primary Region.
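
The sample function raises an exception when any record fails, which causes the whole batch to be retried and can resend records that already succeeded. One possible refinement, shown here as a sketch rather than as part of the provided template, is to pick out only the failed entries. It relies on the PutRecords response listing one result per request entry in the same order, with an ErrorCode field on failed entries; the helper name and the sample data are hypothetical.

```python
def select_failed_records(request_records, response):
    """Return the request entries whose PutRecords result carries an ErrorCode."""
    results = response.get("Records", [])
    return [
        entry
        for entry, result in zip(request_records, results)
        if "ErrorCode" in result
    ]


# Illustrative request and response shapes (values are made up).
request = [
    {"PartitionKey": "AMZN", "Data": '{"price": 101.5}'},
    {"PartitionKey": "MSFT", "Data": '{"price": 250.1}'},
]
response = {
    "FailedRecordCount": 1,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded",
        },
    ],
}

to_retry = select_failed_records(request, response)
print([entry["PartitionKey"] for entry in to_retry])
```

The returned entries can be passed to another put_records call, ideally with a bounded number of attempts and a backoff between them.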

The following code contains the necessary AWS Identity and Access Management (IAM) permissions for Lambda, giving access for the Lambda function to assume this role. All actions are permitted on data streams and DynamoDB. Following the principle of least privilege, we recommend restricting this to the necessary streams in a production environment.

      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:ListShards'
                  - 'kinesis:ListStreams'
                  - 'kinesis:SubscribeToShard'
                  - 'kinesis:PutRecords'
                Resource:
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream'
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream/consumer/KDS-HA-Stream-EFO-Consumer:*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'

Health check

A generalized strategy for determining when to consider our data stream unavailable involves the use of Amazon CloudWatch metrics. We use the metrics coming off of our Lambda producer and consumer in order to assess the availability of our data stream. While producing to a data stream, an error might appear as one of the following responses back from the data stream: PutRecord or PutRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503 error. When consuming from a data stream, an error might appear as one of the following responses back from the data stream: SubscribeToShard or GetRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503 error.

We can calculate our effective error rate based on PutRecord.Success and GetRecords.Success. An average error rate of 1% or higher over a time window of 5 minutes, for example, could indicate that there is an issue with the data stream, and we may want to fail over. In our CloudFormation template, this error rate threshold as well as time window are configurable, but by default we check for an error rate of 1% in the last 5 minutes to trigger a failover of our clients.
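
As a rough illustration of this check, the following sketch derives an error rate from the per-minute averages of a Success metric and compares it to the threshold. It is not the logic from the CloudFormation template; the function names are hypothetical, and treating a window with no data points as healthy is an assumption you may want to reverse.

```python
def error_rate(success_datapoints):
    """Error rate for a window, given per-period averages of a *.Success metric (0.0 to 1.0)."""
    if not success_datapoints:
        # Assumption: no data points means no signal, so report no errors.
        return 0.0
    return 1.0 - sum(success_datapoints) / len(success_datapoints)


def should_fail_over(put_success, get_success, threshold=0.01):
    """Fail over when either the producer or the consumer side breaches the threshold."""
    return max(error_rate(put_success), error_rate(get_success)) >= threshold


# Five one-minute data points per metric, covering the default 5-minute window.
healthy = [1.0, 1.0, 1.0, 1.0, 1.0]
degraded = [1.0, 1.0, 0.9, 1.0, 1.0]

print(should_fail_over(healthy, healthy))
print(should_fail_over(degraded, healthy))
```

Averaging over the whole window, rather than reacting to a single bad data point, keeps short error bursts from triggering a failover.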

Client failover

When a data stream is deemed to be unreachable, we must now take action to keep our system available and reachable for clients on both ends of the interaction. This means for producers following the cold standby high availability architecture, we change the destination stream where the producer was writing. If high availability and failover of data producers and consumers isn’t a requirement of a given use case, a different architecture would be a better fit.

Prior to failover, the producer may have been delivering data to a stream in Region A, but we now automatically update the destination to be the stream in Region B. For different clients, the methodology of updating the producer will be different, but for ours, we store the active destination for producers in the Lambda environment variables from AWS CloudFormation and update our Lambda functions dynamically on health check failure scenarios.
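
A minimal sketch of how a producer might read its active destination follows. The variable names INPUT_STREAM and PRODUCING_TO_REGION are the ones set by the update-function-configuration command in the failback steps of this post; the helper itself is hypothetical and only shows the lookup.

```python
import os


def resolve_destination(env=None):
    """Return the (stream name, Region) pair the producer should currently write to."""
    env = os.environ if env is None else env
    return env["INPUT_STREAM"], env["PRODUCING_TO_REGION"]


# During a failover, the environment points the producer at the secondary Region.
stream, region = resolve_destination(
    {"INPUT_STREAM": "KDS-HA-Stream", "PRODUCING_TO_REGION": "us-east-2"}
)
print(stream, region)
```

Because the values are read at invocation time, updating the function configuration is enough to redirect the producer without a code change.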

For our use case, we use the maximum consumer lag time (iteratorAge) plus some buffer to influence the starting position of the failover consumer. This allows us to ensure that the consumer in the secondary Region doesn’t skip records that haven’t been processed in the originating Region, but some data overlap may occur. Note that some duplicates may be introduced in the downstream system, so you must use an idempotent sink or some other method of handling duplicates to avoid duplicate-related issues.
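
The following sketch illustrates the idea of an idempotent sink using an in-memory dictionary as a stand-in for the destination table. The field names symbol and event_time are hypothetical. The key is derived from the payload rather than from the Kinesis sequence number, because a replicated record receives a new sequence number in the secondary stream.

```python
class IdempotentSink:
    """In-memory stand-in for a table that ignores records it has already stored."""

    def __init__(self):
        self._items = {}

    def write(self, record):
        """Store the record once; return False if an identical key was already written."""
        # Hypothetical natural key taken from the payload itself.
        key = (record["symbol"], record["event_time"])
        if key in self._items:
            return False
        self._items[key] = record
        return True

    def __len__(self):
        return len(self._items)


sink = IdempotentSink()
tick = {"symbol": "AMZN", "event_time": "2023-01-01T12:00:00Z", "price": 101.5}

first = sink.write(tick)   # stored
second = sink.write(tick)  # duplicate delivered again after failover, ignored
print(first, second, len(sink))
```

With DynamoDB as the destination, a similar effect can be achieved with a conditional write that uses an attribute_not_exists condition on the key attribute.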

In the case where data is successfully written to a data stream but is unable to be consumed from the stream, the data will not be replicated and therefore be unavailable in the second Region. The data will be durably stored in the primary data stream until it comes back online and can be read from. Note that if the stream is unavailable for a longer period of time than your total data retention period on the data stream, this data will be lost. Data retention for Kinesis Data Streams can be retrospectively increased up to 1 year.

For consumers in a cold standby architecture, upon failure detection, the consumer will be disabled or shut down, and the same consumer instance will be spun up in the secondary Region to consume from the secondary data stream. On the consumer side, we assume that the consumer application is stateless in our provided solution. If your application requires state, you can migrate or preload the application state via Amazon Simple Storage Service (Amazon S3) or a database. For a stateless application, the most important aspect of failover is the starting position.

In the following timeline, we can see that at some point, the stream in Region A was deemed unreachable.

The consumer application in Region A was reading data at time t10, and when it fails over to the secondary Region (B), it reads starting at t5 (5 minutes before the current iteratorAgeMilliseconds). This ensures that data isn’t skipped by the consumer application. Keep in mind that there may be some overlap in records in the downstream destinations.
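
The starting position can be computed as a timestamp, as in the following sketch. The function name and the 5-minute default buffer are assumptions for illustration; the result is the kind of value you would supply when creating an event source mapping with the AT_TIMESTAMP starting position.

```python
from datetime import datetime, timedelta, timezone


def failover_start_time(now, max_iterator_age_ms, buffer=timedelta(minutes=5)):
    """Timestamp from which the failover consumer should start reading."""
    # Rewind by the consumer lag, then by an extra safety buffer.
    return now - timedelta(milliseconds=max_iterator_age_ms) - buffer


detected_at = datetime(2023, 1, 1, 12, 10, tzinfo=timezone.utc)

# A consumer that was 90 seconds behind restarts 6.5 minutes before detection.
start = failover_start_time(detected_at, max_iterator_age_ms=90_000)
print(start.isoformat())
```

A larger buffer lowers the risk of skipped records at the cost of more duplicates for the downstream system to absorb.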

In the provided cold standby AWS CloudFormation example, we can manually trigger a failover with the AWS Command Line Interface (AWS CLI). In the following code, we manually fail over to us-east-2:

aws lambda invoke --function-name KDS-HA-FailoverLambda --cli-binary-format raw-in-base64-out --payload '{}' response.json --region us-west-2

After a few minutes, you can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-east-2 as its value because it’s failed over to the us-east-2 Region.


After an outage or service unavailability is deemed to be resolved, the next logical step is to reorient your clients back to their original operating Regions. Although it may be tempting to automate this procedure, a manual failback approach during off-business hours when minimal production disruption will take place makes more sense.

In the following images, we can visualize the timeline with which consumer applications are failed back to the original Region.

The producer switches back to the original Region, and we wait for the consumer in Region B to reach 0 lag. At this point, the consumer application in Region B is disabled, and replication to Region B is resumed. We have now returned to our normal state of processing messages as shown in the replication section of this post.

In our AWS CloudFormation setup, we perform a failback with the following steps:

  1. Re-enable the event source mapping and start consuming messages from the primary Region at the latest position:
aws lambda create-event-source-mapping --function-name KDS-HA-ConsumerLambda --batch-size 100 --event-source-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --starting-position LATEST --region us-west-2
  2. Switch the producer back to the primary Region:
aws lambda update-function-configuration --function-name KDS-HA-ProducerLambda --environment "Variables={INPUT_STREAM=KDS-HA-Stream,PRODUCING_TO_REGION=us-west-2}" --region us-west-2
  3. In the failover Region (us-east-2), wait for your data stream’s GetRecords max iterator age (in milliseconds) CloudWatch metric to report 0 as a value. We’re waiting for the consumer Lambda function to catch up with all produced messages.
  4. Stop consuming messages from the failover Region.
  5. Run the following AWS CLI command and grab the UUID from the response, which we use to delete the existing event source mapping. Make sure you’re picking the event source mapping for the Lambda function KDS-HA-ConsumerLambda.
aws lambda list-event-source-mappings --region us-east-2
aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-east-2
  6. Restart the replication agent in the primary Region.
  7. Run the following AWS CLI command, and capture ConsumerARN from the response:
aws kinesis list-stream-consumers --stream-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --region us-west-2
aws lambda create-event-source-mapping --function-name KDS-HA-ReplicationAgentLambda --batch-size 100 --event-source-arn {{ConsumerARN}} --starting-position LATEST --region us-west-2
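
If you script the failback, picking the right UUID out of the list-event-source-mappings output can be done as in the following sketch. It works on the parsed JSON response, whose EventSourceMappings entries carry UUID and FunctionArn fields; the helper name, account ID, and UUID values are placeholders.

```python
def mapping_uuids_for(function_name, response):
    """Return the UUIDs of event source mappings that belong to the given function."""
    uuids = []
    for mapping in response.get("EventSourceMappings", []):
        arn = mapping.get("FunctionArn", "")
        # arn:aws:lambda:<region>:<account>:function:<name>[:<qualifier>]
        name = arn.split(":function:")[-1].split(":")[0]
        if name == function_name:
            uuids.append(mapping["UUID"])
    return uuids


# Placeholder response with two mappings in the failover Region.
response = {
    "EventSourceMappings": [
        {
            "UUID": "11111111-aaaa-bbbb-cccc-000000000001",
            "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:KDS-HA-ConsumerLambda",
        },
        {
            "UUID": "11111111-aaaa-bbbb-cccc-000000000002",
            "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:SomeOtherFunction",
        },
    ]
}

print(mapping_uuids_for("KDS-HA-ConsumerLambda", response))
```

Matching on the exact function name avoids deleting the mapping of an unrelated function that happens to share a prefix.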

When this is complete, you can observe the same data stream metrics—the number of records in and out per second, consumer lag metrics, and number of errors as described in the health check section of this post—to ensure that each of the components has resumed processing data in the original Region. We can also take note of the data landing in DynamoDB, which displays which Region data is being updated from in order to determine the success of our failback procedure.

We recommend that any streaming workload that can’t withstand extended data loss or downtime implement some form of cross-Region high availability to guard against the unlikely event of service unavailability. These recommendations can help you determine which pattern is right for your use case.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack from primary Region us-west-2.
  2. Delete the CloudFormation stack from failover Region us-east-2.
  3. List all event source mappings in primary Region us-west-2 using the aws lambda list-event-source-mappings --region us-west-2 command and note the UUIDs of the event source mappings tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions.
  4. Delete event source mappings in primary Region us-west-2 tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions using the aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-west-2 command and UUIDs noted in the previous step.


Building highly available Kinesis data streams across multiple Regions is multi-faceted, and all aspects of your RPO, RTO, and operational costs need to be carefully considered. The code and architecture discussed in this post is one of many different architectural patterns you can choose for your workloads, so make sure to choose the appropriate architecture based on the criteria for your specific requirements.

To learn more about Kinesis Data Streams, see the getting started guide and the workshop that walks through all the integrations with Kinesis Data Streams. You can also contact your AWS Solutions Architects, who can assist you along your high availability journey.

About the Authors

Jeremy Ber has been working in the telemetry data space for the past 7 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day, and process complex machine learning algorithms in real time. At AWS, he is a Senior Streaming Specialist Solutions Architect supporting both Amazon MSK and Amazon Kinesis.

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

Run a popular benchmark on Amazon Redshift Serverless easily with AWS Data Exchange

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/run-a-popular-benchmark-on-amazon-redshift-serverless-easily-with-aws-data-exchange/

Amazon Redshift is a fast, easy, secure, and economical cloud data warehousing service designed for analytics. AWS announced Amazon Redshift Serverless general availability in July 2022, providing an easier experience to operate Amazon Redshift. Amazon Redshift Serverless makes it simple to run and scale analytics without having to manage your data warehouse infrastructure. Amazon Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use.

Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access external data in open file formats like CSV and Parquet stored in Amazon S3. For more information on RPU pricing, refer to Amazon Redshift pricing.
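
To see how per-second billing with a 60-second minimum plays out, consider the following simplified sketch. It models a single continuous period of activity and leaves the price per RPU-hour as a parameter, because the rate depends on your Region; the function names are hypothetical.

```python
def billed_rpu_hours(rpus, runtime_seconds, minimum_seconds=60):
    """RPU-hours charged for one period of activity, applying the minimum charge."""
    billed_seconds = max(runtime_seconds, minimum_seconds)
    return rpus * billed_seconds / 3600


def estimated_cost(rpus, runtime_seconds, price_per_rpu_hour):
    """Cost estimate; price_per_rpu_hour must come from the pricing page for your Region."""
    return billed_rpu_hours(rpus, runtime_seconds) * price_per_rpu_hour


# A 30-second query on 128 RPUs is billed as if it ran for 60 seconds.
print(billed_rpu_hours(128, 30) == billed_rpu_hours(128, 60))

# One full hour on 128 RPUs is 128 RPU-hours.
print(billed_rpu_hours(128, 3600))
```

This is only an approximation for planning a benchmark run; your actual charges are determined by the service.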

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. With AWS Data Exchange for Amazon Redshift, customers can start querying, evaluating, analyzing, and joining third-party data with their own first-party data without requiring any extracting, transforming, and loading (ETL). Data providers can list and offer products containing Amazon Redshift datasets in the AWS Data Exchange catalog, granting subscribers direct, read-only access to the data stored in Amazon Redshift. This feature empowers customers to quickly query, analyze, and build applications with these third-party datasets.

TPC-DS is a commonly used benchmark for measuring the query performance of data warehouse solutions such as Amazon Redshift. The benchmark is useful in proving the query capabilities of executing simple to complex queries in a timely manner. It is also used to measure the performance of different database configurations, different concurrent workloads, and also against other database products.

This blog post walks you through the steps you’ll need to set up Amazon Redshift Serverless and run the SQL queries derived from the TPC-DS benchmark against data from the AWS Data Exchange.

Solution overview

We will get started by creating an Amazon Redshift Serverless workgroup and namespace. A namespace is a collection of database objects and users while a workgroup is a collection of compute resources. To simplify executing the benchmark queries, a Linux EC2 instance will also be deployed.

Next, a GitHub repo containing the TPC-DS derived queries will be used. The TPC-DS benchmark is frequently used for evaluating the performance and functionality of cloud data warehouses. The TPC-DS benchmark includes additional steps and requirements to be considered official, but for this blog post, we are focused on only executing the SQL SELECT queries from this benchmark.

The last component of the solution is data. The TPC-DS benchmark includes binaries for generating data, but this is time-consuming to run. We have avoided this problem by generating the data, and we have made this available freely in the AWS Data Exchange.

Automated setup: CloudFormation


Click the Launch Stack link above to launch the CloudFormation stack, which will automate the deployment of resources needed for the demo. The template deploys the following resources in your default VPC:

  • Amazon Elastic Compute Cloud (Amazon EC2) instance with the latest version of Amazon Linux
  • Amazon Redshift Serverless workgroup and namespace
  • IAM role with redshift-serverless:GetWorkgroup action granted; this is attached to the EC2 instance so that a command line interface (CLI) command can run to complete the instance configuration
  • Security group with inbound port 22 (ssh) and connectivity between the EC2 instance and the Amazon Redshift Serverless workgroup
  • The GitHub repo is downloaded in the EC2 instance

Template parameters

  • Stack: CloudFormation term used to define all of the resources created by the template.
  • KeyName: This is the name of an existing key pair. If you don’t have one already, create a key pair that is used to connect by SSH to the EC2 instance. More information on key pairs.
  • SSHLocation: The CIDR mask for incoming connections to the EC2 instance. The default allows any IP address to connect by SSH to the EC2 instance, provided the client has the key pair private key. The best practice for security is to limit this to a smaller range of IP addresses. For example, you can use sites like www.whatismyip.com to get your IP address.
  • RedshiftCapacity: This is the number of RPUs for the Amazon Redshift Serverless workgroup. The default is 128 and is recommended for analyzing the larger TPC-DS datasets. You can update the RPU capacity after deployment if you like or redeploy it with a different capacity value.

Manual setup

If you choose not to use the CloudFormation template, deploy the EC2 instance and Amazon Redshift Serverless with the following instructions. The following steps are only needed if you are manually provisioning the resources rather than using the provided CloudFormation template.

Amazon Redshift setup

Here are the high-level steps to create an Amazon Redshift Serverless workgroup. You can get more detailed information from this News Blog post.

To create your workgroup, complete the following steps:

  1. On the Amazon Redshift console, navigate to the Amazon Redshift Serverless dashboard.
  2. Choose Create workgroup.
  3. For Workgroup name, enter a name.
  4. Choose Next.
  5. For Namespace, enter a unique name.
  6. Choose Next.
  7. Choose Create.

These steps create an Amazon Redshift Serverless workgroup with 128 RPUs. This is the default; you can easily adjust this up or down based on your workload and budget constraints.

Linux EC2 instance setup

  • Deploy a virtual machine in the same AWS Region as your Amazon Redshift database using the Amazon Linux 2 AMI.
  • The Amazon Linux 2 AMI (64-bit x86) with the t2.micro instance type is an inexpensive and tested configuration.
  • Add the security group configured for your Amazon Redshift database to your EC2 instance.
  • Install psql with sudo yum install postgresql.x86_64 -y
  • Download this GitHub repo.
    git clone --depth 1 https://github.com/aws-samples/redshift-benchmarks /home/ec2-user/redshift-benchmarks

  • Set the following environment variables for Amazon Redshift:
    • PGHOST: This is the endpoint for the Amazon Redshift database.
    • PGPORT: This is the port the database listens on. The Amazon Redshift default is 5439.
    • PGUSER: This is your Amazon Redshift database user.
    • PGDATABASE: This is the database name where your external schemas are created. This is NOT the database created for the data share. We suggest using the default “dev” database.


export PGUSER="awsuser" 
export PGHOST="default.01.us-east-1.redshift-serverless.amazonaws.com" 
export PGPORT="5439" 
export PGDATABASE="dev"

Configure the .pgpass file to store your database credentials. The format for the .pgpass file is: hostname:port:database:user:password


default.01.us-east-1.redshift-serverless.amazonaws.com:5439:*:user1:<password>
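If you prefer to generate the .pgpass line rather than type it, the following sketch builds one entry in the hostname:port:database:user:password format. It assumes the PostgreSQL convention that a colon or backslash inside a field is escaped with a backslash; the function name and the sample password are hypothetical.

```python
def pgpass_entry(host, port, database, user, password):
    """Build one .pgpass line, escaping backslashes and colons in each field."""
    def esc(value):
        # Escape backslashes first so the colon escapes are not doubled
        return str(value).replace("\\", "\\\\").replace(":", "\\:")
    return ":".join(esc(v) for v in (host, port, database, user, password))

line = pgpass_entry(
    "default.01.us-east-1.redshift-serverless.amazonaws.com",
    5439,
    "*",
    "awsuser",
    "pa:ss\\word",  # hypothetical password containing ':' and '\'
)
print(line)
```

Remember that psql ignores the file unless its permissions are restricted to the owner (for example, chmod 0600 ~/.pgpass).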

AWS Data Exchange setup

AWS Data Exchange provides third-party data in multiple data formats, including Amazon Redshift. You can subscribe to catalog listings in multiple storage locations like Amazon S3 and Amazon Redshift data shares. We encourage you to explore the AWS Data Exchange catalog on your own because there are many datasets available that can be used to enhance your data in Amazon Redshift.

First, subscribe to the AWS Marketplace listing for TPC-DS Benchmark Data. Select the Continue to subscribe button from the AWS Data Exchange catalog listing. After you review the offer and Terms and Conditions of the data product, choose Subscribe. Note that you will need the appropriate IAM permissions to subscribe to AWS Data Exchange on Marketplace. More information can be found at AWS managed policies for AWS Data Exchange.

TPC-DS uses 24 tables in a dimensional model that simulates a decision support system. It has store, catalog, and web sales as well as store, catalog, and web returns fact tables. It also has the dimension tables to support these fact tables.

TPC-DS includes a utility to generate data for the benchmark at a given scale factor. The smallest scale factor is 1 GB (uncompressed). Most benchmark tests for cloud warehouses are run with 3–10 TB of data because the dataset is large enough to stress the system but also small enough to complete the entire test in a reasonable amount of time.

There are six database schemas provided in the TPC-DS Benchmark Data subscription with 1; 10; 100; 1,000; 3,000; and 10,000 scale factors. The scale factor refers to the uncompressed data size measured in GB. Each schema refers to a dataset with the corresponding scale factor.

Scale factor (GB) | ADX schema | Amazon Redshift Serverless external schema
1 | tpcds1 | ext_tpcds1
10 | tpcds10 | ext_tpcds10
100 | tpcds100 | ext_tpcds100
1,000 | tpcds1000 | ext_tpcds1000
3,000 | tpcds3000 | ext_tpcds3000
10,000 | tpcds10000 | ext_tpcds10000

The following steps create external schemas in your Amazon Redshift Serverless database that map to the schemas found in AWS Data Exchange.

  • Log in to the EC2 instance and create a database connection.

  • Run the following query:
    select share_name, producer_account, producer_namespace from svv_datashares;

  • Use the output of this query to run the next command:
    create database tpcds_db from datashare <share_name> of account '<producer_account>' namespace '<producer_namespace>';

  • Last, you create the external schemas in Amazon Redshift:
    create external schema ext_tpcds1 from redshift database tpcds_db schema tpcds1;
    create external schema ext_tpcds10 from redshift database tpcds_db schema tpcds10;
    create external schema ext_tpcds100 from redshift database tpcds_db schema tpcds100;
    create external schema ext_tpcds1000 from redshift database tpcds_db schema tpcds1000;
    create external schema ext_tpcds3000 from redshift database tpcds_db schema tpcds3000;
    create external schema ext_tpcds10000 from redshift database tpcds_db schema tpcds10000;

  • You can now exit psql with the \q command.
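The six create external schema statements in the steps above follow a single naming pattern, so they can also be generated. The following is a minimal sketch, assuming the database is named tpcds_db as in the earlier step; the function name is hypothetical.

```python
SCALE_FACTORS_GB = [1, 10, 100, 1000, 3000, 10000]

def external_schema_sql(scale_factor, database="tpcds_db"):
    """Return the CREATE EXTERNAL SCHEMA statement for one scale factor."""
    return (
        f"create external schema ext_tpcds{scale_factor} "
        f"from redshift database {database} schema tpcds{scale_factor};"
    )

# Print one statement per scale factor, ready to paste into psql
for sf in SCALE_FACTORS_GB:
    print(external_schema_sql(sf))
```

Generating the statements keeps the external schema names aligned with the scale factors if you only want a subset of them.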

TPC-DS derived benchmark

The TPC-DS derived benchmark consists of 99 queries in four broad categories:

  • Reporting queries
  • Ad hoc queries
  • Iterative OLAP queries
  • Data mining queries

In addition to running the 99 queries, the benchmark tests concurrency. During the concurrency portion of the test, there are n sessions (default of 5) that run the queries. Each session runs the 99 queries with different parameters and in slightly different order. This concurrency test stresses the resources of the database and generally takes longer to complete than just a single session running the 99 queries.

Some data warehouse products are configured to optimize single-user performance, whereas others may not have the ability to manage the workload effectively. The concurrency test is a great way to demonstrate the workload management and stability of Amazon Redshift.

Since the data for each scale factor is located in a different schema, running the benchmark against each scale factor requires changing the schema you are referencing. The search_path defines which schemas to search for tables when a query contains objects without a schema included. For example:

ALTER USER <username> SET search_path=ext_tpcds3000,public;

The benchmark scripts set the search_path automatically.

Note: The scripts create a schema called tpcds_reports which will store the detailed output of each step of the benchmark. Each time the scripts are run, this schema will be recreated, and the latest results will be stored. If you happen to already have a schema named tpcds_reports, these scripts will drop the schema.

Running the TPC-DS derived queries

  • Connect by SSH to the EC2 instance with your key pair.
  • Change directory:
    cd ~/redshift-benchmarks/adx-tpc-ds/

  • Optionally configure the variables for the scripts in tpcds_variables.sh

Here are the default values for the variables you can set:

  • EXT_SCHEMA="ext_tpcds3000": This is the name of the external schema created that has the TPC-DS dataset. The “3000” value means the scale factor is 3000 or 3 TB (uncompressed).
  • EXPLAIN="false": If set to false, queries will run. If set to true, queries will generate explain plans rather than actually running. Each query will be logged in the log directory. Default is false.
  • MULTI_USER_COUNT="5": 0 to 20 concurrent users will run the queries. The order of the queries was set with dsqgen. Setting to 0 will skip the multi-user test. Default is 5.
  • Run the benchmark:
    ./rollout.sh > rollout.log 2>&1 &

TPC-DS derived benchmark results

We performed a test with the 3 TB ADX TPC-DS dataset on an Amazon Redshift Serverless workgroup with 128 RPUs. Additionally, we disabled query caching so that query results aren’t cached. This allows us to measure the performance of the database as opposed to its ability to serve results from cache.

The test comprises two sections. The first will run the 99 queries serially using one user while the second section will start multiple sessions based on the configuration file you set earlier. Each session will run concurrently, and each will run all 99 queries but in a different order.

The total runtime for the single-user queries was 15 minutes and 11 seconds. As shown in the following graph, the longest-running query was query 67, with an elapsed time of only 101 seconds. The average runtime was only 9.2 seconds.

1 Session TPC-DS 3TB 128 RPUs

With five concurrent users, the runtime was 28 minutes and 35 seconds, which demonstrates how Amazon Redshift Serverless performs well for single-user and concurrent-user workloads.

5 Concurrent Sessions TPC-DS 3TB 128 RPUs
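As a quick sanity check on the reported numbers, the following sketch recomputes the average query time from the single-user total and compares the five-session runtime with running the same work back to back. The ratio is derived here for illustration and is not a figure reported by the test; it is approximate because each session runs the queries with different parameters.

```python
def to_seconds(minutes, seconds):
    """Convert a minutes/seconds pair to seconds."""
    return minutes * 60 + seconds

QUERIES = 99
single_user_s = to_seconds(15, 11)  # reported single-user runtime
five_user_s = to_seconds(28, 35)    # reported runtime with 5 sessions

avg_s = single_user_s / QUERIES
print(round(avg_s, 1))  # 9.2

# Five sessions run one after another would take about 5x the single-user time
serial_estimate_s = 5 * single_user_s
print(round(serial_estimate_s / five_user_s, 2))  # 2.66
```

In other words, five times the work finished in less than twice the single-user time.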

As you can see, it was pretty easy to deploy Amazon Redshift Serverless, subscribe to an AWS Data Exchange product listing, and run a fairly complex benchmark in a short amount of time.

Next steps

You can run the benchmark scripts again but with different dataset sizes or a different number of concurrent users by editing the tpcds_variables.sh file. You can also try resizing your Amazon Redshift Serverless workgroup to see the performance difference with more or fewer RPUs. You can also run individual queries to see the results firsthand.

Another thing to try is to subscribe to other AWS Data Exchange products and query this data from Amazon Redshift Serverless. Be curious and explore using Amazon Redshift Serverless and the AWS Data Exchange!

Clean up

If you deployed the resources with the automated solution, you just need to delete the stack created in CloudFormation. All resources created by the stack will be deleted automatically.

If you deployed the resources manually, you need to delete the following:

  • The Amazon Redshift database created earlier.
    • If you deployed Amazon Redshift Serverless, you will need to delete both the workgroup and the namespace.
  • The Amazon EC2 instance.

Optionally, you can unsubscribe from the TPC-DS data by going to your AWS Data Exchange Subscriptions and then turning Renewal to Off.


This blog post covered deploying Amazon Redshift Serverless, subscribing to an AWS Data Exchange product, and running a complex benchmark in a short amount of time. Amazon Redshift Serverless can handle high levels of concurrency with very little effort and excels in price-performance.

If you have any questions or feedback, please leave them in the comments section.

About the author

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio recently added support for custom transforms, which you can use to build visual jobs in combination with the AWS Glue Studio components provided out of the box. You can now define a custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which define the component and the processing logic, respectively.

Custom visual transforms let you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a calendar year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.
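As an illustration of such a validation, the sketch below checks a year against the pattern ^\d{4}$, which is the rule this post's component definition uses for its year parameter. How AWS Glue Studio evaluates the rule internally is not covered here; the snippet only shows what the pattern accepts, and the function name is hypothetical.

```python
import re

YEAR_RULE = r"^\d{4}$"  # same pattern as the validationRule, without JSON escaping

def is_valid_year(text: str) -> bool:
    """Return True when the text is exactly four digits."""
    return re.match(YEAR_RULE, text) is not None

for candidate in ["2021", "21", "20211", "year"]:
    print(candidate, is_valid_year(candidate))
```

Note that in the JSON file the backslash has to be doubled (\\d) because JSON strings use the backslash as an escape character.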

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is what the component definition looks like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice that “year” is an optional parameter, so it defaults to None when omitted; the function detects this and substitutes the previous year. The function returns the transformed DynamicFrame. In this case, the result is not derived from the source frame, as would usually be the case, but is a newly created one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.
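One detail of the department selection is worth knowing if you adapt the generator. Rounding a uniform random value scaled by the list length minus one gives the first and last entries about half the weight of the others. The following Spark-free sketch illustrates this with Python's random module and compares it with a floor-based pick. Both helper names are hypothetical, and the floor variant is an alternative, not what the script above does.

```python
import random
from collections import Counter

def pick_with_round(rng, n):
    # Mirrors round(rand() * (n - 1)): half-up rounding of a value in [0, n - 1)
    return int(rng.random() * (n - 1) + 0.5)

def pick_with_floor(rng, n):
    # Alternative: floor(rand() * n) gives each index the same probability
    return int(rng.random() * n)

rng = random.Random(0)
n, samples = 8, 80_000
rounded = Counter(pick_with_round(rng, n) for _ in range(samples))
floored = Counter(pick_with_floor(rng, n) for _ in range(samples))
print(sorted(rounded.items()))  # first and last index appear about half as often
print(sorted(floored.items()))  # roughly even counts
```

For synthetic test data the skew rarely matters, but it is easy to avoid if you need evenly distributed departments.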

Deploy and use the generator transform

Now that we have both files ready, all we have to do is upload them to Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.

Once you have uploaded both files, the next time you open (or refresh) the AWS Glue Studio visual editor page, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue console, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node by selecting it in the Data source properties tab and selecting source type S3 location.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and find it in the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This wouldn’t be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema. That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more about bucket creation in the Amazon S3 documentation.
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to but not more than 1,460 (depending on the year used and assuming you are using 2 G.1X workers and AWS Glue version 3.0).

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data were saved as a table with years of history, generating small files would have a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows representing all days of the year. When the sink splits the rows into output directories by date, each memory partition has to create one file for each day it contains, so you can end up with 4 × 365 = 1,460 files (in a non-leap year).
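The file count can be reproduced without Spark. The following sketch models each output file as a distinct (memory partition, day) pair, which is an assumption that simplifies how the sink actually writes data. The function name and the fixed seed are hypothetical.

```python
import random
from datetime import date

def count_output_files(num_rows, num_mem_partitions, year, seed=0):
    """Count distinct (memory partition, day) pairs, one per modelled output file."""
    rng = random.Random(seed)
    days_in_year = (date(year + 1, 1, 1) - date(year, 1, 1)).days
    rows_per_partition = max(1, num_rows // num_mem_partitions)
    files = set()
    for row_id in range(num_rows):
        # Model range() as contiguous chunks of ids, one chunk per memory partition
        partition = min(row_id // rows_per_partition, num_mem_partitions - 1)
        day = rng.randrange(days_in_year)  # random sale day within the year
        files.add((partition, day))
    return len(files)

print(count_output_files(10_000, 4, 2021))
```

With 10,000 rows and 4 memory partitions, nearly every partition contains every day, so the result lands just below the 4 × 365 ceiling.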

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that your team has a policy of generating S3 date partitions separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file (let’s call it repartition_date.json) where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    # Reorganize the data so the partitions in memory are aligned with the file partitioning on S3,
    # so each memory partition holds the data for a combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files onto the S3 transforms folder like you did for the previous transform.
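To build intuition for why this repartitioning reduces the file count, the following Spark-free sketch sends every row with the same (year, month, day) key to the same memory partition. Python's built-in hash stands in for Spark's partitioner, which is an assumption made only for illustration; both function names are hypothetical.

```python
from datetime import date

def partition_key(d: date):
    """Split a date into zero-padded string columns, like the transform does."""
    return (str(d.year), f"{d.month:02d}", f"{d.day:02d}")

def assign_partitions(dates, num_partitions):
    """Send every row with the same key to the same memory partition."""
    return [(hash(partition_key(d)) % num_partitions, partition_key(d)) for d in dates]

dates = [date(2021, 9, 1), date(2021, 9, 1), date(2021, 9, 2), date(2021, 12, 31)]
pairs = set(assign_partitions(dates, num_partitions=3))
print(partition_key(date(2021, 9, 1)))  # ('2021', '09', '01')
print(len(pairs))                       # 3, one (partition, key) pair per distinct day
```

Because a key never spans two memory partitions in this model, the number of (partition, key) pairs equals the number of distinct days, regardless of how many memory partitions exist.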

Deploy and use the repartitioner transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department” as “Partition columns” in the transform and then add it in the sink partition column list. Or you can enter a “Number partitions expected” value and see how it affects the runtime (the job no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.
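Because the partition columns are entered as free text, a stray space after a comma would end up inside a column name. The following is a minimal sketch of a more forgiving parser you could use in place of the plain split in repartition_date.py. The function name and the region column are hypothetical.

```python
def parse_partition_cols(partition_cols: str):
    """Split a comma-separated column list, dropping blanks and stray spaces."""
    return [c.strip() for c in partition_cols.split(",") if c.strip()]

print(parse_partition_cols("department , region"))  # ['department', 'region']
print(parse_partition_cols(""))                     # []
```

This is an optional hardening; the script shown earlier works as long as the names are entered without extra whitespace.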

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving their job using custom visual transforms, AWS Glue Studio will generate the job script and update the Python library path (also referred to as the --extra-py-files job parameter) with the list of transform Python file S3 paths, separated by commas.
  3. Before running your script, AWS Glue will add all file paths stored in the --extra-py-files job parameter to the Python path, allowing your script to run all custom visual transform functions you defined.
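Once a transform file is on the Python path, the function becomes callable on a frame because each script attaches it to the DynamicFrame class as its last line. The following sketch shows that registration pattern with a hypothetical stand-in class, so it runs without the awsglue library; FakeDynamicFrame and keep_first are illustrative names only.

```python
class FakeDynamicFrame:
    """Hypothetical stand-in for awsglue.DynamicFrame, so the sketch runs anywhere."""
    def __init__(self, rows):
        self.rows = rows

def keep_first(self, numRows):
    """Example transform: receives the frame as self and returns a new frame."""
    return FakeDynamicFrame(self.rows[:numRows])

# Same registration pattern as the scripts in this post:
# assigning the function to the class makes it a method of every instance
FakeDynamicFrame.keep_first = keep_first

frame = FakeDynamicFrame([1, 2, 3, 4])
print(frame.keep_first(numRows=2).rows)  # [1, 2]
```

This is why the function name must be unique: two transforms registering the same attribute name would overwrite each other.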


In order to avoid running costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job created.


In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.

About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Unlock the power of EC2 Graviton with GitLab CI/CD and EKS Runners

Post Syndicated from Michael Fischer original https://aws.amazon.com/blogs/devops/unlock-the-power-of-ec2-graviton-with-gitlab-ci-cd-and-eks-runners/

Many AWS customers are using GitLab for their DevOps needs, including source control, and continuous integration and continuous delivery (CI/CD). Many of our customers are using GitLab SaaS (the hosted edition), while others are using GitLab Self-managed to meet their security and compliance requirements.

Customers can easily add runners to their GitLab instance to perform various CI/CD jobs. These jobs include compiling source code, building software packages or container images, performing unit and integration testing, etc.—even all the way to production deployment. For the SaaS edition, GitLab offers hosted runners, and customers can provide their own runners as well. Customers who run GitLab Self-managed must provide their own runners.

In this post, we’ll discuss how customers can maximize their CI/CD capabilities by managing their GitLab runner and executor fleet with Amazon Elastic Kubernetes Service (Amazon EKS). We’ll leverage both x86 and Graviton runners, allowing customers for the first time to build and test their applications both on x86 and on AWS Graviton, our most powerful, cost-effective, and sustainable instance family. In keeping with AWS’s philosophy of “pay only for what you use,” we’ll keep our Amazon Elastic Compute Cloud (Amazon EC2) instances as small as possible, and launch ephemeral runners on Spot instances. We’ll demonstrate building and testing a simple demo application on both architectures. Finally, we’ll build and deliver a multi-architecture container image that can run on Amazon EC2 instances or AWS Fargate, both on x86 and Graviton.

Figure 1. Managed GitLab runner architecture overview.

Let’s go through the components:


A runner is an application to which GitLab sends jobs that are defined in a CI/CD pipeline. The runner receives jobs from GitLab and executes them—either by itself, or by passing it to an executor (we’ll visit the executor in the next section).

In our design, we’ll be using a pair of self-hosted runners. One runner will accept jobs for the x86 CPU architecture, and the other will accept jobs for the arm64 (Graviton) CPU architecture. To help us route our jobs to the proper runner, we’ll apply some tags to each runner indicating the architecture for which it will be responsible. We’ll tag the x86 runner with x86, x86-64, and amd64, thereby reflecting the most common nicknames for the architecture, and we’ll tag the arm64 runner with arm64.
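The tag-based routing can be sketched as a subset check: a runner is eligible for a job only if it carries every tag the job asks for. The runner names below are hypothetical, while the tag sets are the ones described above.

```python
RUNNERS = {
    "x86-runner": {"x86", "x86-64", "amd64"},  # hypothetical runner name
    "arm64-runner": {"arm64"},                 # hypothetical runner name
}

def eligible_runners(job_tags, runners=RUNNERS):
    """Return runners whose tag set contains every tag the job asks for."""
    wanted = set(job_tags)
    return sorted(name for name, tags in runners.items() if wanted <= tags)

print(eligible_runners(["arm64"]))           # ['arm64-runner']
print(eligible_runners(["amd64"]))           # ['x86-runner']
print(eligible_runners(["amd64", "arm64"]))  # []
```

Giving the x86 runner several aliases means a job can use whichever nickname its author prefers, while a job that mixes architectures matches no runner at all.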

Currently, these runners must always be running so that they can receive jobs as they are created. Our runners only require a small amount of memory and CPU, so that we can run them on small EC2 instances to minimize cost. These include t4g.micro for Graviton builds, or t3.micro or t3a.micro for x86 builds.

To save money on these runners, consider purchasing a Savings Plan or Reserved Instances for them. Savings Plans and Reserved Instances can save you up to 72% over on-demand pricing, and there’s no minimum spend required to use them.

Kubernetes executors

In GitLab CI/CD, the executor’s job is to perform the actual build. The runner can create hundreds or thousands of executors as needed to meet current demand, subject to the concurrency limits that you specify. Executors are created only when needed, and they are ephemeral: once a job has finished running on an executor, the runner will terminate it.

In our design, we’ll use the Kubernetes executor that’s built into the GitLab runner. The Kubernetes executor simply schedules a new pod to run each job. Once the job completes, the pod terminates, thereby freeing the node to run other jobs.

The Kubernetes executor is highly customizable. We’ll configure each runner with a nodeSelector that makes sure that the jobs are scheduled only onto nodes that are running the specified CPU architecture. Other possible customizations include CPU and memory reservations, node and pod tolerations, service accounts, volume mounts, and much more.
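
As a rough illustration of what a nodeSelector does, the sketch below models the matching rule: a pod can be scheduled onto a node only if every key/value pair in its selector appears among the node's labels. The node names and label sets are made up for the example, and `node_matches` is a hypothetical helper, not a Kubernetes API.

```python
# Illustrative model of nodeSelector matching; node names and labels are invented.
def node_matches(node_selector, node_labels):
    """A pod fits a node only if every selector entry equals the node's label."""
    return all(node_labels.get(key) == value for key, value in node_selector.items())


selector = {"kubernetes.io/arch": "arm64", "karpenter.sh/capacity-type": "spot"}
nodes = {
    "node-a": {"kubernetes.io/arch": "arm64", "karpenter.sh/capacity-type": "spot"},
    "node-b": {"kubernetes.io/arch": "amd64", "karpenter.sh/capacity-type": "spot"},
}
candidates = [name for name, labels in nodes.items() if node_matches(selector, labels)]
print(candidates)
```

Only the arm64 node is a candidate here, which is how an arm64 job is kept off x86 capacity.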

Scaling worker nodes

For most customers, CI/CD jobs aren’t likely to be running all of the time. To save cost, we only want to run worker nodes when there’s a job to run.

To make this happen, we’ll turn to Karpenter. Karpenter provisions EC2 instances as soon as needed to fit newly-scheduled pods. If a new executor pod is scheduled, and there isn’t a qualified instance with enough capacity remaining on it, then Karpenter will quickly and automatically launch a new instance to fit the pod. Karpenter will also periodically scan the cluster and terminate idle nodes, thereby saving on costs. Karpenter can terminate a vacant node in as little as 30 seconds.

Karpenter can launch either Amazon EC2 on-demand or Spot instances depending on your needs. With Spot instances, you can save up to 90% over on-demand instance prices. Since CI/CD jobs often aren’t time-sensitive, Spot instances can be an excellent choice for GitLab execution pods. Karpenter will even automatically find the best Spot instance type to speed up the time it takes to launch an instance and minimize the likelihood of job interruption.

Deploying our solution

To deploy our solution, we’ll write a small application using the AWS Cloud Development Kit (AWS CDK) and the EKS Blueprints library. AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. EKS Blueprints is a library designed to make it simple to deploy complex Kubernetes resources to an Amazon EKS cluster with minimum coding.

The high-level infrastructure code – which can be found in our GitLab repo – is very simple. I’ve included comments to explain how it works.

// All CDK applications start with a new cdk.App object.
const app = new cdk.App();

// Create a new EKS cluster at v1.23. Run all non-DaemonSet pods in the 
// `kube-system` (coredns, etc.) and `karpenter` namespaces in Fargate
// so that we don't have to maintain EC2 instances for them.
const clusterProvider = new blueprints.GenericClusterProvider({
  version: KubernetesVersion.V1_23,
  fargateProfiles: {
    main: {
      selectors: [
        { namespace: 'kube-system' },
        { namespace: 'karpenter' },
      ]
    }
  },
  clusterLogging: [
    // (log types are omitted in this excerpt)
  ]
});

// EKS Blueprints uses a Builder pattern.
blueprints.EksBlueprint.builder()
  .clusterProvider(clusterProvider) // start with the Cluster Provider
  .addOns(
    // Use the EKS add-ons that manage coredns and the VPC CNI plugin
    new blueprints.addons.CoreDnsAddOn('v1.8.7-eksbuild.3'),
    new blueprints.addons.VpcCniAddOn('v1.12.0-eksbuild.1'),
    // Install Karpenter
    new blueprints.addons.KarpenterAddOn({
      provisionerSpecs: {
        // Karpenter examines scheduled pods for the following labels
        // in their `nodeSelector` or `nodeAffinity` rules and routes
        // the pods to the node with the best fit, provisioning a new
        // node if necessary to meet the requirements.
        // Allow either amd64 or arm64 nodes to be provisioned
        'kubernetes.io/arch': ['amd64', 'arm64'],
        // Allow either Spot or On-Demand nodes to be provisioned
        'karpenter.sh/capacity-type': ['spot', 'on-demand']
      },
      // Launch instances in the VPC private subnets
      subnetTags: {
        Name: 'gitlab-runner-eks-demo/gitlab-runner-eks-demo-vpc/PrivateSubnet*'
      },
      // Apply security groups that match the following tags to the launched instances
      securityGroupTags: {
        'kubernetes.io/cluster/gitlab-runner-eks-demo': 'owned'
      }
    }),
    // Create a pair of new GitLab runner deployments, one running on an
    // arm64 (Graviton) instance, the other on an x86_64 instance.
    // We'll show the definition of the GitLabRunner class below.
    new GitLabRunner({
      arch: CpuArch.ARM_64,
      // If you're using an on-premises GitLab installation, you'll want
      // to change the URL below.
      gitlabUrl: 'https://gitlab.com',
      // Kubernetes Secret containing the runner registration token
      // (discussed later)
      secretName: 'gitlab-runner-secret'
    }),
    new GitLabRunner({
      arch: CpuArch.X86_64,
      gitlabUrl: 'https://gitlab.com',
      secretName: 'gitlab-runner-secret'
    }),
  )
  .build(app,
         // Stack name (assumed here to match the cluster name in the tags above)
         'gitlab-runner-eks-demo');

The GitLabRunner class is a HelmAddOn subclass that takes a few parameters from the top-level application:

// The location and name of the GitLab Runner Helm chart
const CHART_REPO = 'https://charts.gitlab.io';
const HELM_CHART = 'gitlab-runner';

// The default namespace for the runner
const DEFAULT_NAMESPACE = 'gitlab';

// The default Helm chart version
const DEFAULT_VERSION = '0.40.1';

export enum CpuArch {
    ARM_64 = 'arm64',
    X86_64 = 'amd64'
}

// Configuration parameters
interface GitLabRunnerProps {
    // The CPU architecture of the node on which the runner pod will reside
    arch: CpuArch
    // The GitLab API URL
    gitlabUrl: string
    // Kubernetes Secret containing the runner registration token (discussed later)
    secretName: string
    // Optional tags for the runner. These will be added to the default list
    // corresponding to the runner's CPU architecture.
    tags?: string[]
    // Optional Kubernetes namespace in which the runner will be installed
    namespace?: string
    // Optional Helm chart version
    chartVersion?: string
}

export class GitLabRunner extends HelmAddOn {
    private arch: CpuArch;
    private gitlabUrl: string;
    private secretName: string;
    private tags: string[] = [];

    constructor(props: GitLabRunnerProps) {
        // Invoke the superclass (HelmAddOn) constructor
        super({
            name: `gitlab-runner-${props.arch}`,
            chart: HELM_CHART,
            repository: CHART_REPO,
            namespace: props.namespace || DEFAULT_NAMESPACE,
            version: props.chartVersion || DEFAULT_VERSION,
            release: `gitlab-runner-${props.arch}`,
        });

        this.arch = props.arch;
        this.gitlabUrl = props.gitlabUrl;
        this.secretName = props.secretName;

        // Set default runner tags
        switch (this.arch) {
            case CpuArch.X86_64:
                this.tags.push('amd64', 'x86', 'x86-64', 'x86_64');
                break;
            case CpuArch.ARM_64:
                this.tags.push('arm64');
                break;
        }
        this.tags.push(...props.tags || []); // Add any custom tags
    }

    // `deploy` method required by the abstract class definition. Our implementation
    // simply installs a Helm chart to the cluster with the proper values.
    deploy(clusterInfo: ClusterInfo): void | Promise<Construct> {
        const chart = this.addHelmChart(clusterInfo, this.getValues(), true);
        return Promise.resolve(chart);
    }

    // Returns the values for the GitLab Runner Helm chart
    private getValues(): Values {
        return {
            gitlabUrl: this.gitlabUrl,
            runners: {
                config: this.runnerConfig(), // runner config.toml file, from below
                name: `demo-runner-${this.arch}`, // name as seen in GitLab UI
                tags: uniq(this.tags).join(','),
                secret: this.secretName, // see below
            },
            // Labels to constrain the nodes where this runner can be placed
            nodeSelector: {
                'kubernetes.io/arch': this.arch,
                'karpenter.sh/capacity-type': 'on-demand'
            },
            // Default pod label
            podLabels: {
                'gitlab-role': 'manager'
            },
            // Create all the necessary RBAC resources including the ServiceAccount
            rbac: {
                create: true
            },
            // Required resources (memory/CPU) for the runner pod. The runner
            // is fairly lightweight as it's a self-contained Golang app.
            resources: {
                requests: {
                    memory: '128Mi',
                    cpu: '256m'
                }
            }
        };
    }

    // This string contains the runner's `config.toml` file including the
    // Kubernetes executor's configuration. Note the nodeSelector constraints
    // (including the use of Spot capacity and the CPU architecture).
    // The TOML section headers are reconstructed; they are missing from this excerpt.
    private runnerConfig(): string {
        return `
  [[runners]]
    [runners.kubernetes]
      namespace = "{{.Release.Namespace}}"
      image = "ubuntu:16.04"
    [runners.kubernetes.node_selector]
      "kubernetes.io/arch" = "${this.arch}"
      "kubernetes.io/os" = "linux"
      "karpenter.sh/capacity-type" = "spot"
    [runners.kubernetes.pod_labels]
      gitlab-role = "runner"
      `;
    }
}

For security reasons, we store the GitLab registration token in a Kubernetes Secret – never in our source code. For additional security, we recommend encrypting Secrets using an AWS Key Management Service (AWS KMS) key that you supply by specifying the encryption configuration when you create your Amazon EKS cluster. It’s a good practice to restrict access to this Secret via Kubernetes RBAC rules.

To create the Secret, run the following command:

# These two values must match the parameters supplied to the GitLabRunner constructor
NAMESPACE=gitlab
SECRET_NAME=gitlab-runner-secret
# The value of the registration token.
TOKEN='<your-registration-token>'

kubectl -n $NAMESPACE create secret generic $SECRET_NAME \
        --from-literal="runner-registration-token=$TOKEN" \
        --from-literal="runner-token="

Building a multi-architecture container image

Now that we’ve launched our GitLab runners and configured the executors, we can build and test a simple multi-architecture container image. If the tests pass, we can then upload it to our project’s GitLab container registry. Our application will be pretty simple: we’ll create a web server in Go that simply prints out “Hello World” and prints out the current architecture.

Find the source code of our sample app in our GitLab repo.

In GitLab, the CI/CD configuration lives in the .gitlab-ci.yml file at the root of the source repository. In this file, we declare a list of ordered build stages, and then we declare the specific jobs associated with each stage.

Our stages are:

  1. The build stage, in which we compile our code, produce our architecture-specific images, and upload these images to the GitLab container registry. These uploaded images are tagged with a suffix indicating the architecture on which they were built. This job uses a matrix variable to run it in parallel against two different runners – one for each supported architecture. Furthermore, rather than using docker build to produce our images, we use Kaniko to build them. This lets us build our images in an unprivileged container environment and improve the security posture considerably.
  2. The test stage, in which we test the code. As with the build stage, we use a matrix variable to run the tests in parallel in separate pods on each supported architecture.
  3. The assembly stage, in which we create a multi-architecture image manifest from the two architecture-specific images. Then, we push the manifest into the image registry so that we can refer to it in future deployments.
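
The tagging scheme behind these stages can be sketched in a few lines of Python: each architecture-specific image gets the composite tag plus an architecture suffix, and the multi-architecture manifest uses the composite tag with no suffix. The registry path and the `image_references` helper are hypothetical.

```python
# Illustrative only: derive per-architecture image references from a composite tag.
def image_references(composite_image, architectures=("amd64", "arm64")):
    """Map each architecture to its suffixed image reference."""
    return {arch: f"{composite_image}-{arch}" for arch in architectures}


composite = "registry.example.com/group/app:1a2b3c4"  # hypothetical registry and tag
refs = image_references(composite)
for arch, ref in refs.items():
    print(arch, "->", ref)
print("manifest ->", composite)
```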

Figure 2. Example CI/CD pipeline for multi-architecture images.

Here’s what our top-level configuration looks like:

  # These are used by the runner to configure the Kubernetes executor, and define
  # the values of spec.containers[].resources.limits.{memory,cpu} for the Pod(s).

# List of stages for jobs, and their order of execution
stages:
  - build
  - test
  - create-multiarch-manifest

Here’s what our build stage job looks like. Note the matrix variable BUILD_ARCH, which takes a different value in each of the two jobs that run in parallel:

build:                   # job name assumed; it is missing from this excerpt
  stage: build
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  image:
    name: gcr.io/kaniko-project/executor:debug
    entrypoint: [""]
  script:
    - mkdir -p /kaniko/.docker
    # Configure authentication data for Kaniko so it can push to the
    # GitLab container registry
    - echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
    # Build the image and push to the registry. In this stage, we append the build
    # architecture as a tag suffix. (The destination is inferred from the
    # assembly stage; it is missing from this excerpt.)
    - >-
      /kaniko/executor
      --context "${CI_PROJECT_DIR}"
      --dockerfile "${CI_PROJECT_DIR}/Dockerfile"
      --destination "${COMPOSITE_IMAGE}-${BUILD_ARCH}"
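
The authentication line in the build job is dense, so here is a rough Python equivalent of what it writes to /kaniko/.docker/config.json: a JSON document whose auth field is the Base64 encoding of user:password for the registry. The `kaniko_docker_config` helper and the placeholder credentials are hypothetical; in the pipeline the values come from CI_REGISTRY, CI_REGISTRY_USER, and CI_REGISTRY_PASSWORD.

```python
import base64
import json


def kaniko_docker_config(registry, user, password):
    """Build the JSON document that the build job writes to config.json."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return json.dumps({"auths": {registry: {"auth": token}}})


# Placeholder values for illustration only.
config = kaniko_docker_config("registry.example.com", "ci-user", "s3cret")
print(config)
```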

Here’s what our test stage job looks like. This time we use the image that we just produced. Our source code is copied into the application container. Then, we can run make test-api to execute the server test suite.

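test:                    # job name assumed; it is missing from this excerpt
  stage: test
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  # Use the image we just built (reference inferred from the assembly stage)
  image: ${COMPOSITE_IMAGE}-${BUILD_ARCH}
  script:
    - make test-container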

Finally, here’s what our assembly stage looks like. We use Podman to build the multi-architecture manifest and push it into the image registry. Traditionally we might have used docker buildx to do this, but using Podman lets us do this work in an unprivileged container for additional security.

create-multiarch-manifest:   # job name assumed; it is missing from this excerpt
  stage: create-multiarch-manifest
  tags: [arm64]
  image: public.ecr.aws/docker/library/fedora:36
  script:
    - yum -y install podman
    - echo "${CI_REGISTRY_PASSWORD}" | podman login -u "${CI_REGISTRY_USER}" --password-stdin "${CI_REGISTRY}"
    - podman manifest create ${COMPOSITE_IMAGE}
    - >-
      for arch in arm64 amd64; do
        podman manifest add ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}-${arch};
      done
    - podman manifest inspect ${COMPOSITE_IMAGE}
    # The composite image manifest omits the architecture from the tag suffix.
    - podman manifest push ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}

Trying it out

I’ve created a public test GitLab project containing the sample source code, and attached the runners to the project. We can see them at Settings > CI/CD > Runners:

Figure 3. GitLab runner configurations.

Here we can also see some pipeline executions, where some have succeeded, and others have failed.

Figure 4. GitLab sample pipeline executions.

We can also see the specific jobs associated with a pipeline execution:

Figure 5. GitLab sample job executions.

Finally, here are our container images:

Figure 6. GitLab sample container registry.

Conclusion


In this post, we’ve illustrated how you can quickly and easily construct multi-architecture container images with GitLab, Amazon EKS, Karpenter, and Amazon EC2, using both x86 and Graviton instance families. We focused on using as many managed services as possible, maximizing security, and minimizing complexity and TCO. We dove deep on multiple facets of the process, and discussed how to save up to 90% of the solution’s cost by using Spot instances for CI/CD executions.

Find the sample code, including everything shown here today, in our GitLab repository.

Building multi-architecture images will unlock the value and performance of running your applications on AWS Graviton and give you increased flexibility over compute choice. We encourage you to get started today.

About the author:

Michael Fischer

Michael Fischer is a Principal Specialist Solutions Architect at Amazon Web Services. He focuses on helping customers build more cost-effectively and sustainably with AWS Graviton. Michael has an extensive background in systems programming, monitoring, and observability. His hobbies include world travel, diving, and playing the drums.

Multi-branch pipeline management and infrastructure deployment using AWS CDK Pipelines

Post Syndicated from Iris Kraja original https://aws.amazon.com/blogs/devops/multi-branch-pipeline-management-and-infrastructure-deployment-using-aws-cdk-pipelines/

This post describes how to use the AWS CDK Pipelines module to follow a Gitflow development model using AWS Cloud Development Kit (AWS CDK). Software development teams often follow a strict branching strategy during a solutions development lifecycle. Newly-created branches commonly need their own isolated copy of infrastructure resources to develop new features.

CDK Pipelines is a construct library module for continuous delivery of AWS CDK applications. CDK Pipelines are self-updating: if you add application stages or stacks, then the pipeline automatically reconfigures itself to deploy those new stages and/or stacks.

The following solution creates a new AWS CDK Pipeline within a development account for every new branch created in the source repository (AWS CodeCommit). When a branch is deleted, the pipeline and all related resources are also destroyed from the account. This GitFlow model for infrastructure provisioning allows developers to work independently from each other, concurrently, even in the same stack of the application.

Solution overview

The following diagram provides an overview of the solution. There is one default pipeline responsible for deploying resources to the different application environments (e.g., Development, Pre-Prod, and Prod). The code is stored in CodeCommit. When new changes are pushed to the default CodeCommit repository branch, AWS CodePipeline runs the default pipeline. When the default pipeline is deployed, it creates two AWS Lambda functions.

These two Lambda functions are invoked by CodeCommit CloudWatch events when a new branch in the repository is created or deleted. The Create Lambda function uses the boto3 CodeBuild module to create an AWS CodeBuild project that builds the pipeline for the feature branch. This feature pipeline consists of a build stage and an optional update pipeline stage for itself. The Destroy Lambda function creates another CodeBuild project which cleans all of the feature branch’s resources and the feature pipeline.
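
The event filtering can be sketched as follows. The solution uses two separate Lambda functions, each wired to its own event; this single hypothetical `branch_action` helper only illustrates which fields of a CodeCommit repository state change event separate a branch creation from a branch deletion. The sample event is trimmed to the fields used, and the repository name is invented.

```python
# Illustrative dispatch on a CodeCommit repository state change event.
def branch_action(event):
    """Return ('create' | 'destroy', branch_name), or None for events to ignore."""
    detail = event.get("detail", {})
    if detail.get("referenceType") != "branch":
        return None  # tags and other reference types are ignored
    actions = {"referenceCreated": "create", "referenceDeleted": "destroy"}
    action = actions.get(detail.get("event"))
    if action is None:
        return None  # for example, referenceUpdated
    return action, detail["referenceName"]


sample_event = {
    "detail": {
        "event": "referenceCreated",
        "referenceType": "branch",
        "referenceName": "user-feature-123",
        "repositoryName": "demo-repo",  # hypothetical repository name
    }
}
print(branch_action(sample_event))
```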

Figure 1. Architecture diagram.

Prerequisites


Before beginning this walkthrough, you should have the following prerequisites:

  • An AWS account
  • AWS CDK installed
  • Python3 installed
  • jq (JSON processor) installed
  • Basic understanding of continuous integration/continuous delivery (CI/CD) pipelines

Initial setup

Download the repository from GitHub:

# Command to clone the repository
git clone https://github.com/aws-samples/multi-branch-cdk-pipelines.git
cd multi-branch-cdk-pipelines

Create a new CodeCommit repository in the AWS Account and region where you want to deploy the pipeline and upload the source code from above to this repository. In the config.ini file, change the repository_name and region variables accordingly.

Make sure that you set up a fresh Python environment. Install the dependencies:

pip install -r requirements.txt

Run the initial-deploy.sh script to bootstrap the development and production environments and to deploy the default pipeline. You’ll be asked to provide the following parameters: (1) Development account ID, (2) Development account AWS profile name, (3) Production account ID, and (4) Production account AWS profile name.

sh ./initial-deploy.sh --dev_account_id <YOUR DEV ACCOUNT ID> \
  --dev_profile_name <YOUR DEV PROFILE NAME> \
  --prod_account_id <YOUR PRODUCTION ACCOUNT ID> \
  --prod_profile_name <YOUR PRODUCTION PROFILE NAME>

Default pipeline

In the CI/CD pipeline, we set up an if condition to deploy the default branch resources only if the current branch is the default one. The default branch is retrieved programmatically from the CodeCommit repository. We deploy an Amazon Simple Storage Service (Amazon S3) Bucket and two Lambda functions. The bucket is responsible for storing the feature branches’ CodeBuild artifacts. The first Lambda function is triggered when a new branch is created in CodeCommit. The second one is triggered when a branch is deleted.

if branch == default_branch:

    # Artifact bucket for feature AWS CodeBuild projects
    artifact_bucket = Bucket(
    # AWS Lambda function triggered upon branch creation
    create_branch_func = aws_lambda.Function(
        code=aws_lambda.Code.from_asset(path.join(this_dir, 'code')),
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix

    # AWS Lambda function triggered upon branch deletion
    destroy_branch_func = aws_lambda.Function(
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix,
            "DEV_STAGE_NAME": f'{dev_stage_name}-{dev_stage.main_stack_name}'

Then, the CodeCommit repository is configured to trigger these Lambda functions based on two events:

(1) Reference created

# Configure AWS CodeCommit to trigger the Lambda function when a new branch is created
    description="AWS CodeCommit reference created event.",

(2) Reference deleted

# Configure AWS CodeCommit to trigger the Lambda function when a branch is deleted
    description="AWS CodeCommit reference deleted event.",

Lambda functions

The two Lambda functions build and destroy application environments mapped to each feature branch. An Amazon CloudWatch event triggers the LambdaTriggerCreateBranch function whenever a new branch is created. The function uses the boto3 CodeBuild client to create a CodeBuild project, which in turn deploys the feature pipeline.

Create function

The create function deploys a feature pipeline which consists of a build stage and an optional update pipeline stage for itself. The pipeline downloads the feature branch code from the CodeCommit repository, initiates the Build and Test action using CodeBuild, and securely saves the built artifact on the S3 bucket.

The Lambda function handler code is as follows:

def handler(event, context):
    """Lambda function handler"""

    reference_type = event['detail']['referenceType']

        if reference_type == 'branch':
            branch = event['detail']['referenceName']
            repo_name = event['detail']['repositoryName']

                description="Build project to deploy branch pipeline",
                    'type': 'CODECOMMIT',
                    'location': f'https://git-codecommit.{region}.amazonaws.com/v1/repos/{repo_name}',
                    'buildspec': generate_build_spec(branch)
                    'type': 'S3',
                    'location': artifact_bucket_name,
                    'path': f'{branch}',
                    'packaging': 'NONE',
                    'artifactIdentifier': 'BranchBuildArtifact'
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'

    except Exception as e:

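The buildspec.yaml content of the create-branch CodeBuild project (section keys reconstructed from the indentation; the phase names are assumed):

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk synth
      - cdk deploy --require-approval=never
artifacts:
  files:
    - '**/*'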
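
The handler above calls generate_build_spec(branch), whose body is not shown. A minimal sketch of what such a helper might look like is given here, assuming it renders the buildspec as a YAML string; the extra `account_id` and `region` parameters and the phase names are assumptions for illustration, not the repository's actual code.

```python
# Hypothetical sketch of a buildspec generator; not the repository's implementation.
def generate_build_spec(branch, account_id, region):
    """Render a buildspec for the feature-branch build as a YAML string."""
    return f"""version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk synth
      - cdk deploy --require-approval=never
artifacts:
  files:
    - '**/*'
"""


spec = generate_build_spec("user-feature-123", "111111111111", "us-east-1")
print(spec)
```

Passing the buildspec inline lets the Lambda function inject the branch name at project creation time instead of relying on a file in the repository.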

Destroy function

The second Lambda function is responsible for the destruction of a feature branch’s resources. Upon the deletion of a feature branch, an Amazon CloudWatch event triggers this Lambda function. The function creates a CodeBuild Project which destroys the feature pipeline and all of the associated resources created by that pipeline. The source property of the CodeBuild Project is the feature branch’s source code saved as an artifact in Amazon S3.

The Lambda function handler code is as follows:

def handler(event, context):
    reference_type = event['detail']['referenceType']

        if reference_type == 'branch':
            branch = event['detail']['referenceName']
                description="Build project to destroy branch resources",
                    'type': 'S3',
                    'location': f'{artifact_bucket_name}/{branch}/CodeBuild-{branch}-create/',
                    'buildspec': generate_build_spec(branch)
                    'type': 'NO_ARTIFACTS'
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'



    except Exception as e:

The buildspec.yaml content of the destroy-branch CodeBuild project (section keys reconstructed from the indentation; the phase names are assumed):

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk destroy cdk-pipelines-multi-branch-{branch} --force
      - aws cloudformation delete-stack --stack-name {dev_stage_name}-{branch}
      - aws s3 rm s3://{artifact_bucket_name}/{branch} --recursive

Create a feature branch

On your machine’s local copy of the repository, create a new feature branch using the following git commands. Replace user-feature-123 with a unique name for your feature branch. Note that this feature branch name must comply with the CodePipeline naming restrictions, as it will be used to name a unique pipeline later in this walkthrough.
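
A quick pre-flight check can catch an unsuitable branch name before you push. The sketch below assumes the CodePipeline naming rule of 1 to 100 characters drawn from letters, digits, and the characters . @ - _ (check the current CodePipeline documentation before relying on it). The `is_valid_pipeline_name` helper is hypothetical, and because the solution adds its own prefix to the branch name, the usable length is shorter in practice.

```python
import re

# Assumed CodePipeline name rule: 1-100 characters from A-Z, a-z, 0-9, '.', '@', '-', '_'.
_PIPELINE_NAME = re.compile(r"[A-Za-z0-9.@\-_]{1,100}")


def is_valid_pipeline_name(name):
    """Return True if the whole name matches the assumed CodePipeline pattern."""
    return _PIPELINE_NAME.fullmatch(name) is not None


print(is_valid_pipeline_name("user-feature-123"))  # hyphenated names pass
print(is_valid_pipeline_name("feature/login"))     # '/' is not an allowed character
```

Note that Gitflow-style names containing a slash, such as feature/login, fail this check.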

# Create the feature branch
git checkout -b user-feature-123
git push origin user-feature-123

The first Lambda function will deploy the CodeBuild project, which then deploys the feature pipeline. This can take a few minutes. You can log in to the AWS Console and see the CodeBuild project running under CodeBuild.

Figure 2. AWS Console – CodeBuild projects.

After the build finishes successfully, you can see the deployed feature pipeline under CodePipeline.

Figure 3. AWS Console – CodePipeline pipelines.

The Lambda S3 trigger project from AWS CDK Samples is used as the infrastructure resources to demonstrate this solution. The content is placed inside the src directory and is deployed by the pipeline. When visiting the Lambda console page, you can see two functions: one by the default pipeline and one by our feature pipeline.

Figure 4. AWS Console – Lambda functions.

Destroy a feature branch

There are two common ways to remove a feature branch. The first is through a pull request (PR): once the feature branch is merged back into the default branch, it is closed automatically. The second is to delete the feature branch explicitly by running the following git commands:

# delete branch local
git branch -d user-feature-123

# delete branch remote
git push origin --delete user-feature-123

The CodeBuild project responsible for destroying the feature resources is now triggered. You can see the project’s logs while the resources are being destroyed in CodeBuild, under Build history.

Figure 5. AWS Console – CodeBuild projects.

Cleaning up

To avoid incurring future charges, log into the AWS console of the different accounts you used, go to the AWS CloudFormation console of the Region(s) where you chose to deploy, and select and click Delete on the main and branch stacks.

Conclusion


This post showed how you can work with an event-driven strategy and AWS CDK to implement a multi-branch pipeline flow using AWS CDK Pipelines. The described solutions leverage Lambda and CodeBuild to provide a dynamic orchestration of resources for multiple branches and pipelines.
For more information on CDK Pipelines and all the ways it can be used, see the CDK Pipelines reference documentation.

About the authors:

Iris Kraja

Iris is a Cloud Application Architect at AWS Professional Services based in New York City. She is passionate about helping customers design and build modern AWS cloud native solutions, with a keen interest in serverless technology, event-driven architectures and DevOps.  Outside of work, she enjoys hiking and spending as much time as possible in nature.

Jan Bauer

Jan is a Cloud Application Architect at AWS Professional Services. His interests are serverless computing, machine learning, and everything that involves cloud computing.

Rolando Santamaria Maso

Rolando is a senior cloud application development consultant at AWS Professional Services, based in Germany. He helps customers migrate and modernize workloads in the AWS Cloud, with a special focus on modern application architectures and development best practices, but he also creates IaC using AWS CDK. Outside work, he maintains open-source projects and enjoys spending time with family and friends.

Caroline Gluck

Caroline is an AWS Cloud application architect based in New York City, where she helps customers design and build cloud native data science applications. Caroline is a builder at heart, with a passion for serverless architecture and machine learning. In her spare time, she enjoys traveling, cooking, and spending time with family and friends.

Migrate a large data warehouse from Greenplum to Amazon Redshift using AWS SCT – Part 3

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/part-3-migrate-a-large-data-warehouse-from-greenplum-to-amazon-redshift-using-aws-sct/

In this third post of a multi-part series, we explore some of the edge cases in migrating a large data warehouse from Greenplum to Amazon Redshift using AWS Schema Conversion Tool (AWS SCT) and how to handle these challenges. Challenges include how best to use virtual partitioning, edge cases for numeric and character fields, and arrays.

You can check out the first post of this series for guidance on planning, running, and validating the migration. You can also check out the second post for best practices for choosing the optimal Amazon Redshift cluster, data architecture, converting stored procedures, compatible functions and queries widely used for SQL conversions, and recommendations for optimizing the length of data types for table columns.

Unbounded character data type

Greenplum supports creating columns as text and varchar without specifying the length of the field. This works without an issue in Greenplum but doesn’t work well in migrating to Amazon Redshift. Amazon Redshift stores data in columnar format and gets better compression when using shorter column lengths. Therefore, the Amazon Redshift best practice is to use the smallest character length possible.

AWS SCT converts these unbounded fields to large objects (LOBs) instead of treating the columns as character fields with a specified length. LOBs are implemented differently in each database product on the market, but in general, a LOB is not stored with the rest of the table data. Instead, there is a pointer to the location of the data. When the LOB is queried, the database reconstitutes the data automatically for you, but this typically requires more resources.

Amazon Redshift doesn’t support LOBs, so AWS SCT resolves this by loading the data into Amazon Simple Storage Service (Amazon S3) and in the column, it stores the S3 URL. When you need to retrieve this data, you have to query the table, get the S3 URL, and then fetch the data from Amazon S3. This isn’t ideal because most of the time, the actual maximum length of the field doesn’t require it to be treated as a LOB, and storing the data remotely means it will take much longer to fetch the data for queries.

The current resolution is to calculate the maximum length of these columns and update the Greenplum tables before converting to Amazon Redshift with AWS SCT.

Note that in a future release of AWS SCT, the collection of statistics will include calculating the maximum length for each column, and the conversion of unbounded varchar and text will set the length in Amazon Redshift automatically.

The following code is an example of an unbounded character data type:

CREATE TABLE public.mytable 
(description1 text NOT NULL PRIMARY KEY, description2 varchar);
CREATE INDEX ON public.mytable (description2);

This table has a primary key on an unbounded text column. The column needs to be converted to varchar(n), where n is the maximum length found in this column.

  1. Drop unique constraints on affected columns:
    ALTER TABLE public.mytable DROP CONSTRAINT mytable_pkey;

  2. Drop indexes on affected columns:
    DROP INDEX public.mytable_description2_idx;

  3. Calculate maximum length of affected columns:
    select coalesce(max(length(description1)), 10),
    coalesce(max(length(description2)), 10) 
    from public.mytable;

Note that the query falls back to a length of 10 when a column contains only NULL values or the table has no rows. In this example, the resulting length for both description1 and description2 is 10.

  4. Alter the length of the affected columns:
    ALTER TABLE public.mytable ALTER COLUMN description1 TYPE varchar(10);
    ALTER TABLE public.mytable ALTER COLUMN description2 TYPE varchar(10);

You can now proceed with using AWS SCT to convert the Greenplum schema to Amazon Redshift and avoid using LOBs to store the column values.
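The last step above can be generated rather than typed by hand when a table has several affected columns. The following is a minimal sketch, not the add_varchar_lengths.sh script: the function name build_alter_statements and the sample lengths are hypothetical, and it assumes the maximum lengths were already collected with a query like the one in step 3.

```python
def build_alter_statements(table, max_lengths, default_length=10):
    """Return one ALTER TABLE statement per unbounded column.

    max_lengths maps a column name to its measured maximum length, or to
    None when the column is empty or all NULL. None falls back to
    default_length, mirroring the coalesce(..., 10) in the SQL above.
    Table and column names are assumed to be trusted identifiers.
    """
    statements = []
    for column, length in max_lengths.items():
        n = length if length is not None else default_length
        statements.append(
            f"ALTER TABLE {table} ALTER COLUMN {column} TYPE varchar({n});"
        )
    return statements


# Hypothetical measurements: description1 has data, description2 is all NULL.
statements = build_alter_statements(
    "public.mytable", {"description1": 42, "description2": None}
)
for statement in statements:
    print(statement)
```

The generated statements still need to be run in Greenplum after the constraints and indexes on the affected columns have been dropped.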

GitHub help

If you have many tables to update and want an automated solution, you can use the add_varchar_lengths.sh script found in the GitHub repo to fix all of the unbounded varchar and text columns in a given schema in Greenplum. The script calculates the appropriate maximum length and then alters the Greenplum tables so the varchar data type is bounded by a length.

Please note that the script also will drop any constraints or indexes on the affected columns.

Empty character data

Greenplum and Amazon Redshift both support an empty string value in a field that is different from NULL, and the behavior is the same in the two databases. However, AWS SCT converts empty strings to NULL by default. Disable this setting to avoid problems.

  1. In AWS SCT, open your project, choose Settings, Project settings, and Data migration.
  2. Scroll to the bottom and find Use empty as null value.
  3. Deselect this so that AWS SCT doesn’t convert empty strings to NULL.

NaN and Infinity numeric data type

Greenplum supports NaN and Infinity in a numeric field to represent an undefined calculation result and infinity. NaN is very uncommon because when using aggregate functions on a column with a NaN row, the result will also be NaN. Infinity is also uncommon and not useful when aggregating data. However, you may encounter these values in a Greenplum database.

Amazon Redshift doesn’t support NaN and Infinity, and AWS SCT doesn’t check for this in your data. If you do encounter this when using AWS SCT, the task will fail with a numeric conversion error.

To resolve this, we suggest using NULL instead of NaN and Infinity. This allows you to aggregate data and get results other than NaN and, importantly, allows you to convert the Greenplum data to Amazon Redshift.

The following code is an example of a NaN numeric value:

CREATE TABLE public.mytable2 (id int NOT NULL, amount numeric NOT NULL);
INSERT INTO public.mytable2 VALUES (1, 10), (2, 'NaN'), (3, 20);

  1. Drop the NOT NULL constraint:
    ALTER TABLE public.mytable2 ALTER COLUMN amount DROP NOT NULL;

  2. Update the table:
    UPDATE public.mytable2 SET amount = NULL where amount = 'NaN';

You can now proceed with using AWS SCT to migrate the Greenplum data to Amazon Redshift.

Note that in a future release of AWS SCT, there will be an option to convert NaN and Infinity to NULL so that you won’t have to update your Greenplum data to migrate to Amazon Redshift.

Virtual partitioning on GP_SEGMENT_ID

For large tables, it’s recommended to use virtual partitioning to extract data from Greenplum. Without virtual partitioning, AWS SCT will run a single query to unload data from Greenplum. For example:

SELECT * FROM store_sales;

If this table is very large, it will take a long time to extract the data because this is a single process querying the data. With virtual partitioning, multiple queries are run in parallel so that the extraction of data is completed faster. It also makes it easier to recover if there is an issue with the task.

Virtual partitioning is very flexible, but a simple way to set it up for a migration to Amazon Redshift is to use the Greenplum hidden column gp_segment_id. This column identifies which segment in Greenplum holds each row, and when a table is evenly distributed, each segment holds roughly the same number of rows. Therefore, creating a partition for each gp_segment_id value is an easy way to implement virtual partitioning.

If you’re not familiar with the term segment, it’s similar to an Amazon Redshift slice.

For example:

SELECT * FROM store_sales WHERE gp_segment_id = 0;
SELECT * FROM store_sales WHERE gp_segment_id = 1;
SELECT * FROM store_sales WHERE gp_segment_id = 2;

  1. First, determine the number of segments in Greenplum:
    SELECT count(*) 
    FROM gp_segment_configuration 
    WHERE content >= 0 AND preferred_role = 'p';

Now you can configure AWS SCT.

  2. In AWS SCT, go to Data Migration view (other) and choose (right-click) a large table.
  3. Scroll down to Add virtual partitioning.
  4. For the partition type, choose Auto Split and change the column name to GP_SEGMENT_ID.
  5. Use 0 for Start value, the number of segments found in Step 1 as End value, and Interval of 1.

When you create a local task to load this table, the task will have a sub-task for each gp_segment_id value.
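To make the effect of this configuration concrete, the following sketch builds one extraction query per segment. It is an illustration only: segment_queries is a hypothetical helper, the exact SQL that AWS SCT generates for each sub-task is not shown in this post, and the sketch assumes segment IDs run from 0 to the segment count minus 1.

```python
def segment_queries(table, segment_count):
    """Build one SELECT per Greenplum segment for the given table.

    Assumes gp_segment_id values are 0 .. segment_count - 1, where
    segment_count comes from the gp_segment_configuration query above.
    """
    return [
        f"SELECT * FROM {table} WHERE gp_segment_id = {segment_id};"
        for segment_id in range(segment_count)
    ]


# Example with a hypothetical three-segment cluster.
queries = segment_queries("store_sales", 3)
for query in queries:
    print(query)
```

Because each query touches a disjoint set of rows, the queries can run in parallel and a failed one can be retried without repeating the others.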

Note that in a future release of AWS SCT, there will be an option to automatically virtually partition tables based on GP_SEGMENT_ID. This option will also retrieve the number of segments automatically.


Arrays

Greenplum supports arrays such as bigint[] that are unbounded. Typically, arrays are kept relatively small in Greenplum because arrays consume more memory in Greenplum than using an alternative strategy. However, it’s possible to have a very large array in Greenplum that isn’t supported by Amazon Redshift.

AWS SCT converts a Greenplum array to varchar(65535), but if the converted array is longer than 65,535 characters, then the load will fail.
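A quick way to reason about this limit is to estimate the length of an array once it is written out as text. The sketch below assumes the converted value uses the brace-delimited form shown in the INSERT example in this section (such as '{1,2,3}'); the helper names are hypothetical, and the exact text that AWS SCT produces may differ.

```python
REDSHIFT_VARCHAR_MAX = 65535  # maximum length of the converted varchar column


def array_text_length(items):
    """Length of the '{1,2,3}' style text form of an integer array."""
    return len("{" + ",".join(str(item) for item in items) + "}")


def fits_in_varchar(items, limit=REDSHIFT_VARCHAR_MAX):
    """True when the text form of the array fits within the limit."""
    return array_text_length(items) <= limit


small = [1, 2, 3]
large = list(range(1, 20001))  # 20,000 elements

print(array_text_length(small), fits_in_varchar(small))
print(array_text_length(large), fits_in_varchar(large))
```

Even an array of a few tens of thousands of bigint values can exceed the limit, because every element contributes its digits plus a separator.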

The following code is an example of a large array:

CREATE TABLE public.sales 
(sales_id int NOT NULL,
customer_id int NOT NULL,
sales_item_ids bigint[]) 
DISTRIBUTED BY (sales_id);

INSERT INTO public.sales values (1, 100, '{1,2,3}'), (2, 100, '{1,2,3}');

In this example, the sales items are stored in an array for each sales_id. If the load into Amazon Redshift with AWS SCT fails with an error that the length is too long, the following steps resolve it by moving the array elements into their own table. It’s also a more efficient pattern to store data in both Greenplum and Amazon Redshift!

  1. Create a new sales table that has all columns from the existing sales table, but exclude the array column:
    CREATE TABLE public.sales_new 
    (sales_id int NOT NULL,
    customer_id int NOT NULL) 
    DISTRIBUTED BY (sales_id);

  2. Populate the new sales table with the existing data except for the array column:
    INSERT INTO public.sales_new (sales_id, customer_id) 
    SELECT sales_id, customer_id FROM public.sales;

We create a new table that is a cross-reference of sales IDs with the sales items. Instead of having a single row for this association, now there will be a row for each relationship.

  3. Create a new sales item table:
    CREATE TABLE public.sales_items 
    (sales_id int NOT NULL, 
    sales_item_id bigint NOT NULL) 
    DISTRIBUTED BY (sales_id);

  4. To unnest the array, create a row for each array element:
    INSERT INTO public.sales_items 
    (sales_id, sales_item_id) 
    SELECT sales_id, unnest(sales_item_ids) 
    FROM public.sales;

  5. Rename the sales tables:
    ALTER TABLE public.sales RENAME TO sales_old;
    ALTER TABLE public.sales_new RENAME TO sales;

In AWS SCT, refresh the tables and migrate the revised sales and the new sales_items table.

The following are some example queries before and after the change. The Before queries run against the original sales table with the array column; the After queries run against the revised sales table and the new sales_items table.

Before:

SELECT s.sales_id, unnest(s.sales_item_ids) 
FROM public.sales s 
WHERE s.sales_id = 1;

After:

SELECT s.sales_id, i.sales_item_id 
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.sales_id = 1;

Before:

SELECT s.sales_id 
FROM public.sales s 
WHERE s.customer_id = 100
AND 10 = ANY(s.sales_item_ids);

After:

SELECT s.sales_id
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.customer_id = 100
AND i.sales_item_id = 10;


VACUUM ANALYZE

Greenplum, like Amazon Redshift, supports the VACUUM command, which reclaims storage space after UPDATE and DELETE commands are run on a table. Greenplum also allows you to add the ANALYZE option to run both statements with a single command.

The following code is the Greenplum command:

VACUUM ANALYZE table;

This is not very common, but you’ll see this from time to time. If you’re just inserting data into a table, there is no need to run VACUUM, but for ease of use, sometimes developers will use VACUUM ANALYZE.

The following are the Amazon Redshift commands:

VACUUM table;
ANALYZE table;

Amazon Redshift doesn’t support adding ANALYZE to the VACUUM command, so instead, this needs to be two different statements. Also note that Amazon Redshift performs VACUUM and ANALYZE automatically for you so in most cases, you can remove these commands from your scripts entirely.


DISTINCT ON query

Greenplum supports an unusual shortcut for eliminating duplicates in a table. This feature keeps the first row for each set of rows based on the order of the data being fetched. It’s easiest to understand by looking at an example:

CREATE TABLE customer 
(customer_name varchar(100) not null, 
customer_address varchar(1000) not null, 
lastupdate timestamp not null);

INSERT INTO customer VALUES
('ACME', '123 Main St', '2022-01-01'), 
('ACME', '456 Market St', '2022-05-01'), 
('ACME', '789 Broadway', '2022-08-01');

SELECT DISTINCT ON (customer_name) customer_name, customer_address 
FROM customer 
ORDER BY customer_name, lastupdate DESC;

We get the following results:

customer_name | customer_address 
 ACME          | 789 Broadway

The solution for running this in Amazon Redshift is to use the ANSI standard row_number() analytical function, as shown in the following code:

SELECT sub.customer_name, sub.customer_address 
FROM (
    SELECT customer_name, customer_address, 
           row_number() OVER (PARTITION BY customer_name ORDER BY lastupdate DESC) AS row_number 
    FROM customer
) AS sub 
WHERE sub.row_number = 1;
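Both forms of the query apply the same rule: order the rows within each customer_name by lastupdate descending and keep only the first one. The following sketch replays that rule in plain Python on the sample rows from this section, purely to illustrate the logic; latest_per_customer is a hypothetical helper and nothing here runs against a database.

```python
rows = [
    ("ACME", "123 Main St", "2022-01-01"),
    ("ACME", "456 Market St", "2022-05-01"),
    ("ACME", "789 Broadway", "2022-08-01"),
]


def latest_per_customer(rows):
    """Keep the most recently updated address for each customer name."""
    # Sort by lastupdate descending first, then by name ascending. Python's
    # sort is stable, so the descending order is preserved within each name.
    ordered = sorted(rows, key=lambda row: row[2], reverse=True)
    ordered.sort(key=lambda row: row[0])

    result = []
    seen = set()
    for name, address, _lastupdate in ordered:
        if name not in seen:  # equivalent to row_number = 1
            seen.add(name)
            result.append((name, address))
    return result


print(latest_per_customer(rows))
```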

Clean up

The examples in this post create tables in Greenplum. To remove these example tables, run the following commands:

DROP TABLE IF EXISTS public.mytable;
DROP TABLE IF EXISTS public.mytable2;
DROP TABLE IF EXISTS public.sales;
DROP TABLE IF EXISTS public.sales_old;
DROP TABLE IF EXISTS public.sales_new;
DROP TABLE IF EXISTS public.sales_items;
DROP TABLE IF EXISTS public.customer;


Conclusion

In this post, we covered some of the edge cases when migrating Greenplum to Amazon Redshift and how to handle these challenges, including easy virtual partitioning, edge cases for numeric and character fields, and arrays. This is not an exhaustive list of the issues you may encounter when migrating Greenplum to Amazon Redshift, but this series should help you navigate modernizing your data platform by moving to Amazon Redshift.

For additional details, see the Amazon Redshift Getting Started Guide and the AWS SCT User Guide.

About the Authors

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Nelly Susanto is a Senior Database Migration Specialist of AWS Database Migration Accelerator. She has over 10 years of technical experience focusing on migrating and replicating databases along with data warehouse workloads. She is passionate about helping customers in their cloud journey.

Suresh Patnam is a Principal BDM – GTM AI/ML Leader at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible by leveraging Data & AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Configuration driven dynamic multi-account CI/CD solution on AWS

Post Syndicated from Anshul Saxena original https://aws.amazon.com/blogs/devops/configuration-driven-dynamic-multi-account-ci-cd-solution-on-aws/

Many organizations require durable automated code delivery for their applications. They leverage multi-account continuous integration/continuous deployment (CI/CD) pipelines to deploy code and run automated tests in multiple environments before deploying to Production. In cases where the testing strategy is release specific, you must update the pipeline before every release. Traditional pipeline stages are predefined and static in nature, and once the pipeline stages are defined it’s hard to update them. In this post, we present a configuration driven dynamic CI/CD solution per repository. The pipeline state is maintained and governed by configurations stored in Amazon DynamoDB. This gives you the advantage of automatically customizing the pipeline for every release based on the testing requirements.

By following this post, you will set up a dynamic multi-account CI/CD solution. Your pipeline will deploy and test a sample pet store API application. Refer to Automating your API testing with AWS CodeBuild, AWS CodePipeline, and Postman for more details on this application. New code deployments will be delivered with custom pipeline stages based on the pipeline configuration that you create. This solution uses services such as AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, Amazon DynamoDB, AWS Lambda, and AWS Step Functions.

Solution overview

The following diagram illustrates the solution architecture:

The image represents the solution workflow, highlighting the integration of the AWS components involved.

Figure 1: Architecture Diagram

  1. Users insert, update, or delete an entry in the DynamoDB table.
  2. The Step Function Trigger Lambda is invoked on all modifications.
  3. The Step Function Trigger Lambda evaluates the incoming event and does the following:
    1. On insert and update, triggers the Step Function.
    2. On delete, finds the appropriate CloudFormation stack and deletes it.
  4. Steps in the Step Function are as follows:
    1. Collect Information (Pass State) – Filters the relevant information from the event, such as repositoryName and referenceName.
    2. Get Mapping Information (Backed by CodeCommit event filter Lambda) – Retrieves the mapping information from the Pipeline config stored in the DynamoDB.
    3. Deployment Configuration Exist? (Choice State) – If StatusCode == 200, the DynamoDB entry was found and the Initiate CloudFormation Stack step is invoked; otherwise, the Step Function exits with a Successful status.
    4. Initiate CloudFormation Stack (Backed by stack create Lambda) – Constructs the CloudFormation parameters and creates/updates the dynamic pipeline based on the configuration stored in the DynamoDB via CloudFormation.
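The routing in steps 2 and 3 can be sketched as follows. This is a minimal illustration rather than the solution’s actual Lambda code: it assumes the trigger Lambda receives DynamoDB Streams records, whose eventName field is INSERT, MODIFY, or REMOVE, and the function names and returned action labels are hypothetical.

```python
def route_record(record):
    """Decide what to do for a single DynamoDB Streams record."""
    event_name = record.get("eventName")
    if event_name in ("INSERT", "MODIFY"):
        # Insert or update: start the Step Function to create/update the pipeline.
        return "start_step_function"
    if event_name == "REMOVE":
        # Delete: find and delete the matching CloudFormation stack.
        return "delete_stack"
    return "ignore"


def route_event(event):
    """Return one action per record in the incoming Lambda event."""
    return [route_record(record) for record in event.get("Records", [])]


# Hypothetical event containing one insert and one delete.
sample_event = {"Records": [{"eventName": "INSERT"}, {"eventName": "REMOVE"}]}
print(route_event(sample_event))
```

In a real handler, each action label would be replaced by the corresponding AWS SDK call.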

Code deliverables

The code deliverables include the following:

  1. AWS CDK app – The AWS CDK app contains the code for all the Lambdas, Step Functions, and CloudFormation templates.
  2. sample-application-repo – This directory contains the sample application repository used for deployment.
  3. automated-tests-repo – This directory contains the sample automated tests repository for testing the sample repo.

Deploying the CI/CD solution

  1. Clone this repository to your local machine.
  2. Follow the README to deploy the solution to your main CI/CD account. Upon successful deployment, the following resources should be created in the CI/CD account:
    1. A DynamoDB table
    2. Step Function
    3. Lambda Functions
  3. Navigate to the Amazon Simple Storage Service (Amazon S3) console in your main CI/CD account and search for a bucket with the name: cloudformation-template-bucket-<AWS_ACCOUNT_ID>. You should see two CloudFormation templates (templates/codepipeline.yaml and templates/childaccount.yaml) uploaded to this bucket.
  4. Run the childaccount.yaml in every target CI/CD account (Alpha, Beta, Gamma, and Prod) by going to the CloudFormation Console. Provide the main CI/CD account number as the “CentralAwsAccountId” parameter, and execute.
  5. Upon successful creation of Stack, two roles will be created in the Child Accounts:
    1. ChildAccountFormationRole
    2. ChildAccountDeployerRole

Pipeline configuration

Make an entry into devops-pipeline-table-info for the Repository name and branch combination. A sample entry can be found in sample-entry.json.

The pipeline is highly configurable, and everything can be configured through the DynamoDB entry.

The following are the top-level keys:

RepoName: Name of the repository for which AWS CodePipeline is configured.
RepoTag: Name of the branch used in CodePipeline.
BuildImage: Build image used for application AWS CodeBuild project.
BuildSpecFile: Buildspec file used in the application CodeBuild project.
DeploymentConfigurations: This key holds the deployment configurations for the pipeline. Under this key are the environment specific configurations. In our case, we’ve named our environments Alpha, Beta, Gamma, and Prod. You can configure to any name you like, but make sure that the entries in json are the same as in the codepipeline.yaml CloudFormation template. This is because there is a 1:1 mapping between them. Sub-level keys under DeploymentConfigurations are as follows:

  • EnvironmentName. This is the top-level key for environment specific configuration. In our case, it’s Alpha, Beta, Gamma, and Prod. Sub level keys under this are:
    • <Env>AwsAccountId: AWS account ID of the target environment.
    • Deploy<Env>: A key specifying whether or not the artifact should be deployed to this environment. Based on its value, the CodePipeline will have a deployment stage to this environment.
    • ManualApproval<Env>: Key representing whether or not manual approval is required before deployment. Enter your email or set to false.
    • Tests: Once again, this is a top-level key with sub-level keys. This key holds the test-related information to be run on specific environments. Each test that is enabled adds an additional step to the CodePipeline. The test-related information is also configurable, with the ability to specify the test repository, branch name, buildspec file, and build image for the CodeBuild project that runs the tests.
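To show how these keys fit together, the following sketch builds an example entry as a Python dictionary and checks that the top-level keys are present. It is not a copy of sample-entry.json: every value is a placeholder, the nesting of environments under DeploymentConfigurations and the use of string values are assumptions, the sub-keys under Tests are left empty because they aren’t listed here, and missing_top_level_keys is a hypothetical helper.

```python
REQUIRED_KEYS = (
    "RepoName",
    "RepoTag",
    "BuildImage",
    "BuildSpecFile",
    "DeploymentConfigurations",
)


def missing_top_level_keys(entry):
    """Return the top-level keys described above that the entry lacks."""
    return [key for key in REQUIRED_KEYS if key not in entry]


# Placeholder entry for a single environment named Alpha (values are assumed).
entry = {
    "RepoName": "sample-application",
    "RepoTag": "main",
    "BuildImage": "aws/codebuild/standard:6.0",
    "BuildSpecFile": "buildspec.yaml",
    "DeploymentConfigurations": {
        "Alpha": {
            "AlphaAwsAccountId": "111111111111",
            "DeployAlpha": "true",
            "ManualApprovalAlpha": "false",
            "Tests": {},  # test sub-keys omitted; see sample-entry.json
        },
    },
}

print(missing_top_level_keys(entry))
```

A check like this can catch a mistyped key before the entry is written to the DynamoDB table.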


  1. Make an entry into the devops-pipeline-table-info DynamoDB table in the main CI/CD account. A sample entry can be found in sample-entry.json. Make sure to replace the configuration values with appropriate values for your environment. An explanation of the values can be found in the Pipeline Configuration section above.
  2. After the entry is made in the DynamoDB table, you should see a CloudFormation stack being created. This CloudFormation stack will deploy the CodePipeline in the main CI/CD account by reading and using the entry in the DynamoDB table.

To customize the solution for different combinations, such as deploying to one environment while skipping others, update the pipeline configuration stored in the devops-pipeline-table-info DynamoDB table. The following is the pipeline configured for the sample-application repository’s main branch.

The image represents the dynamic CI/CD pipeline deployed in your account.

Figure 2: Dynamic Multi-Account CI/CD Pipeline

Clean up your dynamic multi-account CI/CD solution and related resources

To avoid ongoing charges for the resources that you created following this post, you should delete the following:

  1. The pipeline configuration stored in the DynamoDB
  2. The CloudFormation stacks deployed in the target CI/CD accounts
  3. The AWS CDK app deployed in the main CI/CD account
  4. The retained S3 buckets (empty each bucket before deleting it)


Conclusion

This configuration-driven CI/CD solution provides the ability to dynamically create and configure your pipelines in DynamoDB. IDEMIA, a global leader in identity technologies, adopted this approach for deploying their microservices based application across environments. This solution created by AWS Professional Services allowed them to dynamically create and configure their pipelines per repository per release. As Kunal Bajaj, Tech Lead of IDEMIA, states, “We worked with AWS pro-serve team to create a dynamic CI/CD solution using lambdas, step functions, SQS, and other native AWS services to conduct cross-account deployments to our different environments while providing us the flexibility to add tests and approvals as needed by the business.”

About the authors:

Anshul Saxena

Anshul is a Cloud Application Architect at AWS Professional Services and works with customers helping them in their cloud adoption journey. His expertise lies in DevOps, serverless architectures, and architecting and implementing cloud native solutions aligning with best practices.

Libin Roy

Libin is a Cloud Infrastructure Architect at AWS Professional Services. He enjoys working with customers to design and build cloud native solutions to accelerate their cloud journey. Outside of work, he enjoys traveling, cooking, playing sports and weight training.

How to secure your SaaS tenant data in DynamoDB with ABAC and client-side encryption

Post Syndicated from Jani Muuriaisniemi original https://aws.amazon.com/blogs/security/how-to-secure-your-saas-tenant-data-in-dynamodb-with-abac-and-client-side-encryption/

If you’re a SaaS vendor, you may need to store and process personal and sensitive data for large numbers of customers across different geographies. When processing sensitive data at scale, you have an increased responsibility to secure this data end-to-end. Client-side encryption of data, such as your customers’ contact information, provides an additional mechanism that can help you protect your customers and earn their trust.

In this blog post, we show how to implement client-side encryption of your SaaS application’s tenant data in Amazon DynamoDB with the Amazon DynamoDB Encryption Client. This is accomplished by leveraging AWS Identity and Access Management (IAM) together with AWS Key Management Service (AWS KMS) for a more secure and cost-effective isolation of the client-side encrypted data in DynamoDB, both at run-time and at rest.

Encrypting data in Amazon DynamoDB

Amazon DynamoDB supports data encryption at rest using encryption keys stored in AWS KMS. This functionality helps reduce operational burden and complexity involved in protecting sensitive data. In this post, you’ll learn about the benefits of adding client-side encryption to achieve end-to-end encryption in transit and at rest for your data, from its source to storage in DynamoDB. Client-side encryption helps ensure that your plaintext data isn’t available to any third party, including AWS.

You can use the Amazon DynamoDB Encryption Client to implement client-side encryption with DynamoDB. In the solution in this post, client-side encryption refers to the cryptographic operations that are performed on the application-side in the application’s Lambda function, before the data is sent to or retrieved from DynamoDB. The solution in this post uses the DynamoDB Encryption Client with the Direct KMS Materials Provider so that your data is encrypted by using AWS KMS. However, the underlying concept of the solution is not limited to the use of the DynamoDB Encryption Client; you can apply it to any client-side use of AWS KMS, for example using the AWS Encryption SDK.

For detailed information about using the DynamoDB Encryption Client, see the blog post How to encrypt and sign DynamoDB data in your application. This is a great place to start if you are not yet familiar with DynamoDB Encryption Client. If you are unsure about whether you should use client-side encryption, see Client-side and server-side encryption in the Amazon DynamoDB Encryption Client Developer Guide to help you with the decision.

AWS KMS encryption context

AWS KMS gives you the ability to add an additional layer of authentication for your AWS KMS API decrypt operations by using encryption context. The encryption context is one or more key-value pairs of additional data that you want associated with AWS KMS protected information.

Encryption context helps you defend against the risks of ciphertexts being tampered with, modified, or replaced — whether intentionally or unintentionally. Encryption context helps defend against both an unauthorized user replacing one ciphertext with another, as well as problems like operational events. To use encryption context, you specify associated key-value pairs on encrypt. You must provide the exact same key-value pairs in the encryption context on decrypt, or the operation will fail. Encryption context is not secret, and is not an access-control mechanism. The encryption context is a means of authenticating the data, not the caller.

The Direct KMS Materials Provider used in this blog post transparently generates a unique data key by using AWS KMS for each item stored in the DynamoDB table. It automatically sets the item’s partition key and sort key (if any) as AWS KMS encryption context key-value pairs.

The solution in this blog post relies on the partition key of each table item being defined in the encryption context. If you encrypt data with your own implementation, make sure to add your tenant ID to the encryption context in all your AWS KMS API calls.
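If you call AWS KMS directly instead of going through the Direct KMS Materials Provider, the tenant ID has to be passed as encryption context on both encrypt and decrypt. The following sketch only builds the request parameters, so it runs without AWS credentials. KeyId, Plaintext, CiphertextBlob, and EncryptionContext are the parameter names of the Boto3 KMS encrypt and decrypt calls; the context key name tenant_id, the helper names, and the key alias are assumptions for illustration.

```python
def build_encrypt_request(key_id, plaintext, tenant_id):
    """Parameters for kms.encrypt with the tenant ID as encryption context."""
    return {
        "KeyId": key_id,
        "Plaintext": plaintext,
        "EncryptionContext": {"tenant_id": tenant_id},  # assumed key name
    }


def build_decrypt_request(ciphertext_blob, tenant_id):
    """Parameters for kms.decrypt; the context must match the one used on encrypt."""
    return {
        "CiphertextBlob": ciphertext_blob,
        "EncryptionContext": {"tenant_id": tenant_id},
    }


encrypt_request = build_encrypt_request("alias/example-key", b"secret", "tenant-a")
decrypt_request = build_decrypt_request(b"<ciphertext>", "tenant-a")

# With credentials available, the requests would be used like this:
#   kms = boto3.client("kms")
#   ciphertext = kms.encrypt(**encrypt_request)["CiphertextBlob"]
print(encrypt_request["EncryptionContext"] == decrypt_request["EncryptionContext"])
```

If the decrypt request carried a different tenant ID, the encryption context would not match and AWS KMS would reject the call.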

For more information about the concept of AWS KMS encryption context, see the blog post How to Protect the Integrity of Your Encrypted Data by Using AWS Key Management Service and EncryptionContext. You can also see another example in Exercise 3 of the Busy Engineer’s Document Bucket Workshop.

Attribute-based access control for AWS

Attribute-based access control (ABAC) is an authorization strategy that defines permissions based on attributes. In AWS, these attributes are called tags. In the solution in this post, ABAC helps you create tenant-isolated access policies for your application, without the need to provision tenant specific AWS IAM roles.

If you are new to ABAC, or need a refresher on the concepts and the different isolation methods, see the blog post How to implement SaaS tenant isolation with ABAC and AWS IAM.

Solution overview

If you are a SaaS vendor expecting large numbers of tenants, it is important that your underlying architecture can cost effectively scale with minimal complexity to support the required number of tenants, without compromising on security. One way to meet these criteria is to store your tenant data in a single pooled DynamoDB table, and to encrypt the data using a single AWS KMS key.

Using a single shared KMS key to read and write encrypted data in DynamoDB for multiple tenants reduces your per-tenant costs. This may be especially relevant to manage your costs if you have users on your organization’s free tier, with no direct revenue to offset your costs.

When you use shared resources such as a single pooled DynamoDB table encrypted by using a single KMS key, you need a mechanism to help prevent cross-tenant access to the sensitive data. This is where you can use ABAC for AWS. By using ABAC, you can build an application with strong tenant isolation capabilities, while still using shared and pooled underlying resources for storing your sensitive tenant data.

You can find the solution described in this blog post in the aws-dynamodb-encrypt-with-abac GitHub repository. This solution uses ABAC combined with KMS encryption context to provide isolation of tenant data, both at rest and at run time. By using a single KMS key, the application encrypts tenant data on the client-side, and stores it in a pooled DynamoDB table, which is partitioned by a tenant ID.

Solution Architecture

Figure 1: Components of solution architecture

The presented solution implements an API with a single AWS Lambda function behind an Amazon API Gateway, and implements processing for two types of requests:

  1. GET request: fetch any key-value pairs stored in the tenant data store for the given tenant ID.
  2. POST request: store the provided key-value pairs in the tenant data store for the given tenant ID, overwriting any existing data for the same tenant ID.

The application is written in Python, it uses AWS Lambda Powertools for Python, and you deploy it by using the AWS CDK.

It also uses the DynamoDB Encryption Client for Python, which includes several helper classes that mirror the AWS SDK for Python (Boto3) classes for DynamoDB. This solution uses the EncryptedResource helper class which provides Boto3 compatible get_item and put_item methods. The helper class is used together with the KMS Materials Provider to handle encryption and decryption with AWS KMS transparently for the application.

Note: This example solution provides no authentication of the caller identity. See chapter “Considerations for authentication and authorization” for further guidance.

How it works

Figure 2: Detailed architecture for storing new or updated tenant data

As requests are made into the application’s API, they are routed by API Gateway to the application’s Lambda function (1). The Lambda function begins to run with the IAM permissions that its IAM execution role (DefaultExecutionRole) has been granted. These permissions do not grant any access to the DynamoDB table or the KMS key. In order to access these resources, the Lambda function first needs to assume the ResourceAccessRole, which does have the necessary permissions. To implement ABAC more securely in this use case, it is important that the application maintains clear separation of IAM permissions between the assumed ResourceAccessRole and the DefaultExecutionRole.

As the application assumes the ResourceAccessRole using the AssumeRole API call (2), it also sets a TenantID session tag. Session tags are key-value pairs that can be passed when you assume an IAM role in AWS Security Token Service (AWS STS), and are a fundamental building block of ABAC on AWS. When the session credentials (3) are used to make a subsequent request, the request context includes the aws:PrincipalTag context key, which can be used to access the session’s tags. The chapter “The ResourceAccessRole policy” describes how the aws:PrincipalTag context key is used in IAM policy condition statements to implement ABAC for this solution. Note that for demonstration purposes, this solution receives the value for the TenantID tag directly from the request URL, and it is not authenticated.
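The AssumeRole call with a session tag can be sketched as follows. The sketch only builds the request parameters so that it runs without AWS credentials; RoleArn, RoleSessionName, and Tags are the parameter names of the Boto3 STS assume_role call, while the helper name, the session name, and the account ID in the role ARN are placeholders and not taken from the solution’s code.

```python
def build_assume_role_request(role_arn, tenant_id, session_name="tenant-session"):
    """Parameters for sts.assume_role that attach the TenantID session tag."""
    return {
        "RoleArn": role_arn,
        "RoleSessionName": session_name,
        "Tags": [{"Key": "TenantID", "Value": tenant_id}],
    }


request = build_assume_role_request(
    "arn:aws:iam::111122223333:role/ResourceAccessRole",  # placeholder account ID
    "tenant-a",
)

# With credentials available, the temporary credentials would be obtained with:
#   sts = boto3.client("sts")
#   credentials = sts.assume_role(**request)["Credentials"]
print(request["Tags"])
```

The tag value set here is what the aws:PrincipalTag/TenantID context key resolves to when the temporary credentials are used.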

The trust policy of the ResourceAccessRole defines the principals that are allowed to assume the role, and to tag the assumed role session. Make sure to limit the principals to the least needed for your application to function. In this solution, the application Lambda function is the only trusted principal defined in the trust policy.

Next, the Lambda function prepares to encrypt or decrypt the data (4). To do so, it uses the DynamoDB Encryption Client. The KMS Materials Provider and the EncryptedResource helper class are both initialized with sessions by using the temporary credentials from the AssumeRole API call. This allows the Lambda function to access the KMS key and DynamoDB table resources, with access restricted to operations on data belonging only to the specific tenant ID.

Finally, using the EncryptedResource helper class provided by the DynamoDB Encryption Library, the data is written to and read from the DynamoDB table (5).

Considerations for authentication and authorization

The solution in this blog post intentionally does not implement authentication or authorization of the client requests. Instead, the requested tenant ID from the request URL is passed as the tenant identity. Your own applications should always authenticate and authorize tenant requests. There are multiple ways you can achieve this.

Modern web applications commonly use OpenID Connect (OIDC) for authentication, and OAuth for authorization. JSON Web Tokens (JWTs) can be used to pass the resulting authorization data from the client to the application. You can validate a JWT when using Amazon API Gateway with one of the following methods:

  1. When using a REST or an HTTP API, you can use a Lambda authorizer
  2. When using an HTTP API, you can use a JWT authorizer
  3. You can validate the token directly in your application code

If you write your own authorizer code, you can pick a popular open source library or you can choose the AWS provided open source library. To learn more about using a JWT authorizer, see the blog post How to secure API Gateway HTTP endpoints with JWT authorizer.

Regardless of the chosen method, you must be able to map a suitable claim from the user’s JWT, such as the subject, to the tenant ID, so that it can be used as the session tag in this solution.
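As an illustration, the following sketch reads the tenant ID from claims that an API Gateway HTTP API JWT authorizer has already validated (payload format version 2.0). The function name is hypothetical, and using the sub claim as the tenant ID is an assumption. Your identity provider might expose a dedicated tenant claim instead.

```python
def tenant_id_from_event(event, claim="sub"):
    """Return the tenant ID from claims already validated by a JWT authorizer."""
    # HTTP API JWT authorizers place validated claims under this path.
    claims = event["requestContext"]["authorizer"]["jwt"]["claims"]
    tenant_id = claims.get(claim)
    if not tenant_id:
        raise PermissionError(f"JWT claim {claim!r} is missing")
    return tenant_id
```

The returned value can then be passed as the TenantID session tag instead of the value from the request URL.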

The ResourceAccessRole policy

The definition of the IAM access policy for the ResourceAccessRole is critical to the correct operation of ABAC in this solution. In the following policy, be sure to replace <region>, <account-id>, <table-name>, and <key-id> with your own values.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                ...
            ],
            "Resource": [
                ...
            ],
            "Condition": {
                "ForAllValues:StringEquals": {
                    "dynamodb:LeadingKeys": [
                        "${aws:PrincipalTag/TenantID}"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                ...
            ],
            "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>",
            "Condition": {
                "StringEquals": {
                    "kms:EncryptionContext:tenant_id": "${aws:PrincipalTag/TenantID}"
                }
            }
        }
    ]
}

The policy defines two access statements, both of which apply separate ABAC conditions:

  1. The first statement grants access to the DynamoDB table with the condition that the partition key of the item matches the TenantID session tag in the caller’s session.
  2. The second statement grants access to the KMS key with the condition that one of the key-value pairs in the encryption context of the API call has a key called tenant_id with a value that matches the TenantID session tag in the caller’s session.

Warning: Do not use a ForAnyValue or ForAllValues set operator with the kms:EncryptionContext single-valued condition key. These set operators can create a policy condition that does not require values you intend to require, and allows values you intend to forbid.
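To see why the warning matters, consider the following simplified model of the two operators. It is not how IAM is implemented, and the function names are made up for illustration. It only mirrors the documented behavior that a ForAllValues condition evaluates to true when the request contains no values for the key.

```python
def string_equals(request_value, policy_value):
    """Single-valued check: a missing request value never matches."""
    return request_value is not None and request_value == policy_value


def for_all_values_string_equals(request_values, policy_values):
    """Set check: every request value must be allowed; an empty set passes."""
    return all(value in policy_values for value in request_values)


tenant_tag = "tenant-a"  # value of the TenantID session tag

# A KMS request that carries no tenant_id in its encryption context:
print(string_equals(None, tenant_tag))                 # False
print(for_all_values_string_equals([], [tenant_tag]))  # True
```

With the set operator, a request that omits tenant_id entirely would satisfy the condition, which is the opposite of what the policy intends.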

Deploying and testing the solution


To deploy and test the solution, you need the following:

Deploying the solution

After you have the prerequisites installed, run the following steps in a command line environment to deploy the solution. Make sure that your AWS CLI is configured with your AWS account credentials. Note that standard AWS service charges apply to this solution. For more information about pricing, see the AWS Pricing page.

To deploy the solution into your AWS account

  1. Use the following command to download the source code:
    git clone https://github.com/aws-samples/aws-dynamodb-encrypt-with-abac
    cd aws-dynamodb-encrypt-with-abac

  2. (Optional) You need an AWS CDK version compatible with the application (2.37.0) to deploy. The simplest way is to install a local copy with npm, but you can also use a globally installed version if you already have one. To install the AWS CDK locally with npm, use the following command:
    npm install aws-cdk@2.37.0

  3. Use the following commands to initialize a Python virtual environment:
    python3 -m venv demoenv
    source demoenv/bin/activate
    python3 -m pip install -r requirements.txt

  4. (Optional) If you have not used AWS CDK with this account and Region before, you first need to bootstrap the environment:
    npx cdk bootstrap

  5. Use the following command to deploy the application with the AWS CDK:
    npx cdk deploy

  6. Make note of the API endpoint URL https://<api url>/prod/ in the Outputs section of the CDK command. You will need this URL for the next steps.
    DemoappStack.ApiEndpoint4F160690 = https://<api url>/prod/

Testing the solution with example API calls

With the application deployed, you can test the solution by making API calls against the API URL that you captured from the deployment output. You can start with a simple HTTP POST request to insert data for a tenant. The API expects a JSON string as the data to store, so make sure to post properly formatted JSON in the body of the request.

An example request using the curl command looks like the following:

curl https://<api url>/prod/tenant/<tenant-name> -X POST --data '{"email":"<email-address>"}'

You can then read the same data back with an HTTP GET request:

curl https://<api url>/prod/tenant/<tenant-name>

You can store and retrieve data for any number of tenants, and can store as many attributes as you like. Each time you store data for a tenant, any previously stored data is overwritten.
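The same calls can be made from Python with only the standard library. The following is a minimal sketch: the function names are illustrative, and the URL layout follows the curl examples above.

```python
import json
import urllib.parse
import urllib.request


def tenant_url(api_url, tenant):
    """Build https://<api url>/prod/tenant/<tenant-name> from the API endpoint."""
    return f"{api_url.rstrip('/')}/tenant/{urllib.parse.quote(tenant, safe='')}"


def build_put_request(api_url, tenant, data):
    """Build the POST request that stores JSON data for a tenant."""
    return urllib.request.Request(
        url=tenant_url(api_url, tenant),
        data=json.dumps(data).encode("utf-8"),
        method="POST",
    )


def build_get_request(api_url, tenant):
    """Build the GET request that reads a tenant's data back."""
    return urllib.request.Request(url=tenant_url(api_url, tenant), method="GET")


def send(request):
    """Send a request and return the response body as text."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

For example, send(build_put_request(api_url, "tenant-a", {"email": "user@example.com"})) stores the data, and send(build_get_request(api_url, "tenant-a")) retrieves it, where api_url is the endpoint from the deployment output.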

Additional considerations

A tenant ID is used as the DynamoDB table’s partition key in the example application in this solution. You can replace the tenant ID with another unique partition key, such as a product ID, as long as the ID is consistently used in the IAM access policy, the IAM session tag, and the KMS encryption context. In addition, while this solution does not use a sort key in the table, you can modify the application to support a sort key with only a few changes. For more information, see Working with tables and data in DynamoDB.

Clean up

To clean up the application resources that you deployed while testing the solution, in the solution’s home directory, run the command cdk destroy.

Then, if you no longer plan to deploy to this account and Region using AWS CDK, you can also use the AWS CloudFormation console to delete the bootstrap stack (CDKToolkit).


In this post, you learned a method for simple and cost-efficient client-side encryption for your tenant data. By using the DynamoDB Encryption Client, you were able to implement the encryption with less effort, all while using a standard Boto3 DynamoDB Table resource compatible interface.

Adding to the client-side encryption, you also learned how to apply attribute-based access control (ABAC) to your IAM access policies. You used ABAC for tenant isolation by applying conditions for both the DynamoDB table access, as well as access to the KMS key that is used for encryption of the tenant data in the DynamoDB table. By combining client-side encryption with ABAC, you have increased your data protection with multiple layers of security.

You can start experimenting today on your own by using the provided solution. If you have feedback about this post, submit comments in the Comments section below. If you have questions on the content, consider submitting them to AWS re:Post.

Want more AWS Security news? Follow us on Twitter.

Jani Muuriaisniemi

Jani is a Principal Solutions Architect at Amazon Web Services based out of Helsinki, Finland. With more than 20 years of industry experience, he works as a trusted advisor with a broad range of customers across different industries and segments, helping the customers on their cloud journey.

How Hudl built a cost-optimized AWS Glue pipeline with Apache Hudi datasets

Post Syndicated from Indira Balakrishnan original https://aws.amazon.com/blogs/big-data/how-hudl-built-a-cost-optimized-aws-glue-pipeline-with-apache-hudi-datasets/

This is a guest blog post co-written with Addison Higley and Ramzi Yassine from Hudl.

Hudl Agile Sports Technologies, Inc. is a Lincoln, Nebraska based company that provides tools for coaches and athletes to review game footage and improve individual and team play. Its initial product line served college and professional American football teams. Today, the company provides video services to youth, amateur, and professional teams in American football as well as other sports, including soccer, basketball, volleyball, and lacrosse. It now serves 170,000 teams in 50 different sports around the world. Hudl’s overall goal is to capture and bring value to every moment in sports.

Hudl’s mission is to make every moment in sports count. Hudl does this by expanding access to more moments through video and data and putting those moments in context. Our goal is to increase access by different people and increase context with more data points for every customer we serve. Using data to generate analytics, Hudl is able to turn data into actionable insights, telling powerful stories with video and data.

To best serve our customers and provide the most powerful insights possible, we need to be able to compare large sets of data between different sources. For example, enriching our MongoDB and Amazon DocumentDB (with MongoDB compatibility) data with our application logging data leads to new insights. This requires resilient data pipelines.

In this post, we discuss how Hudl has iterated on one such data pipeline using AWS Glue to improve performance and scalability. We talk about the initial architecture of this pipeline, and some of the limitations associated with this approach. We also discuss how we iterated on that design using Apache Hudi to dramatically improve performance.

Problem statement

A data pipeline that makes high-quality MongoDB and Amazon DocumentDB statistics data available in our central data lake is a requirement for Hudl to deliver sports analytics. It’s important to maintain data integrity between the MongoDB and Amazon DocumentDB transactional data and the data lake by capturing changes in near-real time and upserting the corresponding records in the data lake. Because Hudl statistics are backed by MongoDB and Amazon DocumentDB databases, in addition to a broad range of other data sources, it’s important that relevant MongoDB and Amazon DocumentDB data is available in a central data lake where we can run analytics queries to compare statistics data between sources.

Initial design

The following diagram demonstrates the architecture of our initial design.

Initial Ingestion Pipeline Design

Let’s discuss the key AWS services of this architecture:

  • AWS Database Migration Service (AWS DMS) allowed our team to move quickly in delivering this pipeline. AWS DMS gives our team a full snapshot of the data, and also offers ongoing change data capture (CDC). By combining these two datasets, we can ensure our pipeline delivers the latest data.
  • Amazon Simple Storage Service (Amazon S3) is the backbone of Hudl’s data lake because of its durability, scalability, and industry-leading performance.
  • AWS Glue allows us to run our Spark workloads in a serverless fashion, with minimal setup. We chose AWS Glue for its ease of use and speed of development. Additionally, features such as AWS Glue bookmarking simplified our file management logic.
  • Amazon Redshift offers petabyte-scale data warehousing. Amazon Redshift provides consistently fast performance, and easy integrations with our S3 data lake.

The data processing flow includes the following steps:

  1. Amazon DocumentDB holds the Hudl statistics data.
  2. AWS DMS gives us a full export of statistics data from Amazon DocumentDB, and ongoing changes in the same data.
  3. In the S3 Raw Zone, the data is stored in JSON format.
  4. An AWS Glue job merges the initial load of statistics data with the changed statistics data to give a snapshot of statistics data in JSON format for reference, eliminating duplicates.
  5. In the S3 Cleansed Zone, the JSON data is normalized and converted to Parquet format.
  6. AWS Glue uses a COPY command to insert Parquet data into Amazon Redshift consumption base tables.
  7. Amazon Redshift stores the final table for consumption.

The following is a sample code snippet from the AWS Glue job in the initial data pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import coalesce
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

# read_full_data(), read_cdc_data(), and output_path are not shown in this snippet
full_df = read_full_data()  # Load the entire dataset from the S3 Cleansed Zone

cdc_df = read_cdc_data()  # Read new CDC data, which represents the delta in the source MongoDB/DocumentDB

# Calculate the final snapshot by joining the existing data with the delta
joined_df = full_df.join(cdc_df, '_id', 'full_outer')

# Drop deleted records and prefer the changed document over the existing one
result = joined_df.filter((joined_df.Op != 'D') | (joined_df.Op.isNull())) \
    .select(coalesce(cdc_df._doc, full_df._doc).alias('_doc'))

gc.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(result, gc, "result"),
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet",
    transformation_ctx="ctx4",
)


Although this initial solution met our need for data quality, we felt there was room for improvement:

  • The pipeline was slow – The pipeline ran slowly (over 2 hours) because for each batch, the whole dataset was compared. Every record had to be compared, flattened, and converted to Parquet, even when only a few records were changed from the previous daily run.
  • The pipeline was expensive – As the data size grew daily, the job duration also grew significantly (especially in step 4). To mitigate the impact, we needed to allocate more AWS Glue DPUs (Data Processing Units) to scale the job, which led to higher cost.
  • The pipeline limited our ability to scale – Hudl’s data has a long history of rapid growth with increasing customers and sporting events. Given this trend, our pipeline needed to process only the changed data so that it ran as efficiently as possible with predictable performance.

New design

The following diagram illustrates our updated pipeline architecture.

Although the overall architecture looks roughly the same, the internal logic in AWS Glue was significantly changed, along with the addition of Apache Hudi datasets.

In step 4, AWS Glue now interacts with Apache Hudi datasets in the S3 Cleansed Zone to upsert or delete changed records as identified by AWS DMS CDC. The AWS Glue to Apache Hudi connector helps convert JSON data to Parquet format and upserts into the Apache Hudi dataset. Retaining the full documents in our Apache Hudi dataset allows us to easily make schema changes to our final Amazon Redshift tables without needing to re-export data from our source systems.
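The record-level upsert and delete behavior can be illustrated in plain Python. This is a conceptual sketch only, not how Apache Hudi is implemented. It assumes each CDC record carries the _id key, an Op flag ('D' for deletes, as in the initial pipeline), and a write_ts ordering field, and that the most recent change per key wins.

```python
def apply_cdc(snapshot, cdc_records, key="_id", ordering="write_ts"):
    """Apply a batch of CDC records to a snapshot dict keyed by record key."""
    # Deduplicate the batch: keep only the most recent change per key.
    latest = {}
    for record in cdc_records:
        current = latest.get(record[key])
        if current is None or record[ordering] >= current[ordering]:
            latest[record[key]] = record

    # Upsert or delete only the keys that changed; all other records are untouched.
    result = dict(snapshot)
    for record_key, record in latest.items():
        if record.get("Op") == "D":
            result.pop(record_key, None)
        else:
            result[record_key] = record
    return result
```

Unlike the full outer join in the initial design, the work here is proportional to the number of changed records rather than to the size of the whole dataset.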

The following is a sample code snippet from the new AWS Glue pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

upsert_conf = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'write_ts',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.table.name': 'glue_table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': 'glue_database',
    'hoodie.datasource.hive_sync.table': 'glue_table',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'path': 's3://bucket/prefix/',
    'hoodie.compact.inline': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.upsert.shuffle.parallelism': 200,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10,
}

# cdc_upserts_df (not shown here) holds the new and changed records identified by AWS DMS CDC
gc.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(cdc_upserts_df, gc, "cdc_upserts_df"),
    connection_type="marketplace.spark",
    connection_options=upsert_conf,
)


With this new approach using Apache Hudi datasets with AWS Glue deployed after May 2022, the pipeline runtime was predictable and less expensive than the initial approach. Because we only handled new or modified records by eliminating the full outer join over the entire dataset, we saw an 80–90% reduction in runtime for this pipeline, thereby reducing costs by 80–90% compared to the initial approach. The following diagram illustrates our processing time before and after implementing the new pipeline.


With Apache Hudi’s open-source data management framework, we simplified incremental data processing in our AWS Glue data pipeline to manage data changes at the record level in our S3 data lake with CDC from Amazon DocumentDB.

We hope that this post will inspire your organization to build AWS Glue pipelines with Apache Hudi datasets that reduce cost and bring performance improvements using serverless technologies to achieve your business goals.

About the authors

Addison Higley is a Senior Data Engineer at Hudl. He manages over 20 data pipelines to help ensure data is available for analytics so Hudl can deliver insights to customers.

Ramzi Yassine is a Lead Data Engineer at Hudl. He leads the architecture, implementation of Hudl’s data pipelines and data applications, and ensures that our data empowers internal and external analytics.

Swagat Kulkarni is a Senior Solutions Architect at AWS and an AI/ML enthusiast. He is passionate about solving real-world problems for customers with cloud-native services and machine learning. Swagat has over 15 years of experience delivering several digital transformation initiatives for customers across multiple domains, including retail, travel and hospitality, and healthcare. Outside of work, Swagat enjoys travel, reading, and meditating.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

Create a Multi-Region Python Package Publishing Pipeline with AWS CDK and CodePipeline

Post Syndicated from Brian Smitches original https://aws.amazon.com/blogs/devops/create-a-multi-region-python-package-publishing-pipeline-with-aws-cdk-and-codepipeline/

Customers can author and store internal software packages in AWS by leveraging the AWS CodeSuite (AWS CodePipeline, AWS CodeBuild, AWS CodeCommit, and AWS CodeArtifact).  As of the publish date of this blog post, there is no native way to replicate your CodeArtifact Packages across regions. This blog addresses how a custom solution built with the AWS Cloud Development Kit and AWS CodePipeline can create a Multi-Region Python Package Publishing Pipeline.

Whether it’s for resiliency or performance improvement, many customers want to deploy their applications across multiple regions. When applications are dependent on custom software packages, the software packages should be replicated to multiple regions as well. This post will walk through how to deploy a custom package publishing pipeline in your own AWS Account. This pipeline connects a Python package source code repository to build and publish pip packages to CodeArtifact Repositories spanning three regions (the primary and two replica regions). While this sample CDK Application is built specifically for pip packages, the underlying architecture can be reused for different software package formats, such as npm, Maven, NuGet, etc.

Solution overview

The following figure demonstrates the solution workflow:

  1. A CodePipeline pipeline orchestrates the building and publishing of the software package.
    1. This pipeline is triggered by commits on the main branch of the CodeCommit repository.
    2. A CodeBuild job builds the pip packages to be distributed with twine.
    3. The publish stage (third column) uses three parallel CodeBuild jobs to publish the distribution package to the three CodeArtifact repositories in separate regions.
  2. The first CodeArtifact Repository stores the package contents in the primary region.
  3. The second and third CodeArtifact Repositories act as replicas and store the package contents in other regions.

Figure 1.  Architecture diagram
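To make the publish stage more concrete, the following sketch generates the per-region commands that a publishing job could run. It is an illustration under stated assumptions: whether the sample's build specification uses these exact commands is not confirmed here, the function name is hypothetical, and the domain name in the usage note is a placeholder. The aws codeartifact login --tool twine command configures twine with a repository named codeartifact.

```python
def publish_commands(domain, repository, regions):
    """Return, per region, the shell commands to publish dist/* to CodeArtifact."""
    commands = {}
    for region in regions:
        commands[region] = [
            # Fetch a token and configure twine for this region's repository.
            f"aws codeartifact login --tool twine --domain {domain} "
            f"--repository {repository} --region {region}",
            # Upload the built distribution files.
            "twine upload --repository codeartifact dist/*",
        ]
    return commands
```

For example, publish_commands("my-domain", "package-artifact-repo", ["us-east-1", "us-east-2", "us-west-2"]) yields one command list for the primary region and one for each replica region.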

All of these resources are defined in a single AWS CDK Application. The resources are defined in CDK Stacks that are deployed as AWS CloudFormation Stacks. AWS CDK can deploy the different stacks across separate regions.


Before getting started, you will need the following:

  1. An AWS account
  2. An instance of the AWS Cloud9 IDE or an alternative local compute environment, such as your personal computer
  3. The following installed on your compute environment:
    1. AWS CDK
    2. AWS Command Line Interface (AWS CLI)
    3. npm
  4. Your AWS account must be bootstrapped for CDK in the necessary regions. The default configuration uses us-east-1, us-east-2, and us-west-2, as these three regions support CodeArtifact.

A new AWS Cloud9 IDE is recommended for this tutorial to isolate these actions in this post from your normal compute environment. See the Cloud9 Documentation for Creating an Environment.

Deploy the Python Package Publishing Pipeline into your AWS Account with the CDK

The source code can be found in this GitHub Repository.

  1. Fork the GitHub Repo into your account. This way you can experiment with changes as necessary to fit your workload.
  2. In your local compute environment, clone the GitHub Repository and cd into the project directory:
git clone git@github.com:<YOUR_GITHUB_USERNAME>/multi-region-python-package-publishing-pipeline.git && cd multi-region-python-package-publishing-pipeline
  3. Install the necessary node packages:
npm i
  4. (Optional) Override the default configurations for the CodeArtifact domainName, repositoryName, primaryRegion, and replicaRegions.
    1. Navigate to ./bin/multiregion_package_publishing.ts and update the relevant fields.
  5. From the project’s root directory (multi-region-python-package-publishing-pipeline), deploy the AWS CDK application. This step may take 5-10 minutes.
cdk deploy --all
  6. When prompted “Do you wish to deploy these changes (y/n)?”, enter y.

Viewing the deployed CloudFormation stacks

After the deployment of the AWS CDK application completes, you can view the deployed AWS CDK Stacks via CloudFormation. From the AWS Console, search for “CloudFormation” in the search bar and navigate to the service dashboard. In the primary region (us-east-1 (N. Virginia)), you should see two stacks: CodeArtifactPrimaryStack-<region> and PackagePublishingPipelineStack.


Figure 2. Screenshot showing the CloudFormation Stacks in the primary region

Switch regions to one of the secondary regions us-west-2 (Oregon) or us-east-2 (Ohio) to see the remaining stacks named CodeArtifactReplicaStack-<region>. These correspond to the three AWS CDK Stacks from the architecture diagram.


Figure 3. Screenshot showing the CloudFormation stacks in a separate region

Viewing the CodePipeline Package Publishing Pipeline

From the Console, select the primary region (us-east-1) and navigate to CodePipeline by utilizing the search bar. Select the Pipeline titled packagePipeline and inspect the state of the pipeline. This pipeline triggers after every commit from the CodeCommit repository named PackageSourceCode. If the pipeline is still in process, then wait a few minutes, as this pipeline can take approximately 7–8 minutes to complete all three stages (Source, Build, and Publish). Once it’s complete, the pipeline should reflect the following screenshot:


Figure 4. A screenshot showing the CodePipeline flow

Viewing the Published Package in the CodeArtifact Repository

To view the published artifacts, go to the primary or secondary region and navigate to the CodeArtifact dashboard by utilizing the search bar in the Console. You’ll see a repository named package-artifact-repo. Select the repository and you’ll see the sample pip package named mypippackage inside the repository. This package is defined by the source code in the CodeCommit repository named PackageSourceCode in the primary region (us-east-1).


Figure 5. Screenshot of the package repository

Create a new package version in CodeCommit and monitor the pipeline release

Navigate to the PackageSourceCode repository in CodeCommit (us-east-1 CodeCommit > Repositories > PackageSourceCode). Open the setup.py file and select the Edit button. Make a simple modification by changing version = '1.0.0' to version = '1.1.0', and commit the changes to the main branch.


Figure 6. A screenshot of the source package’s code repository in CodeCommit

Now navigate back to CodePipeline and watch as the pipeline performs the release automatically. When the pipeline finishes, this new package version will live in each of the three CodeArtifact Repositories.

Install the custom pip package to your local Python Environment

For your development team to connect to this CodeArtifact Repository to download packages, you must configure the pip tool to look in this repository. From your Cloud9 IDE (or local development environment), let’s test the installation of this package for Python3:

  1. Copy the connection instructions for the pip tool. Navigate to the CodeArtifact repository of your choice and select View connection instructions.
    1. Select Copy to copy the snippet to your clipboard.

Figure 7. Screenshot showing directions to connect to a code artifact repository

  2. Paste the command from your clipboard into your terminal.
  3. Run pip install mypippackage==1.0.0

Figure 8. Screenshot showing CodeArtifact login

  4. Test that the package works as expected by importing the modules.
  5. Start the Python REPL by running python3 in the terminal.

Figure 9. Screenshot of the package being imported
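If you prefer to configure pip programmatically rather than copying the console snippet, the index URL can be assembled from a CodeArtifact authorization token and the repository endpoint. The following is a minimal sketch with hypothetical function names. It assumes the caller has permission to call GetAuthorizationToken and GetRepositoryEndpoint, and that the domain belongs to the calling account.

```python
from urllib.parse import urlsplit, urlunsplit


def pip_index_url(repository_endpoint, auth_token):
    """Combine a CodeArtifact PyPI endpoint and token into a pip index URL."""
    parts = urlsplit(repository_endpoint)
    # CodeArtifact expects the user name "aws" with the token as the password.
    netloc = f"aws:{auth_token}@{parts.netloc}"
    path = parts.path.rstrip("/") + "/simple/"
    return urlunsplit((parts.scheme, netloc, path, "", ""))


def fetch_pip_index_url(domain, repository, region):
    """Look up the token and endpoint with Boto3 and build the index URL."""
    import boto3  # imported here so pip_index_url works without Boto3 installed

    client = boto3.client("codeartifact", region_name=region)
    token = client.get_authorization_token(domain=domain)["authorizationToken"]
    endpoint = client.get_repository_endpoint(
        domain=domain, repository=repository, format="pypi"
    )["repositoryEndpoint"]
    return pip_index_url(endpoint, token)
```

The resulting URL can be passed to pip with the --index-url option. Because the token expires, the URL should be generated when it is needed rather than stored.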

Clean up

Destroy all of the AWS CDK Stacks by running cdk destroy --all from the root AWS CDK application directory.


In this post, we walked through how to deploy a CodePipeline pipeline to automate the publishing of Python packages to multiple CodeArtifact repositories in separate regions. Leveraging the AWS CDK simplifies the maintenance and configuration of this multi-region solution by using Infrastructure as Code and predefined Constructs. If you would like to customize this solution to better fit your needs, please read more about the AWS CDK and AWS Developer Tools. Some links we suggest include the CodeArtifact User Guide (with sections covering npm, Python, Maven, and NuGet), the CDK API Reference, CDK Pipelines, and the CodePipeline User Guide.

About the authors:

Andrew Chen

Andrew Chen is a Solutions Architect with an interest in Data Analytics, Machine Learning, and DevOps. Andrew has previous experience in management consulting in which he worked as a technical architect for various cloud migration projects. In his free time, Andrew enjoys fishing, hiking, kayaking, and keeping up with financial markets.

Brian Smitches

Brian Smitches is a Solutions Architect with an interest in Infrastructure as Code and the AWS Cloud Development Kit. Brian currently supports Federal SMB Partners and has previous experience with Full Stack Application Development. In his personal time, Brian enjoys skiing, water sports, and traveling with friends and family.

Ingest streaming data to Apache Hudi tables using AWS Glue and Apache Hudi DeltaStreamer

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-to-apache-hudi-tables-using-aws-glue-and-apache-hudi-deltastreamer/

In today’s world with technology modernization, the need for near-real-time streaming use cases has increased exponentially. Many customers are continuously consuming data from different sources, including databases, applications, IoT devices, and sensors. Organizations may need to ingest that streaming data into data lakes built on Amazon Simple Storage Service (Amazon S3). You may also need to achieve analytics and machine learning (ML) use cases in near-real time. To ensure consistent results in those near-real-time streaming use cases, incremental data ingestion and atomicity, consistency, isolation, and durability (ACID) properties on data lakes have been a common ask.

To address such use cases, one approach is to use Apache Hudi and its DeltaStreamer utility. Apache Hudi is an open-source data management framework designed for data lakes. It simplifies incremental data processing by enabling ACID transactions and record-level inserts, updates, and deletes of streaming ingestion on data lakes built on top of Amazon S3. Hudi is integrated with well-known open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino, as well as with various AWS analytics services like AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift. The DeltaStreamer utility provides an easy way to ingest streaming data from sources like Apache Kafka into your data lake.

This post describes how to run the DeltaStreamer utility on AWS Glue to read streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and ingest the data into S3 data lakes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. With AWS Glue, you can create Spark, Spark Streaming, and Python shell jobs to extract, transform, and load (ETL) data. You can create AWS Glue Spark streaming ETL jobs using either Scala or PySpark that run continuously, consuming data from Amazon MSK, Apache Kafka, and Amazon Kinesis Data Streams and writing it to your target.

Solution overview

To demonstrate the DeltaStreamer utility, we use fictional product data that represents product inventory including product name, category, quantity, and last updated timestamp. Let’s assume we stream the data from data sources to an MSK topic. Now we want to ingest this data coming from the MSK topic into Amazon S3 so that we can run Athena queries to analyze business trends in near-real time.
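A record in this kind of dataset might look like the output of the following sketch. The field names, including the id key, are assumptions for illustration. The actual sample files provided with the solution may use a different schema.

```python
import json
from datetime import datetime, timezone


def make_product_record(product_id, name, category, quantity, updated_at=None):
    """Build one product inventory record as a JSON-serializable dict."""
    updated_at = updated_at or datetime.now(timezone.utc)
    return {
        "id": product_id,  # hypothetical record key
        "name": name,
        "category": category,
        "quantity": quantity,
        "last_updated": updated_at.isoformat(),
    }


# Each record would be sent to the MSK topic as one JSON message.
record = make_product_record(1, "Running shoes", "Footwear", 25)
print(json.dumps(record))
```

The last updated timestamp lets a later record for the same product supersede an earlier one when the data is upserted.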

The following diagram provides the overall architecture of the solution described in this post.

To simulate application traffic, we use Amazon Elastic Compute Cloud (Amazon EC2) to send sample data to an MSK topic. Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. To consume the streaming data from Amazon MSK, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector 0.10.1 for AWS Glue 3.0, with the DeltaStreamer utility to write the ingested data to Amazon S3. The Apache Hudi Connector 0.9.0 for AWS Glue 3.0 also supports the DeltaStreamer utility.

As the data is being ingested, the AWS Glue streaming job writes the data into the Amazon S3 base path. The data in Amazon S3 is cataloged using the AWS Glue Data Catalog. We then use Athena, which is an interactive query service, to query and analyze the data using standard SQL.


We use an AWS CloudFormation template to provision some resources for our solution. The template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to ingest data to the MSK cluster running in a private subnet. Make sure you have a key in the AWS Region where you deploy the template. If you don’t have one, you can create a new key pair.

Create the Apache Hudi connection

To add the Apache Hudi Connector for AWS Glue, complete the following steps:

  1. On the AWS Glue Studio console, choose Connectors.
  2. Choose Go to AWS Marketplace.
  3. Search for and choose Apache Hudi Connector for AWS Glue.
  4. Choose Continue to Subscribe.
  5. Review the terms and conditions, then choose Accept Terms.

    After you accept the terms, it takes some time to process the request.
    When the subscription is complete, you see the Effective date populated next to the product.
  6. Choose Continue to Configuration.
  7. For Fulfillment option, choose Glue 3.0.
  8. For Software version, choose 0.10.1.
  9. Choose Continue to Launch.
  10. Choose Usage instructions, and then choose Activate the Glue connector from AWS Glue Studio.

    You’re redirected to AWS Glue Studio.
  11. For Name, enter Hudi-Glue-Connector.
  12. Choose Create connection and activate connector.

A message appears that the connection was successfully created. Verify that the connection is visible on the AWS Glue Studio console.

Launch the CloudFormation stack

For this post, we provide a CloudFormation template to create the following resources:

  • VPC, subnets, security groups, and VPC endpoints
  • AWS Identity and Access Management (IAM) roles and policies with required permissions
  • An EC2 instance running in a public subnet within the VPC with Kafka 2.6.2 (the Scala 2.12 build) installed and with the source data initial load and source data incremental load JSON files
  • An Amazon MSK server running in a private subnet within the VPC
  • An AWS Glue Streaming DeltaStreamer job to consume the incoming data from the Kafka topic and write it to Amazon S3
  • Two S3 buckets: one bucket stores code and config files, and the other is the target for the AWS Glue streaming DeltaStreamer job

To create the resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-deltastreamer-glue-blog.
  3. For ClientIPCIDR, enter the IP address of your client that you use to connect to the EC2 instance.
  4. For HudiConnectionName, enter the AWS Glue connection you created earlier (Hudi-Glue-Connector).
  5. For KeyName, choose the name of the EC2 key pair that you created as a prerequisite.
  6. For VpcCIDR, leave as is.
  7. Choose Next.
  8. Choose Next.
  9. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the following information:

  • HudiDeltastreamerGlueJob – The AWS Glue streaming job name
  • MSKCluster – The MSK cluster ARN
  • PublicIPOfEC2InstanceForTunnel – The public IP of the EC2 instance for tunnel
  • TargetS3Bucket – The S3 bucket name

Create a topic in the MSK cluster

Next, SSH to Amazon EC2 using the key pair you created and run the following commands:

  1. SSH to the EC2 instance as ec2-user:
    ssh -i <KeyName> ec2-user@<PublicIPOfEC2InstanceForTunnel>

    You can get the KeyName value on the Parameters tab and the public IP of the EC2 instance for tunnel on the Outputs tab of the CloudFormation stack.

  2. For the next command, retrieve the bootstrap server endpoint of the MSK cluster by navigating to msk-source-cluster on the Amazon MSK console and choosing View client information.
  3. Run the following command to create the topic hudi-deltastream-demo in the MSK cluster:
    ./kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
    --topic hudi-deltastream-demo \
    --bootstrap-server "<replace text with value under private endpoint on MSK>" \
    --partitions 1 \
    --replication-factor 2 \
    --command-config ./config_file.txt

  4. Ingest the initial data from the deltastreamer_initial_load.json file into the Kafka topic:
    ./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
    --broker-list "<replace text with value under private endpoint on MSK>" \
    --topic hudi-deltastream-demo \
    --producer.config ./config_file.txt < deltastreamer_initial_load.json

The following is the schema of a record ingested into the Kafka topic:

  [{
     "name": "id",
     "type": "int"
  }, {
     "name": "category",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  }, {
     "name": "name",
     "type": "string"
  }, {
     "name": "quantity",
     "type": "int"
  }]

The schema uses the following parameters:

  • id – The product ID
  • category – The product category
  • ts – The timestamp when the record was inserted or last updated
  • name – The product name
  • quantity – The available quantity of the product in the inventory
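To make the field types concrete, the following is a minimal Python sketch that checks a record against the five fields listed above. It is not part of the solution in this post: the function name validate_record and the mapping from schema types to Python types are assumptions made for illustration.

```python
# Field names and types come from the schema in this post.
# The mapping to Python types (int, str) is an assumption for illustration.
EXPECTED_FIELDS = {
    "id": int,
    "category": str,
    "ts": str,
    "name": str,
    "quantity": int,
}


def validate_record(record: dict) -> list:
    """Return a list of problems; an empty list means the record matches."""
    problems = []
    for field, expected_type in EXPECTED_FIELDS.items():
        if field not in record:
            problems.append(f"missing field: {field}")
            continue
        value = record[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            problems.append(f"wrong type for {field}: {type(value).__name__}")
    for field in record:
        if field not in EXPECTED_FIELDS:
            problems.append(f"unexpected field: {field}")
    return problems


# A record with the same shape as the example record in this post.
sample = {
    "id": 1,
    "category": "Apparel",
    "ts": "2022-01-02 10:29:00",
    "name": "ABC shirt",
    "quantity": 4,
}
print(validate_record(sample))
```

A check like this can be useful before producing test data to the topic, because a record that doesn't match the schema is harder to diagnose after it reaches the streaming job.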

The following code gives an example of a record:

  {
    "id": 1,
    "category": "Apparel",
    "ts": "2022-01-02 10:29:00",
    "name": "ABC shirt",
    "quantity": 4
  }

Start the AWS Glue streaming job

To start the AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue Studio console, find the job with the value for HudiDeltastreamerGlueJob.
  2. Choose the job to review the script and job details.
  3. On the Job details tab, replace the value of the --KAFKA_BOOTSTRAP_SERVERS key with the Amazon MSK bootstrap server’s private endpoint.
  4. Choose Save to save the job settings.
  5. Choose Run to start the job.

When the AWS Glue streaming job runs, the records from the MSK topic are consumed and written to the target S3 bucket created by AWS CloudFormation. To find the bucket name, check the stack’s Outputs tab for the TargetS3Bucket key value.

The data in Amazon S3 is stored in Parquet file format. In this example, the data written to Amazon S3 isn’t partitioned, but you can enable partitioning by specifying hoodie.datasource.write.partitionpath.field=<column_name> as the partition field and setting hoodie.datasource.write.hive_style_partitioning to True in the Hudi configuration property in the AWS Glue job script.

In this post, we write the data to a non-partitioned table, so we set the following two Hudi configurations:

  • hoodie.datasource.hive_sync.partition_extractor_class is set to org.apache.hudi.hive.NonPartitionedExtractor
  • hoodie.datasource.write.keygenerator.class is set to org.apache.hudi.keygen.NonpartitionedKeyGenerator
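The choice between a partitioned and a non-partitioned table can be sketched as a small helper that returns the relevant Hudi options. This is only an illustration in Python: the helper name hudi_partition_options is hypothetical, the job script in this post sets these properties in its own code, and a partitioned table typically needs further Hive sync settings that this post doesn't cover.

```python
def hudi_partition_options(partition_column=None):
    """Return Hudi options for a non-partitioned or a partitioned table.

    The property names and class names are the ones mentioned in this post;
    the helper itself is a hypothetical illustration.
    """
    if partition_column is None:
        # Non-partitioned table, as used in this post.
        return {
            "hoodie.datasource.hive_sync.partition_extractor_class":
                "org.apache.hudi.hive.NonPartitionedExtractor",
            "hoodie.datasource.write.keygenerator.class":
                "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
        }
    # Partitioned table: set the partition field and Hive-style partition paths.
    return {
        "hoodie.datasource.write.partitionpath.field": partition_column,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }


for key, value in hudi_partition_options().items():
    print(f"{key}={value}")
```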

DeltaStreamer options and configuration

DeltaStreamer has multiple options available; the following are the options set in the AWS Glue streaming job used in this post:

  • continuous – DeltaStreamer runs in continuous mode, repeatedly fetching from the source and writing to the Hudi table in a loop.
  • enable-hive-sync – Enables table sync to the Apache Hive Metastore.
  • schemaprovider-class – Defines the class for the schema provider to attach schemas to the input and target table data.
  • source-class – Defines the source class to read data and has many built-in options.
  • source-ordering-field – The field used to break ties between records with the same key in the input data. Defaults to ts (the Unix timestamp of the record).
  • target-base-path – Defines the path for the target Hudi table.
  • table-type – Indicates the Hudi storage type to use. In this post, it’s set to COPY_ON_WRITE.
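As a rough illustration of how these options fit together, the following Python sketch assembles them into a command-line style argument list. The two class names are real Hudi utility classes, but this post doesn't state which classes the job uses, so treat them as assumptions. The --target-table option and the example path and table name are also assumptions added for the sketch.

```python
def build_deltastreamer_args(target_base_path, target_table):
    """Assemble DeltaStreamer options as a flat argument list (illustrative only)."""
    return [
        "--continuous",
        "--enable-hive-sync",
        # Assumed class names; the post doesn't say which ones the job uses.
        "--schemaprovider-class",
        "org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
        "--source-class",
        "org.apache.hudi.utilities.sources.JsonKafkaSource",
        "--source-ordering-field", "ts",
        "--target-base-path", target_base_path,
        # Not in the list above; assumed here because a target table name is needed.
        "--target-table", target_table,
        "--table-type", "COPY_ON_WRITE",
    ]


# Hypothetical bucket and table names.
args = build_deltastreamer_args("s3://example-target-bucket/product_table/", "product_table")
print(" ".join(args))
```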

The following are some of the important DeltaStreamer configuration properties set in the AWS Glue streaming job:

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc

# Kafka Source

#Kafka props

The configuration contains the following details:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema for the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstrap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range

Hudi configuration

The following are some of the important Hudi configuration options, which enable us to achieve in-place updates for the generated schema:

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the precombine field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.
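The interaction between the record key and the precombine field can be modeled with a few lines of Python. This is a simplified sketch of the upsert rule described above, not Hudi's implementation: the upsert function is hypothetical, and real behavior depends on the payload class configured for the table. The sample records follow the product data used in this post, with id as the record key and ts as the precombine field.

```python
def upsert(table, incoming, record_key="id", precombine="ts"):
    """Merge incoming records into a table modeled as a dict keyed by record key.

    For records with the same key, the one with the largest precombine value wins.
    The ts strings here sort correctly as text because they use a fixed
    year-month-day hour:minute:second layout.
    """
    merged = dict(table)
    for record in incoming:
        key = record[record_key]
        current = merged.get(key)
        if current is None or record[precombine] >= current[precombine]:
            merged[key] = record
    return merged


table = {
    1: {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00",
        "name": "ABC shirt", "quantity": 4},
}
incoming = [
    {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00",
     "name": "ABC shirt", "quantity": 2},
    {"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00",
     "name": "JKL Lip gloss", "quantity": 7},
]
table = upsert(table, incoming)
for row in table.values():
    print(row)
```

In this model, the existing row for product ID 1 is updated in place and product ID 5 is inserted as a new row, which is the in-place update behavior that the UPSERT operation is meant to provide.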

AWS Glue Data Catalog table

The AWS Glue job creates a Hudi table in the Data Catalog mapped to the Hudi dataset on Amazon S3. Because the hoodie.datasource.hive_sync.table configuration parameter is set to product_table, the table is visible under the default database in the Data Catalog.

The following screenshot shows the Hudi table column names in the Data Catalog.

Query the data using Athena

With the Hudi datasets available in Amazon S3, you can query the data using Athena. Let’s use the following query:

SELECT * FROM "default"."product_table";

The following screenshot shows the query output. The table product_table has four records from the initial ingestion: two records for the category Apparel, one for Cosmetics, and one for Footwear.

Load incremental data into the Kafka topic

Now suppose that the store sold some quantity of apparel and footwear and added a new product to its inventory, as shown in the following code. The store sold two items of product ID 1 (Apparel) and one item of product ID 3 (Footwear). The store also added a new product, with product ID 5, in the Cosmetics category.

{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2}
{"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5}
{"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7}

Let’s ingest the incremental data from the deltastreamer_incr_load.json file to the Kafka topic and query the data from Athena:

./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
--broker-list "<replace text with value under private endpoint on MSK>" \
--topic hudi-deltastream-demo \
--producer.config ./config_file.txt < deltastreamer_incr_load.json

Within a few seconds, you should see a new Parquet file created in the target S3 bucket under the product_table prefix. The following is the screenshot from Athena after the incremental data ingestion showing the latest updates.

Additional considerations

There are some hard-coded Hudi options in the AWS Glue Streaming job scripts. These options are set for the sample table that we created for this post, so update the options based on your workload.

Clean up

To avoid incurring future charges, delete the CloudFormation stack, which deletes all the underlying resources created by this post, except for the product_table table created in the default database. Manually delete the product_table table under the default database from the Data Catalog.


In this post, we illustrated how you can add the Apache Hudi Connector for AWS Glue and perform streaming ingestion into an S3 data lake using Apache Hudi DeltaStreamer with AWS Glue. You can use the Apache Hudi Connector for AWS Glue to create a serverless streaming pipeline using AWS Glue streaming jobs with the DeltaStreamer utility to ingest data from Kafka. We demonstrated this by reading the latest updated data using Athena in near-real time.

As always, AWS welcomes feedback. If you have any comments or questions on this post, please share them in the comments.

About the authors

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Anand Prakash is a Senior Solutions Architect at AWS Data Lab. Anand focuses on helping customers design and build AI/ML, data analytics, and database solutions to accelerate their path to production.

Using AWS Shield Advanced protection groups to improve DDoS detection and mitigation

Post Syndicated from Joe Viggiano original https://aws.amazon.com/blogs/security/using-aws-shield-advanced-protection-groups-to-improve-ddos-detection-and-mitigation/

Amazon Web Services (AWS) customers can use AWS Shield Advanced to detect and mitigate distributed denial of service (DDoS) attacks that target their applications running on Amazon Elastic Compute Cloud (Amazon EC2), Elastic Load Balancing (ELB), Amazon CloudFront, AWS Global Accelerator, and Amazon Route 53. By using protection groups for Shield Advanced, you can logically group your collections of Shield Advanced protected resources. In this blog post, you will learn how you can use protection groups to customize the scope of DDoS detection for application layer events, and accelerate mitigation for infrastructure layer events.

What is a protection group?

A protection group is a resource that you create by grouping your Shield Advanced protected resources, so that the service considers them to be a single protected entity. A protection group can contain many different resources that compose your application, and the resources may be part of multiple protection groups spanning different AWS Regions within an AWS account. Common patterns that you might use when designing protection groups include aligning resources to applications, application teams, or environments (such as production and staging), and by product tiers (such as free or paid). For more information about setting up protection groups, see Managing AWS Shield Advanced protection groups.

Why should you consider using a protection group?

The benefits of protection groups differ for infrastructure layer (layer 3 and layer 4) events and application layer (layer 7) events. For layer 3 and layer 4 events, protection groups can reduce the time it takes for Shield Advanced to begin mitigations. For layer 7 events, protection groups add an additional reporting mechanism. There is no change in the mechanism that Shield Advanced uses internally for detection of an event, and you do not lose the functionality of individual resource-level detections. You receive both group-level and individual resource-level Amazon CloudWatch metrics to consume for operational use. Let’s look at the benefits for each layer in more detail.

Layers 3 and 4: Accelerate time to mitigate for DDoS events

For infrastructure layer (layer 3 and layer 4) events, Shield Advanced monitors the traffic volume to your protected resource. An abnormal traffic deviation signals the possibility of a DDoS attack, and Shield Advanced then puts mitigations in place. By default, Shield Advanced observes the elevation of traffic to a resource over multiple consecutive time intervals to establish confidence that a layer 3/layer 4 event is under way. In the absence of a protection group, Shield Advanced follows the default behavior of waiting to establish confidence before it puts mitigation in place for each resource. However, if the resources are part of a protection group, and if the service detects that one resource in a group is targeted, Shield Advanced uses that confidence for other resources in the group. This can accelerate the process of putting mitigations in place for those resources.

Consider a case where you have an application deployed in different AWS Regions, and each stack is fronted with a Network Load Balancer (NLB). When you enable Shield Advanced on the Elastic IP addresses associated with the NLB in each Region, you can optionally add those Elastic IP addresses to a protection group. If an actor targets one of the NLBs in the protection group and a DDoS attack is detected, Shield Advanced will lower the threshold for implementing mitigations on the other NLBs associated with the protection group. If the scope of the attack shifts to target the other NLBs, Shield Advanced can potentially mitigate the attack faster than if those NLBs were not in the protection group.

Note: This benefit applies only to Elastic IP addresses and Global Accelerator resource types.

Layer 7: Reduce false positives and improve accuracy of detection for DDoS events

Shield Advanced detects application layer (layer 7) events when you associate a web access control list (web ACL) in AWS WAF with it. Shield Advanced consumes request data for the associated web ACL, analyzes it, and builds a traffic baseline for your application. The service then uses this baseline to detect anomalies in traffic patterns that might indicate a DDoS attack.

When you group resources in a protection group, Shield Advanced aggregates the data from individual resources and creates the baseline for the whole group. It then uses this aggregated baseline to detect layer 7 events for the group resource. It also continues to monitor and report for the resources individually, regardless of whether they are part of protection groups or not.

Shield Advanced provides three types of aggregation to choose from (sum, mean, and max) to aggregate the volume data of individual resources to use as a baseline for the whole group. We’ll look at the three types of aggregation, with a use case for each, in the next section.
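The arithmetic behind the three aggregation types can be sketched in a few lines of Python. This is only an illustration of sum, mean, and max applied to per-resource traffic volumes: the function name group_volume and all traffic numbers are hypothetical, and the sketch doesn't represent how Shield Advanced computes baselines internally.

```python
from statistics import mean

# The three aggregation types named in this post.
AGGREGATIONS = {"sum": sum, "mean": mean, "max": max}


def group_volume(resource_volumes, aggregation):
    """Combine per-resource request volumes into a single group-level value."""
    return AGGREGATIONS[aggregation](list(resource_volumes.values()))


# sum: traffic shifts between two load balancers, but the group total is unchanged.
before_shift = {"blue-alb": 1000, "green-alb": 0}
after_shift = {"blue-alb": 0, "green-alb": 1000}
print(group_volume(before_shift, "sum"), group_volume(after_shift, "sum"))

# mean: services with similar traffic patterns share one typical volume.
similar_services = {"service-a": 900, "service-b": 1100}
print(group_volume(similar_services, "mean"))

# max: the distribution in front of the origins dominates the group value,
# even when one origin takes over all origin traffic.
normal = {"cloudfront": 5000, "alb-1": 600, "alb-2": 400}
failover = {"cloudfront": 5000, "alb-1": 1000, "alb-2": 0}
print(group_volume(normal, "max"), group_volume(failover, "max"))
```

In each pair, the individual resource volumes change noticeably while the group-level value stays the same, which is why a group-level view can be less sensitive to expected traffic shifts.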

Note: Traffic aggregation is applicable only for layer 7 detection.

Case 1: Blue/green deployments

Blue/green is a popular deployment strategy that increases application availability and reduces deployment risk when rolling out changes. The blue environment runs the current application version, and the green environment runs the new application version. When testing is complete, live application traffic is directed to the green environment, and the blue environment is dismantled.

During blue/green deployments, the traffic to your green resources can go from zero load to full load in a short period of time. Shield Advanced layer 7 detection uses traffic baselining for individual resources, so newly created resources like an Application Load Balancer (ALB) that are part of a blue/green operation would have no baseline, and the rapid increase in traffic could cause Shield Advanced to declare a DDoS event. In this scenario, the DDoS event could be a false positive.

Figure 1: A blue/green deployment with ALBs in a protection group. Shield is using the sum of total traffic to the group to baseline layer 7 traffic for the group as a single unit

In the example architecture shown in Figure 1, we have configured Shield to include all resources of type ALB in a single protection group with aggregation type sum. Shield Advanced will use the sum of traffic to all resources in the protection group as an additional baseline. We have only one ALB (called blue) to begin with. When you add the green ALB as part of your deployment, you can optionally add it to the protection group. As traffic shifts from blue to green, the total traffic to the protection group remains the same even though the volume of traffic changes for the individual resources that make up the group. After the blue ALB is deleted, the Shield Advanced baseline for that ALB is deleted with it. At this point, the green ALB hasn’t existed for sufficient time to have its own accurate baseline, but the protection group baseline persists. You could still receive a DDoSDetected CloudWatch metric with a value of 1 for individual resources, but with a protection group you have the flexibility to set one or more alarms based on the group-level DDoSDetected metric. Depending on your application’s use case, this can reduce non-actionable event notifications.

Note: You might already have alarms set for individual resources, because the onboarding wizard in Shield Advanced provides you an option to create alarms when you add protection to a resource. So, you should review the alarms you already have configured before you create a protection group. Simply adding a resource to a protection group will not reduce false positives.

Case 2: Resources that have traffic patterns similar to each other

Client applications might interact with multiple services as part of a single transaction or workflow. These services can be behind their own dedicated ALBs or CloudFront distributions and can have traffic patterns similar to each other. In the example architecture shown in Figure 2, we have two services that are always called to satisfy a user request. Consider a case where you add a new service to the mix. Before protection groups existed, setting up such a new protected resource, such as ALB or CloudFront, required Shield Advanced to build a brand-new baseline. You had to wait for a certain minimum period before Shield Advanced could start monitoring the resource, and the service would need to monitor traffic for a few days in order to be accurate.

Figure 2: Deploying a new service and including it in a protection group with an existing baseline. Shield is using the mean aggregation type to baseline traffic for the group.

For improved accuracy of detection of layer 7 events, you can cause Shield Advanced to inherit the baseline of existing services that are part of the same transaction or workflow. To do so, you can put your new resource in a protection group along with an existing service or services, and set the aggregation type to mean. Shield Advanced will take some time to build up an accurate baseline for the new service. However, the protection group has an established baseline, so the new service won’t be susceptible to decreased accuracy of detection for that period of time. Note that this setting will not stop Shield Advanced from sending notifications for the new service individually; however, you might prefer to take corrective action based on the detection for the group instead.

Case 3: Resources that share traffic in a non-uniform way

Consider the case of a CloudFront distribution with an ALB as origin. If the content is cached in CloudFront edge locations, the traffic reaching the application will be lower than that received by the edge locations. Similarly, if there are multiple origins of a CloudFront distribution, the traffic volumes of individual origins will not reflect the aggregate traffic for the application. Scenarios like invalidation of cache or an origin failover can result in increased traffic at one of the ALB origins. This could cause Shield Advanced to send “1” as the value for the DDoSDetected CloudWatch metric for that ALB. However, you might not want to initiate an alarm or take corrective action in this case.

Figure 3: CloudFront and ALBs in a protection group with aggregation type max. Shield is using CloudFront’s baseline for the group

You can combine the CloudFront distribution and origin (or origins) in a protection group with the aggregation type set to max. Shield Advanced will consider the CloudFront distribution’s traffic volume as the baseline for the protection group as a whole. In the example architecture in Figure 3, a CloudFront distribution fronts two ALBs and balances the load between the two. We have bundled all three resources (CloudFront and two ALBs) into a protection group. In case one ALB fails, the other ALB will receive all the traffic. This way, although you might receive an event notification for the active ALB at the individual resource level if Shield detects a volumetric event, you might not receive it for the protection group because Shield Advanced will use CloudFront traffic as the baseline for determining the increase in volume. You can set one or more alarms and take corrective action according to your application’s use case.


In this blog post, we showed you how AWS Shield Advanced provides you with the capability to group resources in order to consider them a single logical entity for DDoS detection and mitigation. This can help reduce the number of false positives and accelerate the time to mitigation for your protected applications.

A Shield Advanced subscription provides additional capabilities, beyond those discussed in this post, that supplement your perimeter protection. It provides integration with AWS WAF for layer 7 DDoS detection, health-based detection for reducing false positives, enhanced visibility into DDoS events, assistance from the Shield Response team, custom mitigations, and cost-protection safeguards. You can learn more about Shield Advanced capabilities in the AWS Shield Advanced User Guide.

If you have feedback about this blog post, submit comments in the Comments section below. You can also start a new thread on AWS Shield re:Post to get answers from the community.

Want more AWS Security news? Follow us on Twitter.

Joe Viggiano

Joe is a Sr. Solutions Architect helping media and entertainment companies accelerate their adoptions of cloud-based solutions.

Deepak Garg

Deepak is a Solutions Architect at AWS. He loves diving deep into AWS services and sharing his knowledge with customers. Deepak has a background in Content Delivery Networks and Telecommunications.

How to centralize findings and automate deletion for unused IAM roles

Post Syndicated from Hong Pham original https://aws.amazon.com/blogs/security/how-to-centralize-findings-and-automate-deletion-for-unused-iam-roles/

Maintaining AWS Identity and Access Management (IAM) resources is similar to keeping your garden healthy over time. Having visibility into your IAM resources, especially the resources that are no longer used, is important to keep your AWS environment secure. Proactively detecting and responding to unused IAM roles helps you prevent unauthorized entities from gaining access to your AWS resources. In this post, I will show you how to apply resource tags on IAM roles and deploy serverless technologies on AWS to detect unused IAM roles and to require the owner of the IAM role (identified through tags) to take action.

You can use this solution to check for unused IAM roles in a standalone AWS account. As you grow your workloads in the cloud, you can run this solution for multiple AWS accounts by using AWS Organizations. In this solution, you use AWS Control Tower to create an AWS Organizations organization with a Security organizational unit (OU), and a Security account in this OU. In this blog post, you deploy the solution in the Security account belonging to a Security OU of an organization.

For more information and recommended best practices, see the blog post Managing the multi-account environment using AWS Organizations and AWS Control Tower. Following this best practice, you can create a Security OU, in which you provision one or more Security and Audit accounts that are dedicated for security automation and audit activities on behalf of the entire organization.

Solution architecture

The architecture diagram in Figure 1 demonstrates the solution workflow.

Figure 1: Solution workflow for standalone account or member account of an AWS Organization.

The solution is triggered periodically by an Amazon EventBridge scheduled rule and invokes a series of actions. You specify the frequency (in number of days) when you create the EventBridge rule. There are two options to run this solution, based on the needs of your organization.

Option 1: For a standalone account

Choose this option if you would like to check for unused IAM roles in a single AWS account. This AWS account might or might not belong to an organization or OU. In this blog post, I refer to this account as the standalone account.


  1. You need an AWS account specifically for security automation. For this blog post, I refer to this account as the standalone Security account.
  2. You should deploy the solution to the standalone Security account, which has appropriate admin permission to audit other accounts and manage security automation.
  3. Because this solution uses AWS CloudFormation StackSets, you need to grant self-managed permissions to create stack sets in standalone accounts. Specifically, you need to establish a trust relationship between the standalone Security account and the standalone account by creating the AWSCloudFormationStackSetAdministrationRole IAM role in the standalone Security account, and the AWSCloudFormationStackSetExecutionRole IAM role in the standalone account.
  4. You need to have AWS Security Hub enabled in your standalone Security account, and you need to deploy the solution in the same AWS Region as your Security Hub dashboard.
  5. You need tagging enforcement in place for IAM roles. This solution uses the IAM tag key Owner to identify the email address of the owner. The value of this tag key should be the email address associated with the owner of the IAM role. If the Owner tag isn’t available, the notification email is sent to the email address that you provided in the parameter ITSecurityEmail when you provisioned the CloudFormation stack.
  6. This solution uses Amazon Simple Email Service (Amazon SES) to send emails to the owner of the IAM roles. The destination address needs to be verified with Amazon SES. With Amazon SES, you can verify identity at the individual email address or at the domain level.

An EventBridge rule triggers the AWS Lambda function LambdaCheckIAMRole in the standalone Security account. The LambdaCheckIAMRole function assumes a role in the standalone account. This role is named after the CloudFormation stack name that you specify when you provision the solution. Then LambdaCheckIAMRole calls the IAM API action GetAccountAuthorizationDetails to get the list of IAM roles in the standalone account, and parses the data type RoleLastUsed to retrieve the date, time, and Region in which each role was last used. If the last-used time is not available, the IAM role is skipped. Using the CloudFormation parameter MaxDaysForLastUsed that you provide, LambdaCheckIAMRole determines whether the number of days since the role was last used is greater than the MaxDaysForLastUsed value. LambdaCheckIAMRole also extracts the tags associated with the IAM roles, and retrieves the email address of the IAM role owner from the value of the tag key Owner. If there is no Owner tag, LambdaCheckIAMRole sends an email to the default email address that you provided in the CloudFormation parameter ITSecurityEmail.
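The decision logic described above can be sketched as a pure Python function. This is my own minimal illustration, not the code of LambdaCheckIAMRole: the function name find_unused_roles and the output field names are hypothetical. The input is assumed to have the shape of the RoleDetailList entries returned by GetAccountAuthorizationDetails, with RoleName, RoleLastUsed, and Tags, and with LastUsedDate as a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone


def find_unused_roles(role_details, max_days_for_last_used, default_email, now=None):
    """Return one entry per role whose last use is older than the allowed number of days."""
    now = now or datetime.now(timezone.utc)
    threshold = timedelta(days=max_days_for_last_used)
    unused = []
    for role in role_details:
        last_used = role.get("RoleLastUsed", {}).get("LastUsedDate")
        if last_used is None:
            # As described in the post, roles with no last-used time are skipped.
            continue
        age = now - last_used
        if age > threshold:
            tags = {tag["Key"]: tag["Value"] for tag in role.get("Tags", [])}
            unused.append({
                "RoleName": role["RoleName"],
                "DaysSinceLastUsed": age.days,
                # Fall back to the default address when there is no Owner tag.
                "NotifyEmail": tags.get("Owner", default_email),
            })
    return unused


# Hypothetical roles and addresses for illustration.
now = datetime(2022, 6, 1, tzinfo=timezone.utc)
roles = [
    {"RoleName": "old-tagged", "Tags": [{"Key": "Owner", "Value": "owner@example.com"}],
     "RoleLastUsed": {"LastUsedDate": now - timedelta(days=100), "Region": "us-east-1"}},
    {"RoleName": "recent", "Tags": [],
     "RoleLastUsed": {"LastUsedDate": now - timedelta(days=10), "Region": "us-east-1"}},
    {"RoleName": "never-used", "Tags": [], "RoleLastUsed": {}},
    {"RoleName": "old-untagged",
     "RoleLastUsed": {"LastUsedDate": now - timedelta(days=200), "Region": "us-west-2"}},
]
for finding in find_unused_roles(roles, 90, "it-security@example.com", now=now):
    print(finding)
```

Passing the current time as a parameter keeps the function deterministic, which makes the threshold logic easy to test without calling AWS.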

Option 2: For all member accounts that belong to an organization or an OU

Choose this option if you want to check for unused IAM roles in every member account that belongs to an AWS Organizations organization or OU.


  1. You need to have an AWS Organizations organization with a dedicated Security account that belongs to a Security OU. For this blog post, I refer to this account as the Security account.
  2. You should deploy the solution to the Security account that has appropriate admin permission to audit other accounts and to manage security automation.
  3. Because this solution uses CloudFormation StackSets to create stack sets in member accounts of the organization or OU that you specify, the Security account in the Security OU needs to be granted CloudFormation delegated admin permission to create AWS resources in this solution.
  4. You need Security Hub enabled in your Security account, and you need to deploy the solution in the same Region as your Security Hub dashboard.
  5. You need tagging enforcement in place for IAM roles. This solution uses the IAM tag key Owner to identify the owner email address. The value of this tag key should be the email address associated with the owner of the IAM role. If the Owner tag isn’t available, the notification email will be sent to the email address that you provided in the parameter ITSecurityEmail when you provisioned the CloudFormation stack.
  6. This solution uses Amazon SES to send emails to the owner of the IAM roles. The destination address needs to be verified with Amazon SES. With Amazon SES, you can verify identity at the individual email address or at the domain level.

An EventBridge rule triggers the Lambda function LambdaGetAccounts in the Security account to collect the account IDs of member accounts that belong to the organization or OU. LambdaGetAccounts sends those account IDs to an SNS topic. Each account ID invokes the Lambda function LambdaCheckIAMRole once.
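
The fan-out step can be pictured as one SNS message per account ID. The following sketch assumes a client that behaves like boto3.client("sns"); the function name and the message shape ({"accountId": ...}) are hypothetical and may differ from what the sample code publishes.

```python
import json


def publish_account_ids(sns_client, topic_arn, account_ids):
    """Publish one message per member account so each triggers its own check.

    sns_client is expected to behave like boto3.client("sns").
    The message body format is an assumption made for this sketch.
    """
    for account_id in account_ids:
        sns_client.publish(
            TopicArn=topic_arn,
            Message=json.dumps({"accountId": account_id}),
        )
    return len(account_ids)
```

Publishing the IDs individually, rather than as one list, is what allows each account to be checked by a separate invocation of LambdaCheckIAMRole.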

Similar to the process for Option 1, LambdaCheckIAMRole in the Security account assumes a role in the member account(s) of the organization or OU, and checks the last time that IAM roles in the account were used.

In both options, if an IAM role is not currently used, the function LambdaCheckIAMRole generates a Security Hub finding, and performs BatchImportFindings for all findings to Security Hub in the Security account. At the same time, the Lambda function starts an AWS Step Functions state machine execution. Each execution is for an unused IAM role following this naming convention:
[target-account-id]-[unused IAM role name]-[time the execution created in Unix format]
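
As an illustration of this naming convention, the sketch below builds an execution name from its three parts. Step Functions limits execution names to 80 characters, so the sketch truncates the role name when needed and replaces characters outside a conservative set. Both of those safeguards are assumptions of this example; the post does not state how the sample code handles long or unusual role names.

```python
import re
import time

MAX_EXECUTION_NAME_LENGTH = 80  # Step Functions limit on execution names


def execution_name(account_id, role_name, created_at=None):
    """Build '[account-id]-[role name]-[Unix time]' within the length limit."""
    created = int(time.time() if created_at is None else created_at)
    # Assumption: replace anything outside a conservative safe character set.
    safe_role = re.sub(r"[^A-Za-z0-9_-]", "_", role_name)
    prefix = f"{account_id}-"
    suffix = f"-{created}"
    # Truncate only the role name so the account ID and timestamp stay intact.
    room = MAX_EXECUTION_NAME_LENGTH - len(prefix) - len(suffix)
    return f"{prefix}{safe_role[:room]}{suffix}"


if __name__ == "__main__":
    print(execution_name("123456789012", "LegacyBatchRole", 1642826212))
```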

You should avoid running this solution against special IAM roles, such as a break-glass role or a disaster recovery role. In the CloudFormation parameter RolePatternAllowedlist, you can provide a list of role name patterns to skip the check.
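
A pattern-based skip list can be sketched as follows. This example assumes that RolePatternAllowedlist is a comma-separated list of shell-style wildcard patterns; the post does not specify the pattern syntax, so treat both the separator and the matching rules as assumptions and check the sample code before relying on them.

```python
from fnmatch import fnmatchcase


def is_allowlisted(role_name, allowlist):
    """Return True if role_name matches any pattern in the allowlist string.

    Assumption: patterns are comma-separated and use shell-style wildcards.
    """
    patterns = [p.strip() for p in allowlist.split(",") if p.strip()]
    return any(fnmatchcase(role_name, pattern) for pattern in patterns)


if __name__ == "__main__":
    allowlist = "BreakGlass*, DR-*"
    for name in ("BreakGlassAdmin", "DR-Failover", "AppServerRole"):
        print(name, is_allowlisted(name, allowlist))
```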

Use a Step Functions state machine to process approval

Figure 2 shows the state machine workflow for owner approval.

Figure 2: Owner approval state machine workflow

After the solution identifies an unused IAM role, it creates a Step Functions state machine execution. Figure 2 demonstrates the workflow of the execution. After the execution starts, the first Lambda task NotifyOwner (powered by the Lambda function NotifyOwnerFunction) sends an email to notify the IAM role owner. This is a callback task that pauses the execution until a taskToken is returned. The maximum pause for a callback task is 1 year. The execution waits until the owner responds with a decision to delete or keep the role, which is captured by a private API endpoint in Amazon API Gateway. You can configure a timeout on the callback task so that the execution doesn’t wait indefinitely for a response.

With a private API endpoint, you can build a REST API that is only accessible within your Amazon Virtual Private Cloud (Amazon VPC), or within your internal network connected to your VPC. Using a private API endpoint will prevent anyone from outside of your internal network from selecting this link and deleting the role. You can implement authentication and authorization with API Gateway to make sure that only the appropriate owner can delete a role.
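
To make the callback pattern concrete, the sketch below shows how a handler behind the API endpoint might resume the paused execution once it receives the taskToken. It assumes a client that behaves like boto3.client("stepfunctions") and uses its send_task_success and send_task_failure calls. The function name, the output payload, and the error name OwnerDenied are hypothetical, and the sample code may report the decision differently.

```python
import json


def resume_execution(sfn_client, task_token, decision):
    """Resume a paused callback task based on the owner's decision.

    sfn_client is expected to behave like boto3.client("stepfunctions").
    """
    if decision == "approve":
        # Success lets the state machine continue to the next task.
        sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"decision": "approve"}),  # hypothetical payload
        )
    elif decision == "deny":
        # Failure ends the execution with a Fail status.
        sfn_client.send_task_failure(
            taskToken=task_token,
            error="OwnerDenied",  # hypothetical error name
            cause="The role owner chose to keep the role.",
        )
    else:
        raise ValueError(f"Unknown decision: {decision!r}")
```

A task token can be used only once, which is consistent with the behavior described later in this post: after a decision is submitted, it can’t be changed.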

If the owner denies role deletion, then the role remains intact until the next automation cycle runs, and the state machine execution stops immediately with a Fail status. If the owner approves role deletion, the next Lambda task Approve (powered by the function ApproveFunction) checks again whether the role is still unused. If the role isn’t in use, the Lambda task Approve attaches the IAM policy DenyAllCheckUnusedIAMRoleSolution, which denies the role permission to perform any actions, and waits for 30 days. During this wait time, you can restore the IAM role by removing the IAM policy DenyAllCheckUnusedIAMRoleSolution from the role. The Step Functions state machine execution for this role remains in progress until the wait time expires.

After the wait time expires, the state machine execution invokes the Validate task. The Lambda function ValidateFunction checks again if the role is not in use after the amount of time calculated by adding MaxDaysForLastUsed and the preceding wait time. It also checks if the IAM policy DenyAllCheckUnusedIAMRoleSolution is attached to the role. If both of these conditions are true, the Lambda function follows a process to detach the IAM policies and delete the role permanently. The role can’t be recovered after deletion.

Note: To restore a role that has been marked for deletion, detach the IAM policy DenyAllCheckUnusedIAMRoleSolution from the role before the wait time expires.
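
The final validation can be summarized as two conditions that must both hold before deletion. The sketch below is illustrative only: the function name should_delete is hypothetical, and the policy document shown is a generic deny-all statement, assumed here because the post does not list the contents of DenyAllCheckUnusedIAMRoleSolution.

```python
from datetime import datetime, timedelta, timezone

DENY_POLICY_NAME = "DenyAllCheckUnusedIAMRoleSolution"

# Assumption: a generic deny-all policy document; the sample code may differ.
DENY_ALL_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}],
}


def should_delete(last_used, attached_policy_names, now, max_days, wait_days=30):
    """Return True only if the role stayed unused and is still marked for deletion."""
    # The role must have been idle for MaxDaysForLastUsed plus the wait time.
    still_unused = (now - last_used) >= timedelta(days=max_days + wait_days)
    # Detaching the deny policy during the wait time cancels the deletion.
    still_marked = DENY_POLICY_NAME in attached_policy_names
    return still_unused and still_marked


if __name__ == "__main__":
    now = datetime(2022, 4, 22, tzinfo=timezone.utc)
    last_used = now - timedelta(days=120)
    print(should_delete(last_used, [DENY_POLICY_NAME], now, max_days=60))
```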

To deploy the solution using the AWS CLI

  1. Clone the Git repository from AWS Samples to get the source code and CloudFormation templates.
    git clone https://github.com/aws-samples/aws-blog-automate-iam-role-deletion
    cd aws-blog-automate-iam-role-deletion

  2. Run the following AWS CLI command to upload the CloudFormation templates and Lambda code to an S3 bucket in the Security account. The S3 bucket needs to be in the same Region where you will deploy the solution.
    • To deploy the solution for a single account, use the following commands. Be sure to replace <YOUR_BUCKET_NAME> and <PATH_TO_UPLOAD_CODE> with your own values.
      #Deploy solution for a single target AWS Account
      aws cloudformation package \
      --template-file solution_scope_account.yml \
      --s3-bucket <YOUR_BUCKET_NAME> \
      --s3-prefix <PATH_TO_UPLOAD_CODE> \
      --output-template-file solution_scope_account.template

    • To deploy the solution for an organization or OU, use the following commands. Be sure to replace <YOUR_BUCKET_NAME> and <PATH_TO_UPLOAD_CODE> with your own values.
      #Deploy solution for an Organization/OU
      aws cloudformation package \
      --template-file solution_scope_organization.yml \
      --s3-bucket <YOUR_BUCKET_NAME> \
      --s3-prefix <PATH_TO_UPLOAD_CODE> \
      --output-template-file solution_scope_organization.template

  3. Validate the template generated by the aws cloudformation package command.
    • To validate the template for a single account, use the following command.
      #Validate template for a single target AWS Account
      aws cloudformation validate-template --template-body file://solution_scope_account.template

    • To validate the template for an organization or OU, use the following command.
      #Validate template for an Organization/OU
      aws cloudformation validate-template --template-body file://solution_scope_organization.template

  4. Deploy the solution in the same Region that you use for Security Hub. The stack takes 30 minutes to complete deployment.
    • To deploy the solution for a single account, use the following commands. Be sure to replace all of the placeholders with your own values.
      #Deploy solution for a single target AWS Account
      aws cloudformation deploy \
      --template-file solution_scope_account.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --parameter-overrides AccountId='<STANDALONE ACCOUNT ID>' \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<YOUR IT TEAM EMAIL>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

    • To deploy the solution for an organization, run the following command to create the CloudFormation stack in the Security account of the organization.
      #Deploy solution for an Organization
      aws cloudformation deploy \
      --template-file solution_scope_organization.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --parameter-overrides Scope=Organization \
      OrganizationId='<o-12345abcde>' \
      OrgRootId='<r-1234>'  \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<[email protected]>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

    • To deploy the solution for an OU, run the following command to create the CloudFormation stack in the Security account of the organization.
      #Deploy solution for an OU
      aws cloudformation deploy \
      --template-file solution_scope_organization.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --parameter-overrides Scope=OrganizationalUnit \
      OrganizationId='<o-12345abcde>' \
      OrganizationalUnitId='<ou-1234-1234abcd>'  \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<[email protected]>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

Test the solution

The solution is triggered by an EventBridge scheduled rule, so it doesn’t perform the checks immediately. To test the solution right away after the CloudFormation stacks are successfully created, follow these steps.

To manually trigger the automation for a single account

  1. Navigate to the AWS Lambda console and choose the function
    <CloudFormation stackname>-LambdaCheckIAMRole.
  2. Choose Test.
  3. Choose New event.
  4. For Name, enter a name for the event. In the event JSON, provide the current time in UTC date-time format YYYY-MM-DDTHH:MM:SSZ. For example, {"time": "2022-01-22T04:36:52Z"}. The Lambda function uses this value to calculate how much time has passed since the last time that a role was used. Figure 5 shows an example of configuring a test event.
    Figure 5: Configure test event for standalone account

  5. Choose Test.
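
If you prefer to generate the test event from step 4 rather than type it by hand, a short script can produce the JSON with the current UTC time. This is a convenience sketch; the function name build_test_event is hypothetical, and only the time key comes from the example above.

```python
import json
from datetime import datetime, timezone


def build_test_event(now=None):
    """Return the test event JSON with a UTC timestamp (YYYY-MM-DDTHH:MM:SSZ)."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Normalize to UTC so the trailing "Z" is accurate for any aware datetime.
    utc_now = now.astimezone(timezone.utc)
    return json.dumps({"time": utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")})


if __name__ == "__main__":
    # Paste the printed JSON into the Lambda console test event.
    print(build_test_event())
```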

To manually trigger the automation for an organization or OU

  1. Navigate to the AWS Lambda console and choose the function
    [CloudFormation stackname]-LambdaGetAccounts.
  2. Choose Test.
  3. Choose New event.
  4. For Name, enter a name for the event. Leave the default values for the remaining fields.
  5. Choose Test.

Respond to unused IAM roles

After you’ve triggered the Lambda function, the automation runs the necessary checks. For each unused IAM role, it creates a Step Functions state machine execution.

To see the list of Step Functions state machine executions

  1. Navigate to the AWS Step Functions console.
  2. Choose the state machine [CloudFormation stackname]OwnerApprovalStateMachine.
  3. Under the Executions tab, you will see the list of executions in running state following this naming convention: [target-account-id]-[unused IAM role name]-[time the execution created in Unix format]. Figure 6 shows an example list of executions.
    Figure 6: Each unused IAM role generates an execution in the Step Functions state machine

Each execution sends out an email notification to the IAM role owner (if available through the Owner tag) or to the IT security email address that you provided in the CloudFormation stack parameter ITSecurityEmail. The email content is:

Subject: Please take action on this unused IAM Role
This IAM Role arn:aws:iam::<AWS account>:role/<role name> is not in use for
more than 60 days.
Can you please delete the role by following this link: Approve link
Or keep this role by following this link: Deny Link

In the email, the Approve link and the Deny link are hyperlinks to a private API endpoint, each with a taskToken parameter. If you try to access these links from outside your internal network, they won’t work. When you access a link, the taskToken is passed to the private API endpoint, which updates the Step Functions state machine.
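
The following sketch shows how such links could be assembled. The /approve and /deny paths and the taskToken query parameter match the API Gateway resources used in the test procedure in this post; the function name and the base URL are hypothetical. Because task tokens can contain characters such as + and /, the token is URL-encoded before it is placed in the query string.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def decision_links(api_base_url, task_token):
    """Return the approve and deny links that carry the URL-encoded task token."""
    query = urlencode({"taskToken": task_token})
    base = api_base_url.rstrip("/")
    return {
        "approve": f"{base}/approve?{query}",
        "deny": f"{base}/deny?{query}",
    }


if __name__ == "__main__":
    # Hypothetical private API endpoint and token, for illustration only.
    links = decision_links("https://abc123.execute-api.us-east-1.amazonaws.com/prod", "AAA+b/c==")
    print(links["approve"])
    # Decoding the query string recovers the original token unchanged.
    print(parse_qs(urlsplit(links["approve"]).query)["taskToken"][0])
```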

To test the approval action using an API Gateway test

  1. Navigate to the AWS Step Functions console. Under State machines, choose the state machine that has the name [CloudFormation stackname]OwnerApprovalStateMachine
  2. On the Executions tab, there is a list of executions. Each execution represents a workflow for one IAM role, as shown in Figure 6. Choose the execution name that includes the IAM role name in the email that you received earlier.
  3. Scroll down to Execution event history.
  4. Expand the step Notify Owner, choose the TaskScheduled event, find the item taskToken, and copy its value to a text editor, as shown in Figure 7.
    Figure 7: Retrieve taskToken from execution

  5. Navigate to the API Gateway console.
  6. Choose the API that has a name similar to [CloudFormation stackname]-PrivateAPIGW-[unique string]-ApprovalEndpoint.
  7. Choose which action to test: Deny or Approve.
    • To test the Deny action, under /deny resource, choose the GET method.
    • To test the Approve action, under /approve resource, choose the GET method.
  8. Choose Test.
  9. Under Query Strings, enter taskToken= and paste the taskToken you copied earlier from the state machine execution. Figure 8 shows how to pass the taskToken to API Gateway.
    Figure 8: Provide taskToken to API Gateway Method

  10. Choose Test. After you test, the state machine resumes the workflow and finishes the automation. You won’t be able to change the action.
  11. Navigate to the AWS Step Functions console. Choose the state machine and go to the state machine execution.
    1. If you choose to deny the role deletion, the execution immediately stops as Fail.
    2. If you choose to approve the role deletion, the execution moves to the Wait task. This task removes IAM policies associated with the role and waits for a period of time before moving to the next task. By default, the wait time is 30 days. To change this number, go to the Lambda function [CloudFormation stackname]ApproveFunction, and update the variable wait_time_stamp.
    3. After the waiting period expires, the state machine triggers the Validate task to do a final validation on the role before deleting it. If the Validate task decides that the role is being used, it leaves the role intact. Otherwise, it deletes the role permanently.


In this blog post, you learned how serverless services such as Lambda, Step Functions, and API Gateway can work together to build security automation. We recommend testing this solution as a starting point. Then, you can build more features on top of the sample code and templates to customize it to perform checks, following guidance from your IT security team.

Here are a few suggestions for extending this solution.

  • This solution uses a private API Gateway to handle the approval response from the IAM role owner. You need to establish private connectivity between your internal network and AWS to invoke a private API Gateway. For instructions, see How to invoke a private API.
  • Add a mechanism to control access to API Gateway by using endpoint policies for interface VPC endpoints.
  • Archive the Security Hub finding after the IAM role is deleted using the AWS CLI or AWS Console.
  • Use a Step Functions state machine for other automation that needs human approval.
  • Add the capability to report on IAM roles that were skipped due to the absence of RoleLastUsed information.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.


Hong Pham

Hong is a Senior Solutions Architect at AWS. For more than five years, she has helped many customers from start-ups to enterprises in different industries to adopt Cloud Computing. She was born in Vietnam and currently lives in Seattle, Washington.