
Explore real-world use cases for Amazon CodeWhisperer powered by AWS Glue Studio notebooks

Post Syndicated from Ishan Gaur original https://aws.amazon.com/blogs/big-data/explore-real-world-use-cases-for-amazon-codewhisperer-powered-by-aws-glue-studio-notebooks/

Many customers are interested in boosting productivity in their software development lifecycle by using generative AI. Recently, AWS announced the general availability of Amazon CodeWhisperer, an AI coding companion that uses foundation models under the hood to improve software developer productivity. With Amazon CodeWhisperer, you can quickly accept the top suggestion, view more suggestions, or continue writing your own code. This integration reduces the overall time spent writing data integration and extract, transform, and load (ETL) logic. It also helps beginner-level programmers write their first lines of code. AWS Glue Studio notebooks allow you to author data integration jobs with a web-based serverless notebook interface.

In this post, we discuss real-world use cases for CodeWhisperer powered by AWS Glue Studio notebooks.

Solution overview

For this post, you use the CSV eSports Earnings dataset, available to download via Kaggle. The data is scraped from eSportsEarnings.com, which provides information on earnings of eSports players and teams. The objective is to perform transformations using an AWS Glue Studio notebook with CodeWhisperer recommendations and then write the data back to Amazon Simple Storage Service (Amazon S3) in Parquet file format as well as to Amazon Redshift.

Prerequisites

Our solution has the following prerequisites:

  1. Set up AWS Glue Studio.
  2. Configure an AWS Identity and Access Management (IAM) role to interact with CodeWhisperer. Attach the following policy to your IAM role that is attached to the AWS Glue Studio notebook:
    {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "CodeWhispererPermissions",
            "Effect": "Allow",
            "Action": [
                "codewhisperer:GenerateRecommendations"
            ],
            "Resource": "*"
        }]
    }

  3. Download the CSV eSports Earnings dataset and upload the CSV file highest_earning_players.csv to the S3 folder you will be using in this use case.
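As an optional sanity check, the IAM policy from step 2 can be parsed and inspected with a short Python snippet (purely illustrative; not part of the setup itself):

```python
import json

# The CodeWhisperer policy document from step 2, verbatim
policy = """
{
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "CodeWhispererPermissions",
        "Effect": "Allow",
        "Action": ["codewhisperer:GenerateRecommendations"],
        "Resource": "*"
    }]
}
"""

doc = json.loads(policy)  # raises ValueError if the JSON is malformed
actions = doc["Statement"][0]["Action"]
print("codewhisperer:GenerateRecommendations" in actions)  # True
```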

Create an AWS Glue Studio notebook

Let’s get started. Create a new AWS Glue Studio notebook job by completing the following steps:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
  2. Select Jupyter Notebook and choose Create.
  3. For Job name, enter CodeWhisperer-s3toJDBC.

A new notebook will be created with the sample cells as shown in the following screenshot.

We use the second cell for now, so you can remove all the other cells.

  1. In the second cell, update the interactive session configuration by setting the following:
    1. Worker type to G.1X
    2. Number of workers to 3
    3. AWS Glue version to 4.0
  2. Moreover, import the DynamicFrame module and current_timestamp function as follows:
    from pyspark.sql.functions import current_timestamp
    from awsglue.dynamicframe import DynamicFrame

After you make these changes, the notebook should look like the following screenshot.

Now, let’s ensure CodeWhisperer is working as intended. At the bottom right, you will find the CodeWhisperer option beside the Glue PySpark status, as shown in the following screenshot.

You can choose CodeWhisperer to view the options to use Auto-Suggestions.

Develop your code using CodeWhisperer in an AWS Glue Studio notebook

In this section, we show how to develop an AWS Glue notebook job with Amazon S3 as a data source and a JDBC data source as a target. For our use case, we need to ensure Auto-Suggestions are enabled. Generate recommendations with CodeWhisperer using the following steps:

  1. Write a comment in natural language (in English) to read CSV files from your S3 bucket:
    # Read CSV files from S3

After you enter the preceding comment and press Enter, the CodeWhisperer button at the bottom of the page shows that it is running to generate the recommendation. The recommendation appears on the next line, and the code is accepted when you press Tab. You can learn more in User actions.

After you enter the preceding comment, CodeWhisperer will generate a code snippet that is similar to the following:

df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("s3://<bucket>/<path>/highest_earning_players.csv"))

Note that you need to update the paths to match the S3 bucket you’re using instead of the CodeWhisperer-generated bucket.

From the preceding code snippet, CodeWhisperer used Spark DataFrames to read the CSV files.

  1. You can now try some rephrasing to get a suggestion with DynamicFrame functions:
# Read CSV file from S3 with the header format option using DynamicFrame

Now CodeWhisperer will generate a code snippet that is close to the following:

dyF = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://<bucket>/<path>/highest_earning_players.csv"],
        "recurse": True,
    },
    format="csv",
    format_options={
        "withHeader": True,
    },
    transformation_ctx="dyF")

This shows that, after some rephrasing of the comment we wrote, CodeWhisperer produced the recommendation we wanted.

  1. Next, use CodeWhisperer to print the schema of the preceding AWS Glue DynamicFrame by using the following comment:
    # Print the schema of the above DynamicFrame

CodeWhisperer will generate a code snippet that is close to the following:

dyF.printSchema()

We get the following output.

Now we use CodeWhisperer to create some transformation functions that can manipulate the AWS Glue DynamicFrame read earlier. We start by entering code in a new cell.

  1. First, test if CodeWhisperer can use the correct AWS Glue context functions like ResolveChoice:
    # Convert the "PlayerId" type from string to integer

CodeWhisperer has recommended a code snippet similar to the following:

dyF = dyF.resolveChoice(specs=[('PlayerId', 'cast:long')])
dyF.printSchema()

The preceding code snippet doesn’t accurately represent the comment that we entered.

  1. You can apply sentence paraphrasing and simplification by providing the following three comments. Each one has a different ask, and together they use the withColumn Spark DataFrame method to cast the column type:
    # Convert the DynamicFrame to spark data frame
    # Cast the 'PlayerId' column from string to Integer using WithColumn function
    # Convert the spark frame back to DynamicFrame and print the schema

CodeWhisperer will pick up the preceding commands and recommend the following code snippet in sequence:

df = dyF.toDF()
df = df.withColumn("PlayerId", df["PlayerId"].cast("integer"))
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()

The following output confirms the PlayerId column is changed from string to integer.

  1. Apply the same process to the resultant AWS Glue DynamicFrame to cast the TotalUSDPrize column from string to long using the withColumn Spark DataFrame method, by entering the following comments:
    # Convert the dynamicFrame to Spark Frame
    # Cast the "TotalUSDPrize" column from String to long
    # Convert the spark frame back to dynamic frame and print the schema

The recommended code snippet is similar to the following:

df = dyF.toDF()
df = df.withColumn("TotalUSDPrize", df["TotalUSDPrize"].cast("long"))
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()

The output schema of the preceding code snippet is as follows.

Next, we ask CodeWhisperer to recommend a code snippet that calculates the average prize for each player according to their country code.

  1. To do so, start by getting the count of players per country code:
    # Get the count of each country code

The recommended code snippet is similar to the following:

country_code_count = df.groupBy("CountryCode").count()
country_code_count.show()

We get the following output.

  1. Join the main DataFrame with the country code count DataFrame and then add a new column calculating the average highest prize for each player according to their country code:
    # Convert the DynamicFrame (dyF) to dataframe (df)
    # Join the dataframe (df) with country_code_count dataframe with respect to CountryCode column
    # Convert the spark frame back to DynamicFrame and print the schema

The recommended code snippet is similar to the following:

df = dyF.toDF()
df = df.join(country_code_count, "CountryCode")
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()

The output schema confirms that both DataFrames were correctly joined and the count column was added to the main DataFrame.

  1. Get the code recommendation on the code snippet to calculate the average TotalUSDPrize for each country code and add it to a new column:
    # Get the sum of all the TotalUSDPrize column per countrycode
    # Rename the sum column to be "SumPrizePerCountry" in the newly generated dataframe

The recommended code snippet is similar to the following:

country_code_sum = df.groupBy("CountryCode").sum("TotalUSDPrize")
country_code_sum = country_code_sum.withColumnRenamed("sum(TotalUSDPrize)", "SumPrizePerCountry")
country_code_sum.show()

The output of the preceding code should look like the following.

  1. Join the country_code_sum DataFrame with the main DataFrame from earlier and get the average of the prizes per player per country:
    # Join the above dataframe with the main dataframe with respect to CountryCode
    # Get the average Total prize in USD per player per country and add it to a new column called "AveragePrizePerPlayerPerCountry"

The recommended code snippet is similar to the following:

df = df.join(country_code_sum, "CountryCode")
df = df.withColumn("AveragePrizePerPlayerPerCountry", df["SumPrizePerCountry"] / df["count"])

  1. The last part of the transformation phase is to sort the data by the highest average prize per player per country:
    # sort the above dataframe descendingly according to the highest Average Prize per player country
    # Show the top 5 rows

The recommended code snippet is similar to the following:

df = df.sort(df["AveragePrizePerPlayerPerCountry"].desc())
df.show(5)

The first five rows will be similar to the following.
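For intuition, the aggregation logic above (count and sum per country, average per player, then sort) can be sketched in plain Python, using a few hypothetical sample rows in place of the real dataset; the real job runs on Spark:

```python
from collections import defaultdict

# Hypothetical sample rows mirroring the dataset's columns
rows = [
    {"PlayerId": 1, "CountryCode": "kr", "TotalUSDPrize": 3000},
    {"PlayerId": 2, "CountryCode": "kr", "TotalUSDPrize": 1000},
    {"PlayerId": 3, "CountryCode": "us", "TotalUSDPrize": 500},
]

# Count of players and sum of prizes per country code
count_per_country = defaultdict(int)
sum_per_country = defaultdict(int)
for row in rows:
    count_per_country[row["CountryCode"]] += 1
    sum_per_country[row["CountryCode"]] += row["TotalUSDPrize"]

# Average prize per player per country, sorted descending
avg_per_country = {
    cc: sum_per_country[cc] / count_per_country[cc] for cc in count_per_country
}
top = sorted(avg_per_country.items(), key=lambda kv: kv[1], reverse=True)
print(top)  # [('kr', 2000.0), ('us', 500.0)]
```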

For the last step, we write the DynamicFrame to Amazon S3 and to Amazon Redshift.

  1. Write the DynamicFrame to Amazon S3 with the following code:
    # Convert the data frame to DynamicFrame
    # Write the DynamicFrame to S3 in glueparquet format

The CodeWhisperer recommendation is similar to the following code snippet:

dyF = DynamicFrame.fromDF(df, glueContext, "dyF")

glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="s3",
    connection_options={
        "path": "s3://<enter your own>/",
        "partitionKeys": [],
    },
    format="glueparquet",
)

We need to correct the generated code snippet because it doesn't set partition keys. As shown, partitionKeys is empty, so we could ask for another suggestion to set partition keys before writing to the target Amazon S3 location. Also, according to the latest updates on writing DynamicFrames to Amazon S3 in Parquet format, format = "glueparquet" is no longer used. Instead, you need to use the parquet format type with the useGlueParquetWriter option enabled.

After the updates, our code looks similar to the following:

dyF = DynamicFrame.fromDF(df, glueContext, "dyF")

glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="s3",
    connection_options={
        "path": "s3://<enter your own>/",
    },
    format="parquet",
    format_options={
        "useGlueParquetWriter": True,
    },
)

Another option here would be to write the files to Amazon Redshift using a JDBC connection.

  1. First, enter the following comments to check whether CodeWhisperer understands them and uses the correct functions:
    # Write the DynamicFrame to Redshift
    # Select only the following columns in the DynamicFrame to write to Redshift: CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry

The output of the comment is similar to the following code snippet:

dyF = dyF.select_fields(["CountryCode", "TotalUSDPrize", "NameFirst", "NameLast", "AveragePrizePerPlayerPerCountry"])
glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://omarglueblog.cjzxbkzmhwqm.us-east-1.redshift.amazonaws.com:5439/omarglueblog",
        "user": "omarglueblog",
        "password": "omarglueblog",
        "dbtable": "highest_earning_players_redshift",
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "preactions": "truncate table highest_earning_players_redshift",
        "redshiftTmpDir": "s3://awsgluetest/tmp/",
        "aws_iam_role": "0000000000000000000000000:role/glue-role",
    },
    format="redshift",
    transformation_ctx="dyF",
)

As we can see, CodeWhisperer correctly interpreted the comment by selecting only the specified columns to write to Amazon Redshift.

  1. Now, use CodeWhisperer to write the DynamicFrame to Amazon Redshift. We use the Preaction parameter to run a SQL query to select only certain columns to be written to Amazon Redshift:
    # Write the resultant DynamicFrame to Redshift 
    # using preaction that selects only the following columns: CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry

The CodeWhisperer recommendation is similar to the following code snippet:

glueContext.write_dynamic_frame.from_options(
    frame = dyf,
    connection_type = "redshift",
    connection_options = {
        "url": "jdbc:redshift://awsgluetest.cjw8y5zdqmhz.us-east-1.redshift.amazonaws.com:5439/dev",
        "user": "awsuser",
        "password": "awsuser",
        "dbtable": "players",
        "preactions": "SELECT CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry FROM highest_earning_player",
        "redshiftTmpDir": "s3://awsgluetest/tmp/"
        },
    format = "glueparquet",
    transformation_ctx = "write_dynamic_frame")

After checking the preceding code snippet, you can observe that the format parameter is misplaced and can be removed. You can also add aws_iam_role as an input in connection_options. Notice that CodeWhisperer assumed the Redshift URL contains the same name as the S3 folder we used, so you need to change the URL and the S3 temp directory bucket to reflect your own parameters, and remove the password parameter. The final code snippet should be similar to the following:

glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://<enter your own>.cjwjn5pzxbhx.us-east-1.redshift.amazonaws.com:5439/<enter your own>",
        "user": "<enter your own>",
        "dbtable": "<enter your own>",
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "preactions": "SELECT CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry FROM <enter your table>",
        "redshiftTmpDir": "<enter your own>",
        "aws_iam_role": "<enter your own>",
    },
)

The following is the whole code and comment snippets:

%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 3

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import current_timestamp
from awsglue.dynamicframe import DynamicFrame


sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


# Read CSV files from S3
dyF = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://<bucket>/<path>/highest_earning_players.csv"],
        "recurse": True,
    },
    format="csv",
    format_options={
        "withHeader": True,
    },
    transformation_ctx="dyF")
    
# Print the schema of the above DynamicFrame
dyF.printSchema()


# Convert the DynamicFrame to spark data frame
# Cast the 'PlayerId' column from string to Integer using WithColumn function
# Convert the spark frame back to DynamicFrame and print the schema
df = dyF.toDF()
df = df.withColumn("PlayerId", df["PlayerId"].cast("integer"))
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()


# Convert the dynamicFrame to Spark Frame
# Cast the "TotalUSDPrize" column from String to long
# Convert the spark frame back to dynamic frame and print the schema
df = dyF.toDF()
df = df.withColumn("TotalUSDPrize", df["TotalUSDPrize"].cast("long"))
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()

# Get the count of each country code
country_code_count = df.groupBy("CountryCode").count()
country_code_count.show()

# Convert the DynamicFrame (dyF) to dataframe (df)
# Join the dataframe (df) with country_code_count dataframe with respect to CountryCode column
# Convert the spark frame back to DynamicFrame and print the schema
df = dyF.toDF()
df = df.join(country_code_count, "CountryCode")
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")
dyF.printSchema()

# Get the sum of all the TotalUSDPrize column per countrycode
# Rename the sum column to be "SumPrizePerCountry"
country_code_sum = df.groupBy("CountryCode").sum("TotalUSDPrize")
country_code_sum = country_code_sum.withColumnRenamed("sum(TotalUSDPrize)", "SumPrizePerCountry")
country_code_sum.show()

# Join the above dataframe with the main dataframe with respect to CountryCode
# Get the average Total prize in USD per player per country and add it to a new column called "AveragePrizePerPlayerPerCountry"
df = df.join(country_code_sum, "CountryCode")
df = df.withColumn("AveragePrizePerPlayerPerCountry", df["SumPrizePerCountry"] / df["count"])

# sort the above dataframe descendingly according to the highest Average Prize per player country
# Show the top 5 rows
df = df.sort(df["AveragePrizePerPlayerPerCountry"].desc())
df.show(5)

# Convert the data frame to DynamicFrame
# Write the DynamicFrame to S3 in glueparquet format
dyF = DynamicFrame.fromDF(df, glueContext, "dyF")

glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="s3",
    connection_options={
        "path": "s3://<enter your own>/",
    },
    format="parquet",
    format_options={
        "useGlueParquetWriter": True,
    },
)

# Write the resultant DynamicFrame to Redshift 
# using preaction that selects only the following columns: CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry
glueContext.write_dynamic_frame.from_options(
    frame=dyF,
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://<enter your own>.cjwjn5pzxbhx.us-east-1.redshift.amazonaws.com:5439/<enter your own>",
        "user": "<enter your own>",
        "dbtable": "<enter your own>",
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "preactions": "SELECT CountryCode, TotalUSDPrize, NameFirst, NameLast, AveragePrizePerPlayerPerCountry FROM <enter your table>",
        "redshiftTmpDir": "<enter your own>",
        "aws_iam_role": "<enter your own>",
    },
)

Conclusion

In this post, we demonstrated a real-world use case on how AWS Glue Studio notebook integration with CodeWhisperer helps you build data integration jobs faster. You can start using the AWS Glue Studio notebook with CodeWhisperer to accelerate building your data integration jobs.

To learn more about using AWS Glue Studio notebooks and CodeWhisperer, check out the following video.


About the authors

Ishan Gaur works as a Sr. Big Data Cloud Engineer (ETL) specializing in AWS Glue. He's passionate about helping customers build scalable distributed ETL workloads and analytics pipelines on AWS.

Omar Elkharbotly is a Glue SME who works as Big Data Cloud Support Engineer 2 (DIST). He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS.

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka Quotas – Part 1

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-1/

With Amazon Managed Streaming for Apache Kafka (Amazon MSK), you can build and run applications that use Apache Kafka to process streaming data. To process streaming data, organizations either use multiple Kafka clusters based on their application groupings, usage scenarios, compliance requirements, and other factors, or a dedicated Kafka cluster for the entire organization. Regardless of the pattern used, Kafka clusters are typically multi-tenant, allowing multiple producer and consumer applications to produce and consume streaming data simultaneously.

With multi-tenant Kafka clusters, however, one of the challenges is to make sure that data consumer and producer applications don’t overuse cluster resources. There is a possibility that a few poorly behaved applications may overuse cluster resources, affecting the well-behaved applications as a result. Therefore, teams who manage multi-tenant Kafka clusters need a mechanism to prevent applications from overconsuming cluster resources in order to avoid issues. This is where Kafka quotas come into play. Kafka quotas control the amount of resources client applications can use within a Kafka cluster.

In Part 1 of this two-part series, we explain the concepts of how to enforce Kafka quotas in MSK multi-tenant Kafka clusters while using AWS Identity and Access Management (IAM) access control for authentication and authorization. In Part 2, we cover detailed implementation steps along with sample Kafka client applications.

Brief introduction to Kafka quotas

Kafka quotas control the amount of resources client applications can use within a Kafka cluster. It’s possible for the multi-tenant Kafka cluster to experience performance degradation or a complete outage due to resource constraints if one or more client applications produce or consume large volumes of data or generate requests at a very high rate for a continuous period of time, monopolizing Kafka cluster’s resources.

To prevent applications from overwhelming the cluster, Apache Kafka allows configuring quotas that determine how much traffic each client application produces and consumes per Kafka broker in a cluster. Kafka brokers throttle the client applications’ requests in accordance with their allocated quotas. Kafka quotas can be configured for specific users, or specific client IDs, or both. The client ID is a logical name defined in the application code that Kafka brokers use to identify which application sent messages. The user represents the authenticated user principal of a client application in a secure Kafka cluster with authentication enabled.

There are two types of quotas supported in Kafka:

  • Network bandwidth quotas – The byte-rate thresholds define how much data client applications can produce to and consume from each individual broker in a Kafka cluster measured in bytes per second.
  • Request rate quotas – This limits the percentage of time each individual broker spends processing client applications' requests.

Depending on the business requirements, you can use either of these quota configurations. However, the use of network bandwidth quotas is common because it allows organizations to cap platform resource consumption according to the amount of data produced and consumed by applications per second.

Because this post uses an MSK cluster with IAM access control, we specifically discuss configuring network bandwidth quotas based on the applications’ client IDs and authenticated user principals.

Considerations for Kafka quotas

Keep the following in mind when working with Kafka quotas:

  • Enforcement level – Quotas are enforced at the broker level rather than at the cluster level. Suppose there are six brokers in a Kafka cluster and you specify a 12 MB/sec produce quota for a client ID and user. The producer application using that client ID and user can produce a maximum of 12 MB/sec on each broker at the same time, for a total maximum of 72 MB/sec across all six brokers. However, if leadership for every partition of a topic resides on one broker, the same producer application can only produce a maximum of 12 MB/sec. Because throttling occurs per broker, it's essential to maintain an even balance of topic partition leadership across all the brokers.
  • Throttling – When an application reaches its quota, it is throttled, not failed: the broker doesn't throw an exception. Instead of returning an error when a client exceeds its quota, the broker calculates the amount of delay needed to bring the client back under its quota and delays its responses accordingly. As a result, quota violations are transparent to clients, and clients don't have to implement any special backoff or retry policies. However, when an asynchronous producer sends messages at a rate greater than the broker can accept due to the quota, the messages are queued in the client application's memory first. The client eventually runs out of buffer space if the rate of sending messages continues to exceed the rate of accepting them, causing the next Producer.send() call to block. Producer.send() eventually throws a TimeoutException if the timeout delay isn't sufficient to allow the broker to catch up to the producer application.
  • Shared quotas – If more than one client application has the same client ID and user, the quota configured for the client ID and user will be shared among all those applications. Suppose you configure a produce quota of 5 MB/sec for the combination of client-id="marketing-producer-client" and user="marketing-app-user". In this case, all producer applications that have marketing-producer-client as a client ID and marketing-app-user as an authenticated user principal will share the 5 MB/sec produce quota, impacting each other’s throughput.
  • Produce throttling – The produce throttling behavior is exposed to producer clients via client metrics such as produce-throttle-time-avg and produce-throttle-time-max. If these are non-zero, it indicates that the destination brokers are slowing the producer down and the quotas configuration should be reviewed.
  • Consume throttling – The consume throttling behavior is exposed to consumer clients via client metrics such as fetch-throttle-time-avg and fetch-throttle-time-max. If these are non-zero, it indicates that the origin brokers are slowing the consumer down and the quotas configuration should be reviewed.

Note that client metrics are metrics exposed by clients connecting to Kafka clusters.

  • Quota configuration – It's possible to configure Kafka quotas either statically through the Kafka configuration file or dynamically through kafka-configs.sh or the Kafka Admin API. The dynamic configuration mechanism is much more convenient and manageable because it allows quotas for new producer and consumer applications to be configured at any time without having to restart brokers. Dynamic configuration changes take effect in real time, even while application clients are producing or consuming data.
  • Configuration keys – With the kafka-configs.sh command-line tool, you can set dynamic consume, produce, and request quotas using the following three configuration keys, respectively: consumer_byte_rate, producer_byte_rate, and request_percentage.

For more information about Kafka quotas, refer to Kafka documentation.
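The per-broker enforcement arithmetic described above can be made concrete with a few lines of Python (the helper name is ours; the numbers mirror the 12 MB/sec example):

```python
MB = 1024 * 1024

def max_cluster_produce_rate(per_broker_quota_bytes: int, leader_brokers: int) -> int:
    """Quotas are enforced per broker: the cluster-wide ceiling for a client
    is the per-broker quota times the number of brokers leading the
    partitions that the client writes to."""
    return per_broker_quota_bytes * leader_brokers

# 12 MB/sec per-broker quota, partition leadership spread across all 6 brokers
print(max_cluster_produce_rate(12 * MB, 6) // MB)  # 72 (MB/sec cluster-wide)

# Same quota, but every partition leader on a single broker
print(max_cluster_produce_rate(12 * MB, 1) // MB)  # 12 (MB/sec)

# Shared quotas are a combined ceiling: two applications with the same
# client ID and user share one 5 MB/sec quota rather than getting 5 each.
```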

Enforce network bandwidth quotas with IAM access control

Following our understanding of Kafka quotas, let’s look at how to enforce them in an MSK cluster while using IAM access control for authentication and authorization. IAM access control in Amazon MSK eliminates the need for two separate mechanisms for authentication and authorization.

The following figure shows an MSK cluster that is configured to use IAM access control in the demo account. Each producer and consumer application has a quota that determines how much data it can produce or consume in bytes per second. For example, ProducerApp-1 has a produce quota of 1024 bytes/sec, and ConsumerApp-1 and ConsumerApp-2 have consume quotas of 5120 and 1024 bytes/sec, respectively. It's important to note that Kafka quotas are set on the Kafka cluster rather than in the client applications.

The preceding figure illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. The workflow is as follows:

  • P1 – ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B in the MSK cluster.
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B.
  • C1 – ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B in the MSK cluster.
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B.

ConsumerApp-1 and ConsumerApp-2 are two separate consumer applications. They do not belong to the same consumer group.

Configuring client IDs and understanding authenticated user principal

As explained earlier, Kafka quotas can be configured for specific users, specific client IDs, or both. Let’s explore client ID and user concepts and configurations required for Kafka quota allocation.

Client ID

A client ID representing an application’s logical name can be configured within an application’s code. In Java applications, for example, you can set the producer’s and consumer’s client IDs using ProducerConfig.CLIENT_ID_CONFIG and ConsumerConfig.CLIENT_ID_CONFIG configurations, respectively. The following code snippet illustrates how ProducerApp-1 sets the client ID to this-is-me-producerapp-1 using ProducerConfig.CLIENT_ID_CONFIG:

Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG,"this-is-me-producerapp-1");

User

The user refers to an authenticated user principal of the client application in the Kafka cluster with authentication enabled. As shown in the solution architecture, producer and consumer applications assume the Topic-B-Write-Role and Topic-B-Read-Role IAM roles, respectively, to perform write and read operations on Topic-B. Therefore, their authenticated user principal will look like the following IAM identifier:

arn:aws:sts::<AWS Account Id>:assumed-role/<assumed Role Name>/<role session name>

For more information, refer to IAM identifiers.

The role session name is a string identifier that uniquely identifies a session when IAM principals, federated identities, or applications assume an IAM role. In our case, ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications assume an IAM role using the AWS Security Token Service (AWS STS) SDK, and provide a role session name in the AWS STS SDK call. For example, if ProducerApp-1 assumes the Topic-B-Write-Role IAM role and uses this-is-producerapp-1-role-session as its role session name, its authenticated user principal will be as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session

The following is an example code snippet from the ProducerApp-1 application using this-is-producerapp-1-role-session as the role session name while assuming the Topic-B-Write-Role IAM role using the AWS STS SDK:

StsClient stsClient = StsClient.builder().region(region).build();
AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
          .roleArn("<Topic-B-Write-Role ARN>")
          .roleSessionName("this-is-producerapp-1-role-session") //role-session-name string literal
          .build();
AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); //perform the STS assume-role call
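
Putting the pieces together, the authenticated principal string that quota configurations must match can be sketched as plain string assembly (the account ID is the illustrative demo value, not output from a real call):

```java
// Sketch: assembling the authenticated user principal for an assumed role.
// Format: arn:aws:sts::<account-id>:assumed-role/<role-name>/<role-session-name>
String accountId = "111111111111";   // illustrative demo account
String roleName = "Topic-B-Write-Role";
String sessionName = "this-is-producerapp-1-role-session";
String principal = "arn:aws:sts::" + accountId
        + ":assumed-role/" + roleName + "/" + sessionName;
System.out.println(principal);
// → arn:aws:sts::111111111111:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session
```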

Configure network bandwidth (produce and consume) quotas

The following commands configure the produce and consume quotas dynamically for client applications based on their client ID and authenticated user principal in the MSK cluster configured with IAM access control.

The following code configures the produce quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ProducerApp client Id> \
--entity-type users --entity-name <ProducerApp user principal>

The producer_byte_rate refers to the amount of data, in bytes per second, that a producer client identified by client ID and user is allowed to produce to a single broker. The option --command-config points to config_iam.properties, which contains the properties required for IAM access control.

The following code configures the consume quota:

kafka-configs.sh --bootstrap-server <MSK cluster bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=<number of bytes per second>" \
--entity-type clients --entity-name <ConsumerApp client Id> \
--entity-type users --entity-name <ConsumerApp user principal>

The consumer_byte_rate refers to the amount of data, in bytes per second, that a consumer client identified by client ID and user is allowed to consume from a single broker.
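
To build intuition for what happens when a quota is exceeded: the broker doesn't reject requests, it delays responses long enough to bring the client's observed rate back down to the quota. The following is a simplified sketch of that relationship, not Kafka's exact windowed algorithm:

```java
// Simplified throttle-time sketch. If a client moved `bytes` in a sample
// window of `windowMs` ms against a quota of `quotaBytesPerSec`, the broker
// delays it by roughly the extra time those bytes should have taken.
long bytes = 2048;            // bytes produced in the window
long windowMs = 1000;         // sample window length in ms
long quotaBytesPerSec = 1024; // producer_byte_rate
long idealMs = bytes * 1000 / quotaBytesPerSec; // time allowed at the quota rate
long throttleMs = Math.max(0, idealMs - windowMs);
System.out.println(throttleMs); // 2048 B at 1024 B/s should take 2000 ms → delay of 1000 ms
```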

Let’s look at some example quota configuration commands for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications:

  • ProducerApp-1 produce quota configuration – Let’s assume ProducerApp-1 has this-is-me-producerapp-1 configured as the client ID in the application code and uses this-is-producerapp-1-role-session as the role session name when assuming the Topic-B-Write-Role IAM role. The following command sets the produce quota for ProducerApp-1 to 1024 bytes per second:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "producer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-producerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Write-Role/this-is-producerapp-1-role-session
  • ConsumerApp-1 consume quota configuration – Let’s assume ConsumerApp-1 has this-is-me-consumerapp-1 configured as the client ID in the application code and uses this-is-consumerapp-1-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-1 to 5120 bytes per second:
kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=5120" \
--entity-type clients --entity-name this-is-me-consumerapp-1 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-1-role-session


  • ConsumerApp-2 consume quota configuration – Let’s assume ConsumerApp-2 has this-is-me-consumerapp-2 configured as the client ID in the application code and uses this-is-consumerapp-2-role-session as the role session name when assuming the Topic-B-Read-Role IAM role. The following command sets the consume quota for ConsumerApp-2 to 1024 bytes per second per broker:

kafka-configs.sh --bootstrap-server <MSK Cluster Bootstrap servers IAM endpoint> \
--command-config config_iam.properties \
--alter --add-config "consumer_byte_rate=1024" \
--entity-type clients --entity-name this-is-me-consumerapp-2 \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/this-is-consumerapp-2-role-session

As a result of the preceding commands, the ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 client applications will be throttled by the MSK cluster using IAM access control if they exceed their assigned produce and consume quotas.

Implement the solution

Part 2 of this series showcases the detailed step-by-step implementation of Kafka quota configuration with IAM access control, along with sample producer and consumer client applications.

Conclusion

Kafka quotas offer teams the ability to set limits for producer and consumer applications. With Amazon MSK, Kafka quotas serve two important purposes: protecting cluster performance by limiting poorly designed producer or consumer applications that would otherwise overconsume cluster resources, and allocating the operational costs of a central streaming data platform across different cost centers and tenants (application and product teams).

In this post, we learned how to configure network bandwidth quotas within Amazon MSK while using IAM access control. We also covered some sample commands and code snippets to clarify how the client ID and authenticated principal are used when configuring quotas. Although we only demonstrated Kafka quotas using IAM access control, you can also configure them using other Amazon MSK-supported authentication mechanisms.

In Part 2 of this series, we demonstrate how to configure network bandwidth quotas with IAM access control in Amazon MSK and provide you with example producer and consumer applications so that you can see them in action.


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. Having worked with financial services organizations and digital native customers, he advises financial services customers in Australia on technology decisions, architectures, and product roadmaps.

Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka quotas – Part 2

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-2/

Kafka quotas are integral to multi-tenant Kafka clusters. They prevent Kafka cluster performance from being negatively affected by poorly behaved applications overconsuming cluster resources. Furthermore, they enable the central streaming data platform to be operated as a multi-tenant platform and used by downstream and upstream applications across multiple business lines. Kafka supports two types of quotas: network bandwidth quotas and request rate quotas. Network bandwidth quotas define byte-rate thresholds, measured in bytes per second, on how much data client applications can produce to and consume from each individual broker in a Kafka cluster. Request rate quotas limit the percentage of time each individual broker spends processing client application requests. Depending on your configuration, Kafka quotas can be set for specific users, specific client IDs, or both.

In Part 1 of this two-part series, we discussed the concepts of how to enforce Kafka quotas in Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters while using AWS Identity and Access Management (IAM) access control.

In this post, we walk you through the step-by-step implementation of setting up Kafka quotas in an MSK cluster while using IAM access control and testing them through sample client applications.

Solution overview

The following figure, which we first introduced in Part 1, illustrates how Kafka client applications (ProducerApp-1, ConsumerApp-1, and ConsumerApp-2) access Topic-B in the MSK cluster by assuming write and read IAM roles. Each producer and consumer client application has a quota that determines how much data they can produce or consume in bytes/second. The ProducerApp-1 quota allows it to produce up to 1024 bytes/second per broker. Similarly, the ConsumerApp-1 and ConsumerApp-2 quotas allow them to consume 5120 and 1024 bytes/second per broker, respectively. The following is a brief explanation of the flow shown in the architecture diagram:

  • P1 – ProducerApp-1 (via its ProducerApp-1-Role IAM role) assumes the Topic-B-Write-Role IAM role to send messages to Topic-B
  • P2 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1 – ConsumerApp-1 (via its ConsumerApp-1-Role IAM role) and ConsumerApp-2 (via its ConsumerApp-2-Role IAM role) assume the Topic-B-Read-Role IAM role to read messages from Topic-B
  • C2 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B
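
Keep in mind that these byte-rate quotas are enforced per broker, not cluster-wide. A rough sketch of the aggregate ceiling for a client whose traffic is spread across all brokers (as with the 3-partition topic used in this series):

```java
// Per-broker quota times broker count approximates the aggregate ceiling.
long perBrokerQuota = 1024; // ProducerApp-1 produce quota, bytes/sec per broker
int brokerCount = 3;        // brokers in the MSK cluster
long aggregateCeiling = perBrokerQuota * brokerCount;
System.out.println(aggregateCeiling); // up to 3072 bytes/sec across the cluster
```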

Note that this post uses the AWS Command Line Interface (AWS CLI), AWS CloudFormation templates, and the AWS Management Console for provisioning and modifying AWS resources, and resources provisioned will be billed to your AWS account.

The high-level steps are as follows:

  1. Provision an MSK cluster with IAM access control and Amazon Elastic Compute Cloud (Amazon EC2) instances for client applications.
  2. Create Topic-B on the MSK cluster.
  3. Create IAM roles for the client applications to access Topic-B.
  4. Run the producer and consumer applications without setting quotas.
  5. Configure the produce and consume quotas for the client applications.
  6. Rerun the applications after setting the quotas.

Prerequisites

It is recommended that you read Part 1 of this series before continuing. In order to get started, you need the following:

  • An AWS account that will be referred to as the demo account in this post, assuming that its account ID is 111111111111
  • Permissions to create, delete, and modify AWS resources in the demo account

Provision an MSK cluster with IAM access control and EC2 instances

This step involves provisioning an MSK cluster with IAM access control in a VPC in the demo account. Additionally, we create four EC2 instances to make configuration changes to the MSK cluster and host producer and consumer client applications.

Deploy CloudFormation stack

  1. Clone the GitHub repository to download the CloudFormation template files and sample client applications:
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack.
  4. For Prepare template, select Template is ready.
  5. For Template source, select Upload a template file.
  6. Upload the cfn-msk-stack-1.yaml file from the amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  7. For Stack name, enter MSKStack.
  8. Leave the parameters as default and choose Next.
  9. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  10. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Submit.

It will take approximately 30 minutes for the stack to complete. After the stack has been successfully created, the following resources will be created:

  • A VPC with three private subnets and one public subnet
  • An MSK cluster with three brokers with IAM access control enabled
  • An EC2 instance called MSKAdminInstance for modifying MSK cluster settings as well as creating and modifying AWS resources
  • EC2 instances for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2, one for each client application
  • A separate IAM role for each EC2 instance that hosts the client application, as shown in the architecture diagram
  11. From the stack’s Outputs tab, note the MSKClusterArn value.

Create a topic on the MSK cluster

To create Topic-B on the MSK cluster, complete the following steps:

  1. On the Amazon EC2 console, navigate to your list of running EC2 instances.
  2. Select the MSKAdminInstance EC2 instance and choose Connect.
  3. On the Session Manager tab, choose Connect.
  4. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Add Kafka binaries to the path
sed -i 's|HOME/bin|HOME/bin:~/kafka/bin|' .bash_profile

# Set your AWS region
aws configure set region <AWS Region>
  5. Set the environment variable to point to the MSK Cluster brokers IAM endpoint:
MSK_CLUSTER_ARN=<Use the value of MSKClusterArn that you noted earlier>
echo "export BOOTSTRAP_BROKERS_IAM=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerStringSaslIam)" >> .bash_profile
source .bash_profile
echo $BOOTSTRAP_BROKERS_IAM
  6. Take note of the value of BOOTSTRAP_BROKERS_IAM.
  7. Run the following Kafka CLI command to create Topic-B on the MSK cluster:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--create --topic Topic-B \
--partitions 3 --replication-factor 3 \
--command-config config_iam.properties

Because the MSK cluster is provisioned with IAM access control, the option --command-config points to config_iam.properties, which contains the properties required for IAM access control, created by the MSKStack CloudFormation stack.

The following warnings may appear when you run the Kafka CLI commands, but you may ignore them:

The configuration 'sasl.jaas.config' was supplied but isn't a known config. 
The configuration 'sasl.client.callback.handler.class' was supplied but isn't a known config.
  8. To verify that Topic-B has been created, list all the topics:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties --list

Create IAM roles for client applications to access Topic-B

This step involves creating Topic-B-Write-Role and Topic-B-Read-Role as shown in the architecture diagram. Topic-B-Write-Role enables write operations on Topic-B and can be assumed by ProducerApp-1. Similarly, ConsumerApp-1 and ConsumerApp-2 can assume Topic-B-Read-Role to perform read operations on Topic-B. To perform read operations on Topic-B, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups specified during the MSKStack stack update in the subsequent step.
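
As a sketch of what such a trust relationship looks like (the role name and account ID are the illustrative values used in this series, not output copied from the stack), the trust policy on Topic-B-Write-Role would resemble the following:

```json
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "AWS": "arn:aws:iam::111111111111:role/ProducerApp-1-Role"
        },
        "Action": "sts:AssumeRole"
    }]
}
```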

Create the roles with the following steps:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select MSKStack and choose Update.
  3. For Prepare template, select Replace current template.
  4. For Template source, select Upload a template file.
  5. Upload the cfn-msk-stack-2.yaml file from amazon-msk-kafka-quotas/cfn-templates directory, then choose Next.
  6. Provide the following additional stack parameters:
    • For Topic B ARN, enter the Topic-B ARN.

The ARN must be formatted as arn:aws:kafka:region:account-id:topic/msk-cluster-name/msk-cluster-uuid/Topic-B. Use the cluster name and cluster UUID from the MSK cluster ARN you noted earlier and provide your AWS Region. For more information, refer to IAM access control for Amazon MSK.

    • For ConsumerApp-1 Consumer Group name, enter the ConsumerApp-1 consumer group ARN.

It must be formatted as arn:aws:kafka:region:account-id:group/msk-cluster-name/msk-cluster-uuid/consumer-group-name.

    • For ConsumerApp-2 Consumer Group name, enter the ConsumerApp-2 consumer group ARN.

Use a similar format as the previous ARN.
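
Since the topic ARN and the consumer group ARNs differ from the cluster ARN only in the resource segment, they can be derived mechanically. A small illustrative sketch (the cluster name and UUID are placeholders, not real stack output):

```java
// Sketch: deriving topic and consumer group ARNs from the MSK cluster ARN.
// Cluster ARN: arn:aws:kafka:<region>:<account-id>:cluster/<name>/<uuid>
String clusterArn = "arn:aws:kafka:us-east-1:111111111111:cluster/demo-msk/abcd1234";
String topicArn = clusterArn.replace(":cluster/", ":topic/") + "/Topic-B";
String groupArn = clusterArn.replace(":cluster/", ":group/") + "/consumerapp-1-cg";
System.out.println(topicArn);
System.out.println(groupArn);
```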

  7. Choose Next to continue.
  8. Scroll to the bottom of the Configure stack options page and choose Next to continue.
  9. Scroll to the bottom of the Review page, select the check box I acknowledge that CloudFormation may create IAM resources, and choose Update stack.

It will take approximately 3 minutes for the stack to update. After the stack has been successfully updated, the following resources will be created:

  • Topic-B-Write-Role – An IAM role with permission to perform write operations on Topic-B. Its trust policy allows the ProducerApp-1-Role IAM role to assume it.
  • Topic-B-Read-Role – An IAM role with permission to perform read operations on Topic-B. Its trust policy allows the ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles to assume it. Furthermore, ConsumerApp-1 and ConsumerApp-2 must also belong to the consumer groups you specified when updating the stack to perform read operations on Topic-B.
  10. From the stack’s Outputs tab, note the TopicBReadRoleARN and TopicBWriteRoleARN values.

Run the producer and consumer applications without setting quotas

Here, we run ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 without setting their quotas. From the previous steps, you will need the BOOTSTRAP_BROKERS_IAM value, the Topic-B-Write-Role ARN, and the Topic-B-Read-Role ARN. The source code of the client applications and their packaged versions are available in the GitHub repository.

Run the ConsumerApp-1 application

To run the ConsumerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ConsumerApp-1 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-1 consumer group name> \
--role-session-name <role session name for ConsumerApp-1 to use during STS assume role call> \
--client-id <ConsumerApp-1 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • –bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • –assume-role-arn – Topic-B-Read-Role IAM role ARN. Assuming this role, ConsumerApp-1 will read messages from the topic.
  • –region – AWS Region you’re using.
  • –topic-name – Topic name from which ConsumerApp-1 will read messages. The default is Topic-B.
  • –consumer-group – Consumer group name for ConsumerApp-1, as specified during the stack update.
  • –role-session-name – ConsumerApp-1 assumes the Topic-B-Read-Role using the AWS Security Token Service (AWS STS) SDK. ConsumerApp-1 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID for ConsumerApp-1.
  • –print-consumer-quota-metrics – Flag indicating whether client metrics should be printed on the terminal by ConsumerApp-1.
  • –cw-dimension-name – Amazon CloudWatch dimension name that will be used to publish client throttling metrics from ConsumerApp-1.
  • –cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ConsumerApp-1.
  • –cw-namespace – Namespace where ConsumerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-1-cg \
--role-session-name consumerapp-1-role-session \
--client-id consumerapp-1-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-1 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-1. Remember that we haven’t set the consume quota for ConsumerApp-1 yet. Let it run for a while.

Run the ConsumerApp-2 application

To run the ConsumerApp-2 application, complete the following steps:

  1. On the Amazon EC2 console, select the ConsumerApp-2 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ConsumerApp-2 application to start consuming messages from Topic-B:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-2 consumer group name> \
--role-session-name <role session name for ConsumerApp-2 to use during STS assume role call> \
--client-id <ConsumerApp-2 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

The code takes similar command line parameters as those for ConsumerApp-1 discussed previously, except for the following:

  • –consumer-group – Consumer group name for ConsumerApp-2, as specified during the stack update.
  • –role-session-name – ConsumerApp-2 assumes the Topic-B-Read-Role using the AWS STS SDK. ConsumerApp-2 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID for ConsumerApp-2.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-2-cg \
--role-session-name consumerapp-2-role-session \
--client-id consumerapp-2-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-2 \
--cw-namespace ConsumerApps

The fetch-throttle-time-avg and fetch-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ConsumerApp-2. Remember that we haven’t set the consume quota for ConsumerApp-2 yet. Let it run for a while.

Run the ProducerApp-1 application

To run the ProducerApp-1 application, complete the following steps:

  1. On the Amazon EC2 console, select the ProducerApp-1 EC2 instance and choose Connect.
  2. On the Session Manager tab, choose Connect.
  3. Run the following commands on the new tab that opens in your browser:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  4. Run the ProducerApp-1 application to start sending messages to Topic-B:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Write-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--num-messages <Number of events> \
--role-session-name <role session name for ProducerApp-1 to use during STS assume role call> \
--client-id <ProducerApp-1 client.id> \
--producer-type <Producer Type, options are sync or async> \
--print-producer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

You can find the source code on GitHub for your reference. The command line parameter details are as follows:

  • –bootstrap-servers – MSK cluster bootstrap brokers IAM endpoint.
  • –assume-role-arn – Topic-B-Write-Role IAM role ARN. Assuming this role, ProducerApp-1 will write messages to the topic.
  • –topic-name – ProducerApp-1 will send messages to this topic. The default is Topic-B.
  • –region – AWS Region you’re using.
  • –num-messages – Number of messages the ProducerApp-1 application will send to the topic.
  • –role-session-name – ProducerApp-1 assumes the Topic-B-Write-Role using the AWS STS SDK. ProducerApp-1 will use this role session name when calling the assumeRole function.
  • –client-id – Client ID of ProducerApp-1.
  • –producer-type – ProducerApp-1 can be run either synchronously or asynchronously. Options are sync or async.
  • –print-producer-quota-metrics – Flag indicating whether the client metrics should be printed on the terminal by ProducerApp-1.
  • –cw-dimension-name – CloudWatch dimension name that will be used to publish client throttling metrics from ProducerApp-1.
  • –cw-dimension-value – CloudWatch dimension value that will be used to publish client throttling metrics from ProducerApp-1.
  • –cw-namespace – The namespace where ProducerApp-1 will publish CloudWatch metrics in order to monitor throttling.
  5. If you’re satisfied with the rest of the parameters, use the following command and change --assume-role-arn and --region as per your environment. To run a synchronous Kafka producer, it uses the option --producer-type sync:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBWriteRole-xxxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--num-messages 10000000 \
--role-session-name producerapp-1-role-session \
--client-id producerapp-1-client-id \
--producer-type sync \
--print-producer-quota-metrics Y \
--cw-dimension-name ProducerApp \
--cw-dimension-value ProducerApp-1 \
--cw-namespace ProducerApps

Alternatively, use --producer-type async to run an asynchronous producer. For more details, refer to Asynchronous send.

The produce-throttle-time-avg and produce-throttle-time-max client metrics should display 0.0, indicating no throttling is occurring for ProducerApp-1. Remember that we haven’t set the produce quota for ProducerApp-1 yet. Check that ConsumerApp-1 and ConsumerApp-2 can consume messages and notice they are not throttled. Stop the consumer and producer client applications by pressing Ctrl+C in their respective browser tabs.

Set produce and consume quotas for client applications

Now that we have run the producer and consumer applications without quotas, we set their quotas and rerun them.

Open the Session Manager terminal for the MSKAdminInstance EC2 instance as described earlier and run the following commands to find the default configuration of one of the brokers in the MSK cluster. MSK clusters are provisioned with the default Kafka quota configuration.

# Describe Broker-1 default configurations
kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--entity-type brokers \
--entity-name 1 \
--all --describe > broker1_default_configurations.txt
cat broker1_default_configurations.txt | grep quota.consumer.default
cat broker1_default_configurations.txt | grep quota.producer.default

The following screenshot shows the Broker-1 default values for quota.consumer.default and quota.producer.default.

ProducerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

According to the architecture diagram discussed earlier, set the ProducerApp-1 produce quota to 1024 bytes/second. For <ProducerApp-1 Client Id> and <ProducerApp-1 Role Session>, make sure you use the same values that you used while running ProducerApp-1 earlier (producerapp-1-client-id and producerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'producer_byte_rate=1024' \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

Verify the ProducerApp-1 produce quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

You can remove the ProducerApp-1 produce quota by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config producer_byte_rate \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

ConsumerApp-1 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 5120 bytes/second for ConsumerApp-1. For <ConsumerApp-1 Client Id> and <ConsumerApp-1 Role Session>, make sure you use the same values that you used while running ConsumerApp-1 earlier (consumerapp-1-client-id and consumerapp-1-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=5120' \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

Verify the ConsumerApp-1 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

You can remove the ConsumerApp-1 consume quota by using the following command, but don’t run the command as we’ll test the quotas next.

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config consumer_byte_rate \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

ConsumerApp-2 quota configuration

Replace placeholders in all the commands in this section with values that correspond to your account.

Let’s set a consume quota of 1024 bytes/second for ConsumerApp-2. For <ConsumerApp-2 Client Id> and <ConsumerApp-2 Role Session>, make sure you use the same values that you used while running ConsumerApp-2 earlier (consumerapp-2-client-id and consumerapp-2-role-session, respectively):

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=1024' \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

Verify the ConsumerApp-2 consume quota using the following command:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

As with ConsumerApp-1, you can remove the ConsumerApp-2 consume quota using the same command with ConsumerApp-2 client and user details.
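Under the hood, Kafka brokers enforce these quotas by delaying responses rather than rejecting requests: when a client's observed byte rate exceeds its quota, the broker computes a throttle time that brings the average rate back under the limit. The following Python sketch illustrates that idea; the window length and function name are illustrative, not the broker's actual implementation:

```python
def throttle_time_ms(bytes_observed, window_ms, quota_bytes_per_sec):
    """Return how long the broker should delay the response so the
    client's average rate over the window falls back to the quota."""
    observed_rate = bytes_observed / (window_ms / 1000.0)  # bytes/sec
    if observed_rate <= quota_bytes_per_sec:
        return 0.0
    # Delay until the total elapsed time makes the average rate equal the
    # quota: bytes_observed / ((window_ms + delay_ms) / 1000) == quota
    target_elapsed_ms = bytes_observed / quota_bytes_per_sec * 1000.0
    return target_elapsed_ms - window_ms

# A consumer that fetched 10240 bytes in a 1-second window against the
# 5120 bytes/sec quota set for ConsumerApp-1 would be delayed ~1 second.
print(throttle_time_ms(10240, 1000, 5120))
```

This is why the throttled applications in the next section keep running but report nonzero throttle-time metrics instead of receiving errors.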

Rerun the producer and consumer applications after setting quotas

Let’s rerun the applications to verify the effect of the quotas.

Rerun ProducerApp-1

Rerun ProducerApp-1 in synchronous mode with the same command that you used earlier. The following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metrics will show values above 0.0, indicating that ProducerApp-1 is throttled. Allow ProducerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

You can also test the effect of the produce quota by rerunning ProducerApp-1 in asynchronous mode (--producer-type async). As in the synchronous run, the following screenshot illustrates that when ProducerApp-1 reaches its quota on any of the brokers, the produce-throttle-time-avg and produce-throttle-time-max client metrics will show values above 0.0, indicating that ProducerApp-1 is throttled. Allow asynchronous ProducerApp-1 to run for a while.

You will eventually see a TimeoutException similar to the following: org.apache.kafka.common.errors.TimeoutException: Expiring xxxxx record(s) for Topic-B-2: xxxxxxx ms has passed since batch creation.

When an asynchronous producer sends messages at a rate greater than the broker can accept due to the quota, the messages are first queued in the client application's memory. If the send rate continues to exceed the accept rate, the client eventually runs out of buffer space, causing the next Producer.send() call to block. Producer.send() eventually throws a TimeoutException if the configured timeout doesn't give the broker enough time to catch up with the producer application. Stop ProducerApp-1 by using Ctrl+C.
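The buffering-then-timeout behavior can be modeled with a small simulation. This is not Kafka client code; it's an illustrative model in which a bounded in-memory buffer fills faster than the quota-limited broker drains it, so a send eventually blocks past its timeout:

```python
class BufferTimeoutError(Exception):
    """Stands in for Kafka's org.apache.kafka.common.errors.TimeoutException."""

def simulate_async_producer(send_rate, drain_rate, buffer_capacity,
                            timeout_s, duration_s):
    """Model an async producer: bytes enter a bounded in-memory buffer at
    send_rate per second while the quota-limited broker drains it at
    drain_rate per second. Raise when a send would block past timeout_s."""
    buffered = 0
    for second in range(duration_s):
        buffered = max(0, buffered - drain_rate)   # broker accepts some bytes
        buffered += send_rate                      # application keeps sending
        if buffered > buffer_capacity:
            # How long would the application block waiting for buffer space?
            wait_s = (buffered - buffer_capacity) / drain_rate
            if wait_s > timeout_s:
                raise BufferTimeoutError(
                    f"send would block {wait_s:.1f}s at t={second}s")
            buffered = buffer_capacity             # send blocks, then succeeds
    return None
```

With a send rate well above the drain rate and a short timeout, the model raises, mirroring the TimeoutException you observed; with a drain rate above the send rate, it runs to completion.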

Rerun ConsumerApp-1

Rerun ConsumerApp-1 with the same command that you used earlier. The following screenshot illustrates that when ConsumerApp-1 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metrics will show values above 0.0, indicating that ConsumerApp-1 is throttled.

Allow ConsumerApp-1 to run for a few seconds and then stop it by using Ctrl+C.

Rerun ConsumerApp-2

Rerun ConsumerApp-2 with the same command that you used earlier. Similarly, when ConsumerApp-2 reaches its quota, the fetch-throttle-time-avg and fetch-throttle-time-max client metrics will show values above 0.0, indicating that ConsumerApp-2 is throttled. Allow ConsumerApp-2 to run for a few seconds and then stop it by pressing Ctrl+C.

Client quota metrics in Amazon CloudWatch

In Part 1, we explained that client metrics are metrics exposed by clients connecting to Kafka clusters. Let’s examine the client metrics in CloudWatch.

  1. On the CloudWatch console, choose All metrics.
  2. Under Custom Namespaces, choose the namespace you provided while running the client applications.
  3. Choose the dimension name and select produce-throttle-time-max, produce-throttle-time-avg, fetch-throttle-time-max, and fetch-throttle-time-avg metrics for all the applications.

These metrics indicate throttling behavior for ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications tested with the quota configurations in the previous section. The following screenshots indicate the throttling of ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 based on network bandwidth quotas. ProducerApp-1, ConsumerApp-1, and ConsumerApp-2 applications feed their respective client metrics to CloudWatch. You can find the source code on GitHub for your reference.
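The applications publish these client metrics through the CloudWatch PutMetricData API. The snippet below is a hedged sketch of what that call might look like from Python with boto3; the namespace, dimension name, and values are illustrative, and the actual applications in the GitHub repo are Java:

```python
def build_metric(app_name, metric_name, value):
    """Shape one client metric as a CloudWatch MetricDatum dict."""
    return {
        "MetricName": metric_name,
        "Dimensions": [{"Name": "ClientApp", "Value": app_name}],
        "Value": value,
        "Unit": "Milliseconds",
    }

def publish_throttle_metrics(namespace, app_name, avg_ms, max_ms):
    """Send fetch throttle metrics to CloudWatch (requires AWS credentials)."""
    import boto3  # imported lazily so build_metric stays testable offline
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace=namespace,
        MetricData=[
            build_metric(app_name, "fetch-throttle-time-avg", avg_ms),
            build_metric(app_name, "fetch-throttle-time-max", max_ms),
        ],
    )
```

Graphing these values per application in CloudWatch is what makes the throttling in the previous section visible across your fleet.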

Secure client ID and role session name

We discussed how to configure Kafka quotas using an application's client ID and authenticated user principal. When a client application assumes an IAM role to access Kafka topics on an MSK cluster with IAM authentication enabled, its authenticated user principal is represented in the following format (for more information, refer to IAM identifiers):

arn:aws:sts::111111111111:assumed-role/Topic-B-Write-Role/producerapp-1-role-session

It contains the role session name (in this case, producerapp-1-role-session) used in the client application while assuming an IAM role through the AWS STS SDK. The client application source code is available for your reference. The client ID is a logical name string (for example, producerapp-1-client-id) that is configured in the application code by the application team. Therefore, an application can impersonate another application if it obtains the client ID and role session name of the other application, and if it has permission to assume the same IAM role.
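A small helper makes the identity collision concrete: given an account, role, and role session name, the assumed-role principal is a predictable string, so two applications that assume the same role with the same session name are indistinguishable to the brokers. This is a sketch; the ARN format follows the IAM identifiers documentation:

```python
def assumed_role_principal(account_id, role_name, role_session_name):
    """Build the STS assumed-role principal ARN a broker sees for a client."""
    return (f"arn:aws:sts::{account_id}:assumed-role/"
            f"{role_name}/{role_session_name}")

legit = assumed_role_principal("111111111111", "Topic-B-Read-Role",
                               "consumerapp-1-role-session")
# An impersonator that copies ConsumerApp-1's session name produces the
# exact same principal, and therefore inherits ConsumerApp-1's quota.
impostor = assumed_role_principal("111111111111", "Topic-B-Read-Role",
                                  "consumerapp-1-role-session")
print(legit == impostor)  # True
```

This is why the next section treats the client ID and role session name as secrets worth protecting.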

As shown in the architecture diagram, ConsumerApp-1 and ConsumerApp-2 are two separate client applications with their respective quota allocations. Because both have permission to assume the same IAM role (Topic-B-Read-Role) in the demo account, they are allowed to consume messages from Topic-B. Thus, MSK cluster brokers distinguish them based on their client IDs and users (which contain their respective role session name values). If ConsumerApp-2 somehow obtains the ConsumerApp-1 role session name and client ID, it can impersonate ConsumerApp-1 by specifying the ConsumerApp-1 role session name and client ID in the application code.

Let’s assume ConsumerApp-1 uses consumerapp-1-client-id and consumerapp-1-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-1's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

Similarly, ConsumerApp-2 uses consumerapp-2-client-id and consumerapp-2-role-session as its client ID and role session name, respectively. Therefore, ConsumerApp-2's authenticated user principal will appear as follows when it assumes the Topic-B-Read-Role IAM role:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-2-role-session

If ConsumerApp-2 obtains ConsumerApp-1's client ID and role session name and specifies them in its application code, MSK cluster brokers will treat it as ConsumerApp-1 and view its client ID as consumerapp-1-client-id, and the authenticated user principal as follows:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

This allows ConsumerApp-2 to consume data from the MSK cluster at a maximum rate of 5120 bytes per second rather than 1024 bytes per second as per its original quota allocation. Consequently, ConsumerApp-1's throughput will be negatively impacted if ConsumerApp-2 runs concurrently.

Enhanced architecture

You can introduce AWS Secrets Manager and AWS Key Management Service (AWS KMS) into the architecture to secure the applications' client IDs and role session names. To provide stronger governance, the applications' client IDs and role session names must be stored as encrypted secrets in Secrets Manager. The IAM resource policies associated with the encrypted secrets and a KMS customer managed key (CMK) allow applications to access and decrypt only their own client ID and role session name. This way, applications can't access each other's client ID and role session name and impersonate one another. The following image shows the enhanced architecture.

The updated flow has the following stages:

  • P1 – ProducerApp-1 retrieves its client-id and role-session-name secrets from Secrets Manager
  • P2 – ProducerApp-1 configures the secret client-id as CLIENT_ID_CONFIG in the application code, and assumes Topic-B-Write-Role (via its ProducerApp-1-Role IAM role) by passing the secret role-session-name to the AWS STS SDK assumeRole function call
  • P3 – With the Topic-B-Write-Role IAM role assumed, ProducerApp-1 begins sending messages to Topic-B
  • C1 – ConsumerApp-1 and ConsumerApp-2 retrieve their respective client-id and role-session-name secrets from Secrets Manager
  • C2 – ConsumerApp-1 and ConsumerApp-2 configure their respective secret client-id as CLIENT_ID_CONFIG in their application code, and assume Topic-B-Read-Role (via ConsumerApp-1-Role and ConsumerApp-2-Role IAM roles, respectively) by passing their secret role-session-name in the AWS STS SDK assumeRole function call
  • C3 – With the Topic-B-Read-Role IAM role assumed, ConsumerApp-1 and ConsumerApp-2 start consuming messages from Topic-B

Refer to the documentation for AWS Secrets Manager and AWS KMS to get a better understanding of how they fit into the architecture.
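Under stated assumptions (hypothetical secret names, a JSON-shaped secret value, and a placeholder role ARN), steps P1–P2 and C1–C2 might look like the boto3 sketch below. The real applications use the AWS STS SDK for Java, so treat this as an outline rather than the repo's implementation:

```python
import json

def load_identity(secrets_client, secret_id):
    """P1/C1: fetch an app's client-id and role-session-name secret.
    Assumes the secret value is a JSON object with those two keys."""
    payload = secrets_client.get_secret_value(SecretId=secret_id)
    return json.loads(payload["SecretString"])

def assume_app_role(sts_client, role_arn, role_session_name):
    """P2/C2: assume the topic role using the secret session name."""
    resp = sts_client.assume_role(RoleArn=role_arn,
                                  RoleSessionName=role_session_name)
    return resp["Credentials"]

# Usage sketch (requires AWS credentials; names below are hypothetical):
# import boto3
# identity = load_identity(boto3.client("secretsmanager"),
#                          "producerapp-1/identity")
# creds = assume_app_role(boto3.client("sts"),
#                         "arn:aws:iam::111111111111:role/Topic-B-Write-Role",
#                         identity["role-session-name"])
```

Because each application's IAM identity can decrypt only its own secret, neither the client ID nor the role session name of a peer application is ever visible to it.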

Clean up resources

Navigate to the CloudFormation console and delete the MSKStack stack. All resources created during this post will be deleted.

Conclusion

In this post, we covered detailed steps to configure Amazon MSK quotas and demonstrated their effect through sample client applications. In addition, we discussed how you can use client metrics to determine if a client application is throttled. We also highlighted a potential issue with plaintext client IDs and role session names. We recommend implementing Kafka quotas with Amazon MSK using Secrets Manager and AWS KMS as per the revised architecture diagram to ensure a zero-trust architecture.

If you have feedback or questions about this post, including the revised architecture, we’d be happy to hear from you. We hope you enjoyed reading this post.


About the Author

Vikas Bajaj is a Senior Manager, Solutions Architects, Financial Services at Amazon Web Services. With over two decades of experience in financial services and working with digital-native businesses, he advises customers on product design, technology roadmaps, and application architectures.

Ingest, transform, and deliver events published by Amazon Security Lake to Amazon OpenSearch Service

Post Syndicated from Kevin Fallis original https://aws.amazon.com/blogs/big-data/ingest-transform-and-deliver-events-published-by-amazon-security-lake-to-amazon-opensearch-service/

With the recent introduction of Amazon Security Lake, it has never been simpler to access all your security-related data in one place. Whether it’s findings from AWS Security Hub, DNS query data from Amazon Route 53, network events such as VPC Flow Logs, or third-party integrations provided by partners such as Barracuda Email Protection, Cisco Firepower Management Center, or Okta identity logs, you now have a centralized environment in which you can correlate events and findings using a broad range of tools in the AWS and partner ecosystem.

Security Lake automatically centralizes security data from cloud, on-premises, and custom sources into a purpose-built data lake stored in your account. With Security Lake, you can get a more complete understanding of your security data across your entire organization. You can also improve the protection of your workloads, applications, and data. Security Lake has adopted the Open Cybersecurity Schema Framework (OCSF), an open standard. With OCSF support, the service can normalize and combine security data from AWS and a broad range of enterprise security data sources.

When it comes to near-real-time analysis of data as it arrives in Security Lake and responding to security events your company cares about, Amazon OpenSearch Service provides the necessary tooling to help you make sense of the data found in Security Lake.

OpenSearch Service is a fully managed and scalable log analytics framework that is used by customers to ingest, store, and visualize data. Customers use OpenSearch Service for a diverse set of data workloads, including healthcare data, financial transactions information, application performance data, observability data, and much more. Additionally, customers use the managed service for its ingest performance, scalability, low query latency, and ability to analyze large datasets.

This post shows you how to ingest, transform, and deliver Security Lake data to OpenSearch Service for use by your SecOps teams. We also walk you through how to use a series of prebuilt visualizations to view events across multiple AWS data sources provided by Security Lake.

Understanding the event data found in Security Lake

Security Lake stores the normalized OCSF security events in Apache Parquet format—an optimized columnar data storage format with efficient data compression and enhanced performance to handle complex data in bulk. Parquet format is a foundational format in the Apache Hadoop ecosystem and is integrated into AWS services such as Amazon Redshift Spectrum, AWS Glue, Amazon Athena, and Amazon EMR. It's a portable columnar format, future-proofed to support additional encodings as technology develops, and it has library support across a broad set of languages like Python, Java, and Go. And the best part is that Apache Parquet is open source!

The intent of OCSF is to provide a common language for data scientists and analysts that work with threat detection and investigation. With a diverse set of sources, you can build a complete view of your security posture on AWS using Security Lake and OpenSearch Service.

Understanding the event architecture for Security Lake

Security Lake provides a subscriber framework to provide access to the data stored in Amazon S3. Services such as Amazon Athena and Amazon SageMaker use query access. The solution in this post uses data access to respond to events generated by Security Lake.

When you subscribe for data access, events arrive via Amazon Simple Queue Service (Amazon SQS). Each SQS event contains a notification object with a "pointer": data used to construct the URL of the Parquet object on Amazon S3. Your subscriber processes the event, parses the data found in the object, and transforms it to whatever format makes sense for your implementation.
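For example, a subscriber might extract the bucket and key from the notification body before fetching the object. The exact payload shape can vary (Security Lake wraps an S3 event notification), so the nested structure below is an assumption for illustration; inspect a real message before relying on it:

```python
import json

def parse_notification(sqs_body):
    """Pull (bucket, key) out of an S3-event-style notification body.
    The nested structure here is assumed; verify against a real message."""
    event = json.loads(sqs_body)
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    return bucket, key

# Hypothetical sample message for illustration:
sample = json.dumps({"Records": [{"s3": {
    "bucket": {"name": "aws-security-data-lake-example"},
    "object": {"key": "ext/region=us-east-1/events.gz.parquet"}}}]})
bucket, key = parse_notification(sample)
print(f"s3://{bucket}/{key}")
```

Once the subscriber has the bucket and key, it can retrieve the Parquet object and begin the transformation described in the next section.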

The solution we provide in this post uses a subscriber for data access. Let’s drill down into what the implementation looks like so that you understand how it works.

Solution overview

The high-level architecture for integrating Security Lake with OpenSearch Service is as follows.

The workflow contains the following steps:

  1. Security Lake persists Parquet formatted data into an S3 bucket as determined by the administrator of Security Lake.
  2. A notification is placed in Amazon SQS that describes the key to get access to the object.
  3. Java code in an AWS Lambda function reads the SQS notification and prepares to read the object described in the notification.
  4. Java code uses Hadoop, Parquet, and Avro libraries to retrieve the object from Amazon S3 and transform the records in the Parquet object into JSON documents for indexing in your OpenSearch Service domain.
  5. The documents are gathered and then sent to your OpenSearch Service domain, where index templates map the structure into a schema optimized for Security Lake logs in OCSF format.

Steps 1–2 are managed by Security Lake; steps 3–5 are managed by the customer. The shaded components are your responsibility. The subscriber implementation for this solution uses Lambda and OpenSearch Service, and these resources are managed by you.

If you are evaluating this as a solution for your business, remember that Lambda has a 15-minute maximum execution time at the time of this writing. Security Lake can produce objects up to 256 MB in size, so this solution may not be effective for your company's needs at large scale. Various Lambda configuration levers affect the cost of log delivery, so make cost-conscious decisions when evaluating sample solutions. This Lambda-based implementation is best suited for smaller companies whose CloudTrail and VPC Flow Logs volumes make the cost of transforming and delivering logs to Amazon OpenSearch Service budget friendly.

Now that you have some context, let’s start building the implementation for OpenSearch Service!

Prerequisites

Creation of Security Lake for your AWS accounts is a prerequisite for building this solution. Security Lake integrates with an AWS Organizations account to enable the offering for selected accounts in the organization. For a single AWS account that doesn’t use Organizations, you can enable Security Lake without the need for Organizations. You must have administrative access to perform these operations. For multiple accounts, it’s suggested that you delegate the Security Lake activities to another account in your organization. For more information about enabling Security Lake in your accounts, review Getting started.

Additionally, you may need to take the provided template and adjust it to your specific environment. The sample solution relies on access to a public S3 bucket hosted for this blog, so egress rules and permission modifications may be required if you use S3 endpoints.

This solution assumes that you're using a domain deployed in a VPC. Additionally, it assumes that you have fine-grained access controls enabled on the domain to prevent unauthorized access to data you store as part of the integration with Security Lake. VPC-deployed domains are privately routable and have no access to the public internet by design. If you want to access your domain in a more public setting, you need to create an NGINX proxy to broker requests between public and private settings.

The remaining sections in this post are focused on how to create the integration with OpenSearch Service.

Create the subscriber

To create your subscriber, complete the following steps:

  1. On the Security Lake console, choose Subscribers in the navigation pane.
  2. Choose Create subscriber.
  3. Under Subscriber details, enter a meaningful name and description.
  4. Under Log and event sources, specify what the subscriber is authorized to ingest. For this post, we select All log and event sources.
  5. For Data access method, select S3.
  6. Under Subscriber credentials, provide the account ID and an external ID for which AWS account you want to provide access.
  7. For Notification details, select SQS queue.
  8. Choose Create when you are finished filling in the form.

It will take a minute or so to initialize the subscriber framework, such as the SQS integration and the permission generated so that you can access the data from another AWS account. When the status changes from Creating to Created, you have access to the subscriber endpoint on Amazon SQS.

  9. Save the following values found in the subscriber Details section:
    1. AWS role ID
    2. External ID
    3. Subscription endpoint

Use AWS CloudFormation to provision Lambda integration between the two services

An AWS CloudFormation template takes care of a large portion of the setup for the integration. It creates the necessary components to read the data from Security Lake, transform it into JSON, and then index it into your OpenSearch Service domain. The template also provides the necessary AWS Identity and Access Management (IAM) roles for integration, the tooling to create an S3 bucket for the Java JAR file used in the solution by Lambda, and a small Amazon Elastic Compute Cloud (Amazon EC2) instance to facilitate the provisioning of templates in your OpenSearch Service domain.

To deploy your resources, complete the following steps:

  1. On the AWS CloudFormation console, create a new stack.
  2. For Prepare template, select Template is ready.
  3. Specify your template source as Amazon S3 URL.

You can either save the template to your local drive or copy the link for use on the AWS CloudFormation console. In this example, we use the template URL that points to a template stored on Amazon S3. You can either use the URL on Amazon S3 or install it from your device.

  4. Choose Next.
  5. Enter a name for your stack. For this post, we name the stack blog-lambda.
  6. Populate the parameters with the values you saved from Security Lake and OpenSearch Service, then choose Next. Ensure that the endpoint for the OpenSearch domain has a trailing forward slash (/) in the URL that you copy from OpenSearch Service.
  7. Select Preserve successfully provisioned resources to preserve the resources in case the stack rolls back, so you can debug any issues.
  8. Scroll to the bottom of the page and choose Next.
  9. On the summary page, select the check box that acknowledges IAM resources will be created and used in this template.
  10. Choose Submit.

The stack will take a few minutes to deploy.

  11. After the stack has deployed, navigate to the Outputs tab for the stack you created.
  12. Save the CommandProxyInstanceID for executing scripts, and save the two role ARNs to use in the role mappings step.

You need to associate the IAM roles for the tooling instance and the Lambda function with OpenSearch Service security roles so that the processes can work with the cluster and the resources within.

Provision role mappings for integrations with OpenSearch Service

With the template-generated IAM roles, you need to map the roles using role mapping to the predefined all_access role in your OpenSearch Service cluster. You should evaluate your specific use of any roles and ensure they are aligned with your company’s requirements.

  1. In OpenSearch Dashboards, choose Security in the navigation pane.
  2. Choose Roles in the navigation pane and look up the all_access role.
  3. On the role details page, on the Mapped users tab, choose Manage mapping.
  4. Add the two IAM roles found in the outputs of the CloudFormation template, then choose Map.

Provision the index templates used for OCSF format in OpenSearch Service

Index templates have been provided as part of the initial setup. These templates are crucial to the format of the data so that ingestion is efficient and tuned for aggregations and visualizations. Data that comes from Security Lake is transformed into a JSON format, and this format is based directly on the OCSF standard.

For example, each OCSF category shares a common Base Event class. It contains multiple objects that represent details such as the cloud provider (a Cloud object); enrichment data (an Enrichment object, which has a common structure across events but different values per event); and more complex structures with nested inner objects, such as the Metadata object, all still part of the Base Event class. The Base Event class is the foundation for all categories in OCSF and helps you correlate events written into Security Lake and analyzed in OpenSearch.

OpenSearch is technically schema-less. You don't have to define a schema up front. The OpenSearch engine tries to guess the data types and mappings found in the data coming from Security Lake. This is known as dynamic mapping. The OpenSearch engine also gives you the option to predefine the data you are indexing. This is known as explicit mapping. Using explicit mappings to identify your data source types and how they are stored at ingestion time is key to achieving high-volume ingest performance for time-centric data indexed under heavy load.

In summary, the mapping templates use composable templates. In this construct, the solution establishes an efficient schema for the OCSF standard and gives you the capability to correlate events and specialize on specific categories in the OCSF standard.
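To make that concrete, a composable template pair for this pattern might look like the following Dev Tools sketch: a component template holds explicit mappings for a few Base Event fields, and an index template composes it for Security Lake indexes. The field set, index pattern, and template names are illustrative, not the exact templates the solution's scripts install:

```
PUT _component_template/ocsf_base_mappings
{
  "template": {
    "mappings": {
      "properties": {
        "time": { "type": "date", "format": "epoch_millis" },
        "severity_id": { "type": "integer" },
        "cloud": {
          "properties": {
            "provider": { "type": "keyword" },
            "region": { "type": "keyword" }
          }
        },
        "metadata": {
          "properties": {
            "product": {
              "properties": { "name": { "type": "keyword" } }
            }
          }
        }
      }
    }
  }
}

PUT _index_template/ocsf_security_finding
{
  "index_patterns": ["ocsf-*-security-finding*"],
  "composed_of": ["ocsf_base_mappings"],
  "priority": 200
}
```

Because every category-specific index template composes the same base component, fields shared by the Base Event class are mapped identically everywhere, which is what enables cross-category correlation.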

You load the templates using the tools proxy created by your CloudFormation template.

  1. On the stack's Outputs tab, find the parameter CommandProxyInstanceID.

We use that value to find the instance in AWS Systems Manager.

  2. On the Systems Manager console, choose Fleet manager in the navigation pane.
  3. Locate and select your managed node.
  4. On the Node actions menu, choose Start terminal session.
  5. When you're connected to the instance, run the following commands:
    cd;pwd
    . /usr/share/es-scripts/es-commands.sh | grep -o '{\"acknowledged\":true}' | wc -l

You should see a final result of 42 occurrences of {“acknowledged”:true}, which indicates that the commands were successful. Ignore the warnings you see for migration; they don't affect the scripts and, as of this writing, can't be muted.

  6. Navigate to Dev Tools in OpenSearch Dashboards and run the following command:
    GET _cat/templates

This confirms that the scripts were successful.

Install index patterns, visualizations, and dashboards for the solution

For this solution, we prepackaged a few visualizations so that you can make sense of your data. Download the visualizations to your local desktop, then complete the following steps:

  1. In OpenSearch Dashboards, navigate to Stack Management and Saved Objects.
  2. Choose Import.
  3. Choose the file from your local device, select your import options, and choose Import.

You will see numerous objects that you imported. You can use the visualizations after you start importing data.

Enable the Lambda function to start processing events into OpenSearch Service

The final step is to go into the configuration of the Lambda function and enable the triggers so that the data can be read from the subscriber framework in Security Lake. The trigger is currently disabled; you need to enable it and save the config. You will notice the function is throttled, which is by design. You need to have templates in the OpenSearch cluster so that the data indexes in the desired format.

  1. On the Lambda console, navigate to your function.
  2. On the Configurations tab, in the Triggers section, select your SQS trigger and choose Edit.
  3. Select Activate trigger and save the setting.
  4. Choose Edit concurrency.
  5. Configure your concurrency and choose Save.

Enable the function by setting the concurrency setting to 1. You can adjust the setting as needed for your environment.

You can review the Amazon CloudWatch logs on the CloudWatch console to confirm the function is working.

You should see startup messages and other event information that indicates logs are being processed. The provided JAR file is set for information level logging and if needed, to debug any concerns, there is a verbose debug version of the JAR file you can use. Your JAR file options are:

If you choose to deploy the debug version, the verbosity of the code will show some error-level details in the Hadoop libraries. To be clear, Hadoop code will display lots of exceptions in debug mode because it tests environment settings and looks for things that aren’t provisioned in your Lambda environment, like a Hadoop metrics collector. Most of these startup errors are not fatal and can be ignored.

Visualize the data

Now that you have data flowing into OpenSearch Service from Security Lake via Lambda, it’s time to put those imported visualizations to work. In OpenSearch Dashboards, navigate to the Dashboards page.

You will see four primary dashboards aligned with the OCSF categories they support. The four supported visualization categories are DNS activity, security findings, network activity, and API activity via AWS CloudTrail.

Security findings

The findings dashboard is a series of high-level summary information that you use for visual inspection of AWS Security Hub findings in a time window specified by you in the dashboard filters. Many of the encapsulated visualizations give “filter on click” capabilities so you can narrow your discoveries. The following screenshot shows an example.

The Finding Velocity visualization shows findings over time based on severity. The Finding Severity visualization shows which “findings” have passed or failed, and the Findings table visualization is a tabular view with actual counts. Your goal is to be near zero in all the categories except informational findings.

Network activity

The network traffic dashboard provides an overview of all your accounts in the organization that are enabled for Security Lake. The following example monitors 260 AWS accounts, and the dashboard summarizes the top accounts with network activity. Aggregate traffic, the top accounts generating traffic, and the top accounts with the most activity are shown in the first section of the visualizations.

Additionally, the top accounts are summarized by allow and deny actions for connections. In the following visualization, there are fields you can use to drill down into other visualizations. Some of these visualizations link to third-party websites that may or may not be allowed in your company; you can edit the links in Saved objects in the Stack Management plugin.

You can drill down by choosing the account ID to get a summary by account. The list of egress and ingress traffic within a single AWS account is sorted by the volume of bytes transferred between any given two IP addresses.

Finally, if you choose the IP addresses, you’ll be redirected to Project Honey Pot, where you can see if the IP address is a threat or not.

DNS activity

The DNS activity dashboard shows you the requestors for DNS queries in your AWS accounts. Again, this is a summary view of all the events in a time window.

The first visualization in the dashboard shows DNS activity in aggregate across the top five active accounts. Of the 260 accounts in this example, four are active. The next visualization breaks the resolutions down by the requesting service or host, and the final visualization breaks out the requestors by account, VPC ID, and instance ID for the queries run by your solutions.

API activity

The final dashboard gives an overview of API activity via CloudTrail across all your accounts. It summarizes things like API call velocity, operations by service, top operations, and other summary information.

If you look at the first visualization in the dashboard, you get an idea of which services are receiving the most requests. This helps you understand where to focus the majority of your threat discovery efforts, based on which services may be consumed differently over time. Next, heat maps break down API activity by Region and service, giving you an idea of which API calls are most prevalent in the accounts you are monitoring.

As you scroll down the form, more details appear, such as the top five services with API activity and the top API operations for the organization you are monitoring.

Conclusion

Security Lake integration with OpenSearch Service is easy to achieve by following the steps outlined in this post. Security Lake data is transformed from Parquet to JSON, making it readable and simple to query. Enable your SecOps teams to identify and investigate potential security threats by analyzing Security Lake data in OpenSearch Service. The provided visualizations and dashboards can help to navigate the data, identify trends and rapidly detect any potential security issues in your organization.

As next steps, we recommend using the preceding framework and associated templates, which provide easy steps to visualize your Security Lake data using OpenSearch Service.

In a series of follow-up posts, we will review the source code and walk through the published examples of the Lambda ingestion framework in the AWS Samples GitHub repo. The framework can be modified for use in containers to help companies that have longer processing times for large files published in Security Lake. Additionally, we will discuss how to detect and respond to security events using example implementations that use OpenSearch plugins such as Security Analytics, Alerting, and Anomaly Detection, available in Amazon OpenSearch Service.


About the authors

Kevin Fallis (@AWSCodeWarrior) is a Principal AWS Specialist Search Solutions Architect. His passion at AWS is to help customers leverage the correct mix of AWS services to achieve success for their business goals. His after-work activities include family, DIY projects, carpentry, playing drums, and all things music.

Jimish Shah is a Senior Product Manager at AWS with 15+ years of experience bringing products to market in log analytics, cybersecurity, and IP video streaming. He’s passionate about launching products that offer delightful customer experiences, and solve complex customer problems. In his free time, he enjoys exploring cafes, hiking, and taking long walks.

Ross Warren is a Senior Product SA at AWS for Amazon Security Lake based in Northern Virginia. Prior to his work at AWS, Ross’ areas of focus included cyber threat hunting and security operations. When he is not talking about AWS he likes to spend time with his family, bake bread, make sawdust and enjoy time outside.

Best practices for running production workloads using Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/best-practices-for-running-production-workloads-using-amazon-msk-tiered-storage/

In the second post of the series, we discussed some core concepts of the Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage feature and explained how read and write operations work in a tiered storage enabled cluster.

This post focuses on how to properly size your MSK tiered storage cluster, which metrics to monitor, and the best practices to consider when running a production workload.

Sizing a tiered storage cluster

Sizing and capacity planning are critical aspects of designing and operating a distributed system. It involves estimating the resources required to handle the expected workload and ensure the system can scale and perform efficiently. In the context of a distributed system like Kafka, sizing involves determining the number of brokers, the number of partitions, and the amount of storage and memory required for each broker. Capacity planning involves estimating the expected workload, including the number of producers, consumers, and the throughput requirements.

Let’s assume a scenario where the producers are evenly balancing the load between brokers, brokers host the same number of partitions, there are enough partitions to ingest the throughput, and consumers consume directly from the tip of the stream. The brokers are receiving the same load and doing the same work. We therefore just focus on Broker1 in the following diagram of a data flow within a cluster.

Theoretical sustained throughput with tiered storage

We derive the following formula for the theoretical sustained throughput limit tcluster given the infrastructure characteristics of a specific cluster with tiered storage enabled on all topics:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

This formula contains the following values:

  • tCluster – Total ingress produce throughput sent to the cluster
  • tStorage – Storage volume throughput supported
  • tNetworkAttachedStorage – Network throughput between the network attached storage and the Amazon Elastic Compute Cloud (Amazon EC2) instance
  • tEC2network – EC2 instance network bandwidth
  • non_tip_local_consumer_groups – Number of consumer groups reading from network attached storage at ingress rate
  • tip_consumer_groups – Number of consumer groups reading from page cache at ingress rate
  • remote_consumer_groups – Number of consumer groups reading from remote tier at ingress rate
  • r – Replication factor of the Kafka topic

Note that in the first post, we didn’t differentiate between different types of consumer groups. With tiered storage, some consumer groups might be consuming from remote. These remote consumers might ultimately catch up and start reading from local storage and finally catch up to the tip. Therefore, we model these three different consumer groups in the equation to account for their impact on infrastructure usage. In the following sections, we provide derivations of this equation.
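As a sketch, the bound above can be evaluated directly in code. Note that the infrastructure numbers in the usage example below are illustrative placeholders, not measured limits for any specific instance type.

```python
def max_sustained_throughput_mb_s(
    brokers,
    r,                              # replication factor of the Kafka topic
    t_storage,                      # per-broker storage volume throughput (MB/s)
    t_nas,                          # per-broker network attached storage network throughput (MB/s)
    t_ec2,                          # per-broker EC2 instance network bandwidth (MB/s)
    non_tip_local_consumer_groups,  # consumer groups reading from network attached storage
    tip_consumer_groups,            # consumer groups reading from page cache
    remote_consumer_groups,         # consumer groups reading from the remote tier
):
    """Upper bound on cluster ingress per the tiered storage sizing formula."""
    storage_bound = t_storage * brokers / (r + 1 + non_tip_local_consumer_groups)
    nas_bound = t_nas * brokers / (r + 1 + non_tip_local_consumer_groups)
    network_bound = t_ec2 * brokers / (tip_consumer_groups + r + remote_consumer_groups)
    return min(storage_bound, nas_bound, network_bound)
```

For example, with 3 brokers, a replication factor of 3, one tip consumer group, and placeholder per-broker limits of 250 MB/s (storage), 593.75 MB/s (network attached storage), and 1,250 MB/s (EC2 network), the storage volume is the binding constraint.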

Derivation of throughput limit from network attached storage bottleneck

Because Amazon MSK uses network attached storage for local storage, both network attached storage throughput and bandwidth should be accounted for. Total throughput bandwidth requirement is a combination of ingress and egress from the network attached storage backend. The ingress throughput of the storage backend depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. With tiered storage, Amazon MSK also uses network attached storage to read and upload rolled segments to the remote tier. This doesn’t come from the page cache and needs to be accounted for at the rate of ingress. Any non-tip consumers at ingress rate also consume network attached storage throughput and are accounted for in the equation. Therefore, max throughput is bounded by network attached storage based on the following equation:

max(tcluster) <= min {

max(tstorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups),
max(tNetworkAttachedStorage) * #brokers/(r + 1 + #non_tip_local_consumer_groups)

}

Derivation of throughput limit from EC2 network bottleneck

Unlike network attached storage, the network is full duplex, meaning that if the EC2 instance supports X MB/s network, it supports X MB/s in and X MB/s out. The network throughput requirement depends on the data that producers are sending directly to the broker plus the replication traffic the broker is receiving from its peers. It also includes the replication traffic out and consumers traffic out from this broker. With tiered storage, we need to reserve additional ingress rate for uploads to the remote tier and support reads from the remote tier for consumer groups reading from remote offset. Both of these add to the network out requirements, which is bounded by the following equation:

max(tcluster) <= min {

max(tEC2network) * #brokers/(#tip_consumer_groups + r + #remote_consumer_groups)

}

Combining the second and third equations provides the first formula, which determines the max throughput bound based on broker infrastructure limits.

How to apply this formula to size your cluster

With this formula, you can calculate the upper bound for throughput you can achieve for your workloads. In practice, the workloads may be bottlenecked by other broker resources like CPU, memory, and disk, so it’s important to do load tests. To simplify your sizing estimate, you can use the MSK Sizing and Pricing spreadsheet (for more information, refer to Best Practices).

Let’s consider a workload where your ingress and egress rates are 20 MB/s, with a replication factor of 3, and you want to retain data in your Kafka cluster for 7 days. This workload requires 6x m5.large brokers with 34.6 TB local storage, which will cost $6,034.00 monthly (estimated). But if you use tiered storage for the same workload with local retention of 4 hours and overall data retention of 7 days, it requires 3x m5.large brokers with 0.8 TB local storage and 12 TB of tiered storage, which will cost $1,958.00 monthly (estimated). If you want to read all the historic data one time, it will cost $17.00 ($0.0015 per GB retrieval cost). In this example with tiered storage, you save around 67.6% of your overall cost.
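As a rough check on these storage figures, the arithmetic can be sketched as follows. This uses binary units (TiB) and approximates the post’s numbers; it is not the official MSK pricing calculation.

```python
MB_PER_TIB = 1024 ** 2  # MB in one TiB (binary units)

def storage_tib(ingress_mb_s, retention_s, copies):
    """Storage needed to hold `copies` replicas of `retention_s` seconds of ingress."""
    return ingress_mb_s * retention_s * copies / MB_PER_TIB

ingress = 20            # MB/s, from the example workload
day, hour = 86400, 3600

# Without tiered storage: 7 days, replication factor 3, all on local disks.
local_only = storage_tib(ingress, 7 * day, copies=3)        # ~34.6 TiB

# With tiered storage: 4 hours local (3 copies), 7 days remote (single copy).
tiered_local = storage_tib(ingress, 4 * hour, copies=3)     # ~0.8 TiB
tiered_remote = storage_tib(ingress, 7 * day, copies=1)     # ~11.5 TiB
```

The local-only figure matches the post’s 34.6 TB when expressed in binary units, and the tiered remote figure is close to the quoted 12 TB.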

We recommend planning for Availability Zone redundancy in production workloads by including a broker safety factor in the calculation (1 in this example). We also recommend running performance tests to ensure CPU utilization stays below 70% on your brokers at the target throughput derived from this formula or the Excel calculation. In addition, use the per-broker partition limit in your calculation to account for other bottlenecks based on the partition count.

The following figure shows an example of Amazon MSK sizing.

Monitoring and continuous optimization for a tiered storage enabled cluster

In previous sections, we emphasized the importance of determining the correct initial cluster size. However, it’s essential to recognize that sizing efforts shouldn’t cease after the initial setup. Continual monitoring and evaluation of your workload are necessary to ensure that the broker size remains appropriate. Amazon MSK offers metric monitoring and alarm capabilities to provide visibility into cluster performance. In the post Best practices for right-sizing your Apache Kafka clusters to optimize performance and cost, we discussed key metrics to focus on. In this post, we delve deeper into additional metrics related to tiered storage and other optimization considerations for a tiered storage enabled cluster:

  • TotalTierBytesLag indicates the total number of bytes on the broker that are eligible for tiering but haven’t been transferred to tiered storage yet. This metric shows the efficiency of upstream data transfer. As the lag increases, so does the amount of data not yet persisted in tiered storage, and the network attached storage disk may fill up. Monitor this metric and generate an alarm if the lag is continuously growing and you see increased network attached storage usage. If the tiering lag is too high, you can reduce ingress traffic to allow tiered storage to catch up.
  • Although tiered storage provides on-demand, virtually unlimited storage capacity without provisioning any additional resources, you should still do proper capacity planning for your local storage, configure alerts for KafkaDataLogsDiskUsed metrics, and have a buffer on network attached storage capacity planning. Monitor this metric and generate an alarm if the metric reaches or exceeds 60%. For a tiered storage enabled topic, configure local retention accordingly to reduce network attached storage usage.
  • The theoretical max ingress you can achieve on an MSK cluster with tiered storage is 20–25% lower than on a non-tiered storage enabled cluster, due to the additional network attached storage bandwidth required to transparently move data from the local to the remote tier. Plan for the capacity (brokers, storage, gp2 vs. gp3) using the formula we discussed to derive max ingress for your cluster based on the number of consumer groups, and load test your workloads to identify the sustained throughput limit. Driving ingress to the cluster or egress from the remote tier above the planned capacity can impact your tip produce or consume traffic.
  • The gp3 volume type offers SSD performance at a 20% lower cost per GB than gp2 volumes. Furthermore, by decoupling storage performance from capacity, you can easily provision higher IOPS and throughput without the need to provision additional block storage capacity. Therefore, we recommend using gp3 for a tiered storage enabled cluster by specifying provisioned throughput for larger instance types.
  • If you specified a custom cluster configuration, check the num.replica.fetchers, num.io.threads, and num.network.threads configuration parameters on your cluster. We recommend leaving them at the default Amazon MSK configuration unless you have a specific use case.
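For the disk usage alert recommended above, the following is a hedged sketch of a CloudWatch alarm definition built with boto3. The cluster name is a placeholder; verify the dimension names against the metrics emitted by your own cluster before relying on this.

```python
def disk_usage_alarm_kwargs(cluster_name, broker_id, threshold_pct=60):
    """Build put_metric_alarm arguments for the KafkaDataLogsDiskUsed metric.

    Alarms when local data log disk usage reaches or exceeds threshold_pct
    for three consecutive 5-minute periods. Cluster/broker values here are
    placeholders; adjust to your environment.
    """
    return {
        "AlarmName": f"{cluster_name}-broker-{broker_id}-data-logs-disk-used",
        "Namespace": "AWS/Kafka",
        "MetricName": "KafkaDataLogsDiskUsed",
        "Dimensions": [
            {"Name": "Cluster Name", "Value": cluster_name},
            {"Name": "Broker ID", "Value": str(broker_id)},
        ],
        "Statistic": "Maximum",
        "Period": 300,
        "EvaluationPeriods": 3,
        "Threshold": float(threshold_pct),
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    }

# To create the alarm (requires AWS credentials and permissions):
# import boto3
# boto3.client("cloudwatch").put_metric_alarm(**disk_usage_alarm_kwargs("my-msk-cluster", 1))
```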

This covers the guidance most relevant to tiered storage. For further guidance on monitoring and best practices for your cluster, refer to Best practices.

Conclusion

You should now have a solid understanding of how Amazon MSK tiered storage works and the best practices to consider for your production workload when utilizing this cost-effective storage tier. With tiered storage, we remove the compute and storage coupling, which can benefit workloads that need larger disk capacity and are underutilizing compute just to provision storage.

We are eager to learn about your current approach in building real-time data streaming applications. If you’re starting your journey with Amazon MSK tiered storage, we suggest following the comprehensive Getting Started guide available in Tiered storage. This guide provides detailed instructions and practical steps to help you gain hands-on experience and effectively take advantage of the benefits of tiered storage for your streaming applications.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built the MSK Serverless and MSK tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and in reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Deep dive on Amazon MSK tiered storage

Post Syndicated from Nagarjuna Koduru original https://aws.amazon.com/blogs/big-data/deep-dive-on-amazon-msk-tiered-storage/

In the first post of the series, we described some core concepts of Apache Kafka cluster sizing, the best practices for optimizing the performance, and the cost of your Kafka workload.

This post explains how the underlying infrastructure affects Kafka performance when you use Amazon Managed Streaming for Apache Kafka (Amazon MSK) tiered storage. We delve deep into the core components of Amazon MSK tiered storage and address questions such as: How does read and write work in a tiered storage-enabled cluster?

In the subsequent post, we’ll discuss the latency impact, recommended metrics to monitor, and conclude with guidance on key considerations in a production tiered storage-enabled cluster.

How Amazon MSK tiered storage works

To understand the internal architecture of Amazon MSK tiered storage, let’s first discuss some fundamentals of Kafka topics, partitions, and how read and write works.

A logical stream of data in Kafka is referred to as a topic. A topic is broken down into partitions, which are physical entities used to distribute load across multiple server instances (brokers) that serve reads and writes.

A partition (also designated as topic-partition, as it’s relative to a given topic) can be replicated, which means there are several copies of the data in the group of brokers forming a cluster. Each copy is called a replica or a log. One of these replicas, called the leader, serves as the reference. It’s where the ingress traffic is accepted for the topic-partition.

A log is an append-only sequence of log segments. Log segments contain Kafka data records, which are added to the end of the log or the active segment.

Log segments are stored as regular files. On the file system, Kafka identifies the file of a log segment by putting the offset of the first data record it contains in its file name. The offset of a record is simply a monotonic index assigned to a record by Kafka when it’s appended to the log. The segment files of a log are stored in a directory dedicated to the associated topic-partition.

When Kafka reads data from an arbitrary offset, it first looks up the segment that contains that offset from the segment file name, then the specific record location inside that file using an offset index. Offset indexes are materialized in a dedicated file stored with the segment files in the topic-partition directory. There is also a time index (the .timeindex file) used to seek by timestamp.
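The offset-to-segment lookup described above can be sketched as follows. This is a simplified model of Kafka’s behavior, not its actual implementation: the sorted base offsets stand in for the segment file names on disk.

```python
import bisect

def segment_file_name(base_offset):
    """Format a segment file name the way Kafka does: 20-digit, zero-padded."""
    return f"{base_offset:020d}.log"

def find_segment(base_offsets, target_offset):
    """Return the base offset of the segment whose file contains target_offset.

    base_offsets must be sorted, mirroring the segment file names in a
    topic-partition directory (e.g. 00000000000000000000.log, ...00035.log).
    """
    i = bisect.bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise ValueError("offset precedes the earliest retained segment")
    return base_offsets[i]
```

With the two-segment layout shown below for topic cars, offset 12 resolves to the first segment and offset 40 to the second; the record’s position within the file would then be found via the offset index.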

For every partition, Kafka also stores a journal of leadership changes in a file called leader-epoch-checkpoint. This file contains a mapping of each leader epoch to the start offset of that epoch. Whenever a new leader is elected for a partition by the Kafka controller, this data is updated and propagated to all brokers. A leader epoch is a 32-bit, monotonically increasing number representing a continuous period of leadership of a single partition. It’s marked on all Kafka records. The following code is the local storage layout of topic cars and partition 0, containing two segments (0, 35):

$ ls /kafka-cluster/broker-1/data/cars-0/

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex
leader-epoch-checkpoint

Kafka manages the lifecycle of these segment files. It creates a new one when a new segment needs to be created, for instance, if the current segment reaches its configured max size. It deletes one when the target retention period of the data it contains is reached, or the total maximal size of the log is reached. Data is deleted from the tail of the logs and corresponds to the oldest data of the append-only log of the topic-partition.

KIP-405 or tiered storage in Apache Kafka

The ability to tier data, in other words, transfer data (log, index, timeindex, and leader-epoch-checkpoint) from a local file system to another storage system based on time and size-based retention policies, is a feature built in Apache Kafka as part of KIP-405.

KIP-405 isn’t part of an official Kafka release yet. Amazon MSK internally implemented the tiered storage functionality on top of official Kafka version 2.8.2 and exposes it through the AWS-specific 2.8.2.tiered Kafka version. With this feature, you can set separate retention settings for local and remote storage. Data in the local tier is retained until it has been copied to the remote tier, even after the local retention expires. Data in the remote tier is retained until the remote retention expires. KIP-405 proposes a pluggable architecture allowing you to plug in custom remote storage and metadata storage backends. The following diagram illustrates the broker’s three key components.

The components are as follows:

  • RemoteLogManager (RLM) – A new component corresponding to LogManager for the local tier. It delegates copy, fetch, and delete of completed and non-active partition segments to a pluggable RemoteStorageManager implementation and maintains respective remote log segment metadata through pluggable RemoteLogMetadataManager implementation.
  • RemoteStorageManager (RSM) – A pluggable interface that provides the lifecycle of remote log segments.
  • RemoteLogMetadataManager (RLMM) – A pluggable interface that provides the lifecycle of metadata about remote log segments.

How data is moved to the remote tier for a tiered storage-enabled topic

In a tiered storage-enabled topic, each completed segment for a topic-partition triggers the leader of the partition to copy the data to the remote storage tier. The completed log segment is removed from the local disks when Amazon MSK finishes moving that log segment to the remote tier and after it meets the local retention policy. This frees up local storage space.

Let’s consider a hypothetical scenario: you have a topic with one partition. Prior to enabling tiered storage for this topic, there are three log segments. One of the segments is active and receiving data, and the other two segments are complete.

After you enable tiered storage for this topic with two days of local retention and five days of overall retention, Amazon MSK copies log segment 1 and 2 to tiered storage. Amazon MSK also retains the primary storage copy of segments 1 and 2. The active segment 3 isn’t eligible to copy over to tiered storage yet. In this timeline, none of the retention settings are applied yet for any of the messages in segment 1 and segment 2.

After 2 days, the local retention settings take effect for segments 1 and 2, which Amazon MSK copied to the tiered storage. Segments 1 and 2 now expire from the local storage. Active segment 3 is neither eligible for expiration nor eligible to copy over to tiered storage yet because it’s an active segment.

After 5 days, overall retention settings take effect, and Amazon MSK clears log segments 1 and 2 from tiered storage. Segment 3 is neither eligible for expiration nor eligible to copy over to tiered storage yet because it’s active.

That’s how the data lifecycle works on a tiered storage-enabled cluster.

Amazon MSK immediately starts moving data to tiered storage as soon as a segment is closed. The local disks are freed up when Amazon MSK finishes moving that log segment to remote tier and after it meets the local retention policy.
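The lifecycle just described can be summarized in a small model. The retention values default to the 2-day/5-day example above, and the state labels are informal, not Kafka terminology; active-segment handling is simplified.

```python
def segment_state(age_hours, is_active, local_retention_h=48, total_retention_h=120):
    """Where a segment's data lives at a given age (simplified lifecycle model).

    Active segments are never copied or expired; closed segments are copied to
    the remote tier as soon as they complete, kept locally until local
    retention passes, and removed from the remote tier once overall retention
    passes.
    """
    if is_active:
        return "local-only (active)"
    if age_hours >= total_retention_h:
        return "deleted"
    if age_hours >= local_retention_h:
        return "remote-only"
    return "local-and-remote"
```

For the example topic, segment 3 stays "local-only (active)" while it receives data; segments 1 and 2 move from "local-and-remote" to "remote-only" after 2 days and to "deleted" after 5 days.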

How read works in a tiered storage-enabled topic

For any read request, ReplicaManager first tries to serve it from the local log. If that read returns an offset-out-of-range exception, ReplicaManager delegates the call to RemoteLogManager to read from tiered storage. On the read path, the RemoteStorageManager fetches the data in chunks from remote storage, which means that for the first few bytes your consumer experiences higher latency, but as the system starts buffering the segment locally, your consumer experiences latency similar to reading from local storage. One advantage of this approach is that the data is served instantly from the local buffer if multiple consumers are reading from the same segment.

If your consumer is configured to read from the closest replica, there might be a possibility that the consumer from a different consumer group reads the same remote segment using a different broker. In that case, they experience the same latency behavior we described previously.
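The local-then-remote fallback described in this section can be illustrated with a toy model. The names below are ours, not actual Kafka classes, and the chunked remote fetch is reduced to a single callable.

```python
class OffsetOutOfRangeError(Exception):
    """Raised when an offset is no longer present in local storage."""

def read_records(offset, local_log, fetch_remote_chunk):
    """Toy model of the tiered read path: try local storage first, then remote.

    local_log maps offsets to records held on local storage;
    fetch_remote_chunk(offset) stands in for the RemoteStorageManager
    fetching a chunk of records from the remote tier.
    """
    try:
        if offset not in local_log:
            raise OffsetOutOfRangeError(offset)
        return [local_log[offset]]         # served from local storage / page cache
    except OffsetOutOfRangeError:
        # Delegate to the remote tier; the first chunk arrives with higher
        # latency, after which locally buffered chunks serve repeat readers.
        return fetch_remote_chunk(offset)
```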

Conclusion

In this post, we discussed the core components of the Amazon MSK tiered storage feature and explained how the data lifecycle works in a cluster enabled with tiered storage. Stay tuned for our upcoming post, in which we delve into the best practices for sizing and running a tiered storage-enabled cluster in production.

We would love to hear how you’re building your real-time data streaming applications today. If you’re just getting started with Amazon MSK tiered storage, we recommend getting hands-on with the guidelines available in the tiered storage documentation.

If you have any questions or feedback, please leave them in the comments section.


About the authors

Nagarjuna Koduru is a Principal Engineer in AWS, currently working for AWS Managed Streaming For Kafka (MSK). He led the teams that built the MSK Serverless and MSK tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and in reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Migrate from Google BigQuery to Amazon Redshift using AWS Glue and Custom Auto Loader Framework

Post Syndicated from Tahir Aziz original https://aws.amazon.com/blogs/big-data/migrate-from-google-bigquery-to-amazon-redshift-using-aws-glue-and-custom-auto-loader-framework/

Amazon Redshift is a widely used, fully managed, petabyte-scale cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytic workloads. Customers are looking for tools that make it easier to migrate from other data warehouses, such as Google BigQuery, to Amazon Redshift to take advantage of the service price-performance, ease of use, security, and reliability.

In this post, we show you how to use AWS native services to accelerate your migration from Google BigQuery to Amazon Redshift. We use AWS Glue, a fully managed, serverless, ETL (extract, transform, and load) service, and the Google BigQuery Connector for AWS Glue (for more information, refer to Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors). We also add automation and flexibility to simplify migration of multiple tables to Amazon Redshift using the Custom Auto Loader Framework.

Solution overview

The solution provides a scalable and managed data migration workflow to migrate data from Google BigQuery to Amazon Simple Storage Service (Amazon S3), and then from Amazon S3 to Amazon Redshift. This pre-built solution scales to load data in parallel using input parameters.

The following architecture diagram shows how the solution works. It starts with setting up the migration configuration to connect to Google BigQuery, then converts the database schemas, and finally migrates the data to Amazon Redshift.


The workflow contains the following steps:

  1. A configuration file is uploaded to an S3 bucket you have chosen for this solution. This JSON file contains the migration metadata, namely the following:
    • A list of Google BigQuery projects and datasets.
    • A list of all tables to be migrated for each project and dataset pair.
  2. An Amazon EventBridge rule triggers an AWS Step Functions state machine to start migrating the tables.
  3. The Step Functions state machine iterates on the tables to be migrated and runs an AWS Glue Python shell job to extract the metadata from Google BigQuery and store it in an Amazon DynamoDB table used for tracking the tables’ migration status.
  4. The state machine iterates on the metadata from this DynamoDB table to run the table migration in parallel, based on the maximum number of migration jobs without incurring limits or quotas on Google BigQuery. It performs the following steps:
    • Runs the AWS Glue migration job for each table in parallel.
    • Tracks the run status in the DynamoDB table.
    • After the tables have been migrated, checks for errors and exits.
  5. The data exported from Google BigQuery is saved to Amazon S3. We use Amazon S3 (even though AWS Glue jobs can write directly to Amazon Redshift tables) for a few specific reasons:
    • We can decouple the data migration and the data load steps.
    • It offers more control on the load steps, with the ability to reload the data or pause the process.
    • It provides fine-grained monitoring of the Amazon Redshift load status.
  6. The Custom Auto Loader Framework automatically creates schemas and tables in the target database and continuously loads data from Amazon S3 to Amazon Redshift.
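Step 1’s configuration file might be produced as follows. The field names are illustrative placeholders (the exact schema is defined by the solution’s state machine), and the bucket name is hypothetical; the file name bq-mig-config.json matches the example used in this post.

```python
import json

# Hypothetical shape of the migration metadata: a list of Google BigQuery
# projects and datasets, and the tables to migrate for each pair.
config = {
    "projects": [
        {
            "project_name": "my-bq-project",          # placeholder project
            "datasets": [
                {"dataset_name": "sales", "tables": ["orders", "customers"]},
            ],
        }
    ]
}

body = json.dumps(config, indent=2)

# Uploading the file to the chosen S3 bucket triggers the EventBridge rule
# that starts the Step Functions state machine (requires AWS credentials):
# import boto3
# boto3.client("s3").put_object(
#     Bucket="my-input-bucket", Key="bq-mig-config.json", Body=body
# )
```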

A few additional points to note:

  • If you have already created the target schema and tables in the Amazon Redshift database, you can configure the Custom Auto Loader Framework to not automatically detect and convert the schema.
  • If you want more control over converting the Google BigQuery schema, you can use the AWS Schema Conversion Tool (AWS SCT). For more information, refer to Migrate Google BigQuery to Amazon Redshift using AWS Schema Conversion tool (SCT).
  • As of this writing, neither the AWS SCT nor Custom Auto Loader Framework support the conversion of nested data types (record, array and struct). Amazon Redshift supports semistructured data using the Super data type, so if your table uses such complex data types, then you need to create the target tables manually.
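Step 4’s bounded-parallel table migration can be sketched with a thread pool. Here migrate_one stands in for starting and tracking one AWS Glue job run, and max_parallelism mirrors the solution’s InputMaxParallelism parameter; the real solution drives this from Step Functions rather than a local executor.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def migrate_tables(tables, migrate_one, max_parallelism=30):
    """Run per-table migration jobs with bounded parallelism (simplified).

    Returns a status map per table, mimicking how the solution tracks run
    status in the DynamoDB tracking table and checks for errors at the end.
    """
    status = {}
    with ThreadPoolExecutor(max_workers=max_parallelism) as pool:
        futures = {pool.submit(migrate_one, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                future.result()
                status[table] = "SUCCEEDED"
            except Exception:
                status[table] = "FAILED"
    return status
```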

To deploy the solution, there are two main steps:

  1. Deploy the solution stack using AWS CloudFormation.
  2. Deploy and configure Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift.

Prerequisites

Before getting started, make sure you have the following:

    1. Configure your Google account.
    2. Create an IAM role for AWS Glue (and note down the name of the IAM role).
    3. Subscribe to and activate the Google BigQuery Connector for AWS Glue.
    4. Create the migration configuration file (in this example, we named the file bq-mig-config.json).

Deploy the solution using AWS CloudFormation

To deploy the solution stack using AWS CloudFormation, complete the following steps:

  1. Choose Launch Stack:

This template provisions the AWS resources in the us-east-1 Region. If you want to deploy to a different Region, download the template bigquery-cft.yaml and launch it manually: on the AWS CloudFormation console, choose Create stack with new resources and upload the template file you downloaded.

The list of provisioned resources is as follows:

    • An EventBridge rule to start the Step Functions state machine on the upload of the configuration file.
    • A Step Functions state machine that runs the migration logic. The following diagram illustrates the state machine.
      Diagram representing the state machine deployed by the solution stack.
    • An AWS Glue Python shell job used to extract the metadata from Google BigQuery. The metadata will be stored in a DynamoDB table, with a calculated attribute to prioritize the migration jobs. By default, the connector creates one partition per 400 MB in the table being read (before filtering). As of this writing, the Google BigQuery Storage API has a maximum quota for parallel read streams, so we set the limit on worker nodes for tables larger than 400 GB. We also calculate the maximum number of jobs that can run in parallel based on those values.
    • An AWS Glue ETL job used to extract the data from each Google BigQuery table and saves it in Amazon S3 in Parquet format.
    • A DynamoDB table (bq_to_s3_tracking) used to store the metadata for each table to be migrated (size of the table, S3 path used to store the migrated data, and the number of workers needed to migrate the table).
    • A DynamoDB table (bq_to_s3_maxstreams) used to store the maximum number of streams per state machine run. This helps us minimize job failures due to limits or quotas. Use the CloudFormation template to customize the names of the DynamoDB tables. The prefix for the DynamoDB tables is bq_to_s3.
    • The IAM roles needed by the state machine and AWS Glue jobs.
  2. Choose Next.

Screen caption showing the AWS Cloudformation Create stack page.

  3. For Stack name, enter a name.
  4. For Parameters, enter the parameters listed in the following table, then choose Create.
The CloudFormation template parameters are as follows:

  • InputBucketName (S3 bucket name) – The S3 bucket where the AWS Glue job stores the migrated data. The data is actually stored in a folder named s3-redshift-loader-source, which is used by the Custom Auto Loader Framework.
  • InputConnectionName (AWS Glue connection name; default glue-bq-connector-24) – The name of the AWS Glue connection that is created using the Google BigQuery connector.
  • InputDynamoDBTablePrefix (DynamoDB table name prefix; default bq_to_s3) – The prefix that will be used when naming the two DynamoDB tables created by the solution.
  • InputGlueETLJob (AWS Glue ETL job name; default bq-migration-ETL) – The name you want to give to the AWS Glue ETL job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path.
  • InputGlueMetaJob (AWS Glue Python shell job name; default bq-get-metadata) – The name you want to give to the AWS Glue Python shell job. The actual script is saved in the S3 path specified in the parameter InputGlueS3Path.
  • InputGlueS3Path (S3 path; default s3://aws-glue-scripts-${AWS::Account}-${AWS::Region}/admin/) – The S3 path in which the stack will copy the scripts for the AWS Glue jobs. Remember to replace ${AWS::Account} with the actual AWS account ID and ${AWS::Region} with the Region you plan to use, or provide your own bucket and prefix in a complete path.
  • InputMaxParallelism (number of parallel migration jobs to run; default 30) – The maximum number of tables you want to migrate concurrently.
  • InputBQSecret (AWS Secrets Manager secret name) – The name of the AWS Secrets Manager secret in which you stored the Google BigQuery credential.
  • InputBQProjectName (Google BigQuery project name) – The name of your project in Google BigQuery in which you want to store temporary tables; you will need write permissions on the project.
  • StateMachineName (Step Functions state machine name; default bq-to-s3-migration-state-machine) – The name of the Step Functions state machine.
  • SourceS3BucketName (S3 bucket name; default aws-blogs-artifacts-public) – The S3 bucket where the artifacts for this post are stored. Do not change the default.

Deploy and configure the Custom Auto Loader Framework to load files from Amazon S3 to Amazon Redshift

The Custom Auto Loader Framework utility makes data ingestion to Amazon Redshift simpler and automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to Custom Auto Loader Framework.
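The file-to-table mapping convention can be sketched in a few lines of Python. This is an illustration only, not the framework's actual code; the folder layout (schema/table subfolders under s3-redshift-loader-source) and the function name are assumptions made for the example:

```python
# Sketch (not the Custom Auto Loader Framework's actual code): derive a
# target Redshift table from the S3 key of a dropped file, assuming the
# hypothetical preconfigured layout
#   s3-redshift-loader-source/<schema>/<table>/<file>
def target_table(s3_key: str) -> str:
    parts = s3_key.split("/")
    if len(parts) < 4 or parts[0] != "s3-redshift-loader-source":
        raise ValueError(f"Key outside the preconfigured location: {s3_key}")
    schema, table = parts[1], parts[2]
    return f"{schema}.{table}"

print(target_table("s3-redshift-loader-source/public/orders/part-0001.gz"))
# public.orders
```

The point of the convention is that no per-file configuration is needed: the S3 key alone tells the loader where the data belongs.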

To set up the Custom Auto Loader Framework, complete the following steps:

  1. Choose Launch Stack to deploy the CloudFormation stack in the us-east-1 Region:

  1. On the AWS CloudFormation console, choose Next.
  2. Provide the following parameters to help ensure the successful creation of resources. Make sure you have collected these values beforehand.
  • CopyCommandSchedule (example: cron(0/5 * ? * * *)): The EventBridge rules KickoffFileProcessingSchedule and QueueRSProcessingSchedule are triggered based on this schedule. The default is 5 minutes.
  • DatabaseName (example: dev): The Amazon Redshift database name.
  • DatabaseSchemaName (example: public): The Amazon Redshift schema name.
  • DatabaseUserName (example: demo): The Amazon Redshift user name who has access to run COPY commands on the Amazon Redshift database and schema.
  • RedshiftClusterIdentifier (example: democluster): The Amazon Redshift cluster name.
  • RedshiftIAMRoleARN (example: arn:aws:iam::7000000000:role/RedshiftDemoRole): The Amazon Redshift cluster attached role, which has access to the S3 bucket. This role is used in COPY commands.
  • SourceS3Bucket (Your-bucket-name): The S3 bucket where data is located. Use the same bucket you used to store the migrated data as indicated in the previous stack.
  • CopyCommandOptions (example: delimiter '|' gzip): Provide the additional COPY command data format parameters as follows: delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'
  • InitiateSchemaDetection (Yes): The setting to dynamically detect the schema prior to file upload.

The following screenshot shows an example of our parameters.

Screen capture showing the stack details page with the input parameters filled with example values

  1. Choose Create.
  2. Monitor the progress of the stack creation and wait until it is complete.
  3. To verify the Custom Auto Loader Framework configuration, log in to the Amazon S3 console and navigate to the S3 bucket you provided as a value to the SourceS3Bucket parameter.

You should see that a new directory called s3-redshift-loader-source has been created.

Screen capture of the Amazon S3 console showing the folder you should be able to see in your S3 bucket.

Test the solution

To test the solution, complete the following steps:

  1. Create the configuration file based on the prerequisites. You can also download the demo file.
  2. To set up the S3 bucket, on the Amazon S3 console, navigate to the folder bq-mig-config in the bucket you provided in the stack.
  3. Upload the config file into it.
  4. To enable EventBridge notifications to the bucket, open the bucket on the console and on the Properties tab, locate Event notifications.
  5. In the Amazon EventBridge section, choose Edit.
  6. Select On, then choose Save changes.

  1. On the AWS Step Functions console, monitor the run of the state machine.
  2. Monitor the status of the loads in Amazon Redshift. For instructions, refer to Viewing Current Loads.
  3. Open the Amazon Redshift Query Editor V2 and query your data.

Pricing considerations

You might have egress charges for migrating data out of Google BigQuery into Amazon S3. Review and calculate the cost for moving your data on your Google cloud billing console. As of this writing, AWS Glue 3.0 or later charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. For more information, see AWS Glue Pricing. With auto scaling enabled, AWS Glue automatically adds and removes workers from the cluster depending on the parallelism at each stage or microbatch of the job run.
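As a rough illustration of the billing model quoted above, the per-job cost can be estimated as follows (a simplified sketch; actual charges depend on your AWS Glue version, Region, and features enabled):

```python
# Estimate the cost of a Spark ETL job at $0.44 per DPU-hour, billed
# per second with a 1-minute minimum (the AWS Glue 3.0+ rate quoted in
# this post; check current AWS Glue pricing for your Region).
def glue_job_cost(dpus: int, runtime_seconds: float,
                  rate_per_dpu_hour: float = 0.44) -> float:
    billed_seconds = max(runtime_seconds, 60)  # 1-minute minimum applies
    return dpus * (billed_seconds / 3600) * rate_per_dpu_hour

# Example: 10 DPUs running for 15 minutes
print(round(glue_job_cost(10, 15 * 60), 2))  # 1.1
```

With auto scaling, the DPU count varies over the run, so the real bill is the per-second sum across whatever workers were active at each moment rather than a single flat DPU count.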

Clean up

To avoid incurring future charges, clean up your resources:

  1. Delete the CloudFormation solution stack.
  2. Delete the CloudFormation Custom Auto Loader Framework stack.

Conclusion

In this post, we demonstrated how to build a scalable and automated data pipeline to migrate your data from Google BigQuery to Amazon Redshift. We also highlighted how the Custom Auto Loader Framework can automate the schema detection, create tables for your S3 files, and continuously load the files into your Amazon Redshift warehouse. With this approach, you can automate the migration of entire projects (even multiple projects at the same time) from Google BigQuery to Amazon Redshift. This helps significantly improve data migration times into Amazon Redshift through automatic parallelization of table migrations.

The auto-copy feature in Amazon Redshift simplifies automatic data loading from Amazon S3 with a simple SQL command. You can easily automate data ingestion from Amazon S3 to Amazon Redshift using the Amazon Redshift auto-copy preview feature.

For more information about the performance of the Google BigQuery Connector for AWS Glue, refer to Migrate terabytes of data quickly from Google Cloud to Amazon S3 with AWS Glue Connector for Google BigQuery and learn how to migrate a large amount of data (1.9 TB) into Amazon S3 quickly (about 8 minutes).

To learn more about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.


About the Authors

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for DB and Analytics. He has worked in the analytics space for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Manjula Nagineni is a Senior Solutions Architect with AWS based in New York. She works with major financial service institutions, architecting and modernizing their large-scale applications while adopting AWS Cloud services. She is passionate about designing big data workloads cloud-natively. She has over 20 years of IT experience in software development, analytics, and architecture across multiple domains such as finance, retail, and telecom.

Sohaib Katariwala is an Analytics Specialist Solutions Architect at AWS. He has over 12 years of experience helping organizations derive insights from their data.

Build event-driven data pipelines using AWS Controllers for Kubernetes and Amazon EMR on EKS

Post Syndicated from Victor Gu original https://aws.amazon.com/blogs/big-data/build-event-driven-data-pipelines-using-aws-controllers-for-kubernetes-and-amazon-emr-on-eks/

An event-driven architecture is a software design pattern in which decoupled applications can asynchronously publish and subscribe to events via an event broker. By promoting loose coupling between components of a system, an event-driven architecture leads to greater agility and can enable components in the system to scale independently and fail without impacting other services. AWS has many services to build solutions with an event-driven architecture, such as Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), and AWS Lambda.

Amazon Elastic Kubernetes Service (Amazon EKS) is becoming a popular choice among AWS customers to host long-running analytics and AI or machine learning (ML) workloads. By containerizing your data processing tasks, you can simply deploy them into Amazon EKS as Kubernetes jobs and use Kubernetes to manage the underlying compute resources. For big data processing, which requires distributed computing, you can use Spark on Amazon EKS. Amazon EMR on EKS, a managed Spark framework on Amazon EKS, enables you to run Spark jobs with the benefits of scalability, portability, extensibility, and speed. With EMR on EKS, Spark jobs run using the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less than open-source Apache Spark.

Data pipelines require workflow management to schedule jobs and manage dependencies between jobs, and monitoring to ensure that the transformed data is always accurate and up to date. One popular orchestration tool for managing workflows is Apache Airflow, which can be installed in Amazon EKS. Alternatively, you can use the AWS-managed version, Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Another option is AWS Step Functions, a serverless workflow service that integrates with EMR on EKS and EventBridge to build event-driven workflows.

In this post, we demonstrate how to build an event-driven data pipeline using AWS Controllers for Kubernetes (ACK) and EMR on EKS. We use ACK to provision and configure serverless AWS resources, such as EventBridge and Step Functions. Triggered by an EventBridge rule, Step Functions orchestrates jobs running in EMR on EKS. With ACK, you can use the Kubernetes API and configuration language to create and configure AWS resources the same way you create and configure a Kubernetes data processing job. Because most of the managed services are serverless, you can build and manage your entire data pipeline using the Kubernetes API with tools such as kubectl.

Solution overview

ACK lets you define and use AWS service resources directly from Kubernetes, using the Kubernetes Resource Model (KRM). The ACK project contains a series of service controllers, one for each AWS service API. With ACK, developers can stay in their familiar Kubernetes environment and take advantage of AWS services for their application-supporting infrastructure. In the post Microservices development using AWS controllers for Kubernetes (ACK) and Amazon EKS blueprints, we show how to use ACK for microservices development.

In this post, we show how to build an event-driven data pipeline using ACK controllers for EMR on EKS, Step Functions, EventBridge, and Amazon Simple Storage Service (Amazon S3). We provision an EKS cluster with ACK controllers using Terraform modules. We create the data pipeline with the following steps:

  1. Create the emr-data-team-a namespace and bind it with the virtual cluster my-ack-vc in Amazon EMR by using the ACK controller.
  2. Use the ACK controller for Amazon S3 to create an S3 bucket. Upload the sample Spark scripts and sample data to the S3 bucket.
  3. Use the ACK controller for Step Functions to create a Step Functions state machine as an EventBridge rule target based on Kubernetes resources defined in YAML manifests.
  4. Use the ACK controller for EventBridge to create an EventBridge rule for pattern matching and target routing.

The pipeline is triggered when a new script is uploaded. An S3 upload notification is sent to EventBridge and, if it matches the specified rule pattern, triggers the Step Functions state machine. Step Functions calls the EMR virtual cluster to run the Spark job, and all the Spark executors and driver are provisioned inside the emr-data-team-a namespace. The output is saved back to the S3 bucket, and the developer can check the result on the Amazon EMR console.

The following diagram illustrates this architecture.

Prerequisites

Ensure that you have the following tools installed locally:

Deploy the solution infrastructure

Because each ACK service controller requires different AWS Identity and Access Management (IAM) roles for managing AWS resources, it’s better to use an automation tool to install the required service controllers. For this post, we use Amazon EKS Blueprints for Terraform and the AWS EKS ACK Addons Terraform module to provision the following components:

  • A new VPC with three private subnets and three public subnets
  • An internet gateway for the public subnets and a NAT Gateway for the private subnets
  • An EKS cluster control plane with one managed node group
  • Amazon EKS-managed add-ons: VPC_CNI, CoreDNS, and Kube_Proxy
  • ACK controllers for EMR on EKS, Step Functions, EventBridge, and Amazon S3
  • IAM execution roles for EMR on EKS, Step Functions, and EventBridge

Let’s start by cloning the GitHub repo to your local desktop. The module eks_ack_addons in addon.tf is for installing ACK controllers. ACK controllers are installed using Helm charts from the Amazon ECR public gallery. See the following code:

cd examples/usecases/event-driven-pipeline
terraform init
terraform plan
terraform apply -auto-approve #defaults to us-west-2

The following screenshot shows an example of our output. emr_on_eks_role_arn is the ARN of the IAM role created for Amazon EMR running Spark jobs in the emr-data-team-a namespace in Amazon EKS. stepfunction_role_arn is the ARN of the IAM execution role for the Step Functions state machine. eventbridge_role_arn is the ARN of the IAM execution role for the EventBridge rule.

The following command updates kubeconfig on your local machine and allows you to interact with your EKS cluster using kubectl to validate the deployment:

region=us-west-2
aws eks --region $region update-kubeconfig --name event-driven-pipeline-demo

Test your access to the EKS cluster by listing the nodes:

kubectl get nodes
# Output should look like below
NAME                                        STATUS   ROLES    AGE     VERSION
ip-10-1-10-64.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-65.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-7.us-west-2.compute.internal     Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-10-73.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-11-96.us-west-2.compute.internal    Ready    <none>   19h     v1.24.9-eks-49d8fe8
ip-10-1-12-197.us-west-2.compute.internal   Ready    <none>   19h     v1.24.9-eks-49d8fe8

Now we’re ready to set up the event-driven pipeline.

Create an EMR virtual cluster

Let’s start by creating a virtual cluster in Amazon EMR and link it with a Kubernetes namespace in EKS. By doing that, the virtual cluster will use the linked namespace in Amazon EKS for running Spark workloads. We use the file emr-virtualcluster.yaml. See the following code:

apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
  name: my-ack-vc
spec:
  name: my-ack-vc
  containerProvider:
    id: event-driven-pipeline-demo  # your eks cluster name
    type_: EKS
    info:
      eksInfo:
        namespace: emr-data-team-a # namespace binding with EMR virtual cluster

Let’s apply the manifest by using the following kubectl command:

kubectl apply -f ack-yamls/emr-virtualcluster.yaml

You can navigate to the Virtual clusters page on the Amazon EMR console to see the cluster record.

Create an S3 bucket and upload data

Next, let’s create an S3 bucket for storing Spark pod templates and sample data. We use the s3.yaml file. See the following code:

apiVersion: s3.services.k8s.aws/v1alpha1
kind: Bucket
metadata:
  name: sparkjob-demo-bucket
spec:
  name: sparkjob-demo-bucket

kubectl apply -f ack-yamls/s3.yaml

If you don’t see the bucket, you can check the log from the ACK S3 controller pod for details. The error is mostly caused if a bucket with the same name already exists. You need to change the bucket name in s3.yaml as well as in eventbridge.yaml and sfn.yaml. You also need to update upload-inputdata.sh and upload-spark-scripts.sh with the new bucket name.

Run the following command to upload the input data and pod templates:

bash spark-scripts-data/upload-inputdata.sh

The sparkjob-demo-bucket S3 bucket is created with two folders: input and scripts.

Create a Step Functions state machine

The next step is to create a Step Functions state machine that calls the EMR virtual cluster to run a Spark job, which is a sample Python script to process the New York City Taxi Records dataset. You need to define the Spark script location and pod templates for the Spark driver and executor in the StateMachine object’s .yaml file. Let’s make the following changes in sfn.yaml first:

  • Replace the value for roleARN with stepfunctions_role_arn
  • Replace the value for ExecutionRoleArn with emr_on_eks_role_arn
  • Replace the value for VirtualClusterId with your virtual cluster ID
  • Optionally, replace sparkjob-demo-bucket with your bucket name

See the following code:

apiVersion: sfn.services.k8s.aws/v1alpha1
kind: StateMachine
metadata:
  name: run-spark-job-ack
spec:
  name: run-spark-job-ack
  roleARN: "arn:aws:iam::xxxxxxxxxxx:role/event-driven-pipeline-demo-sfn-execution-role"   # replace with your stepfunctions_role_arn
  tags:
  - key: owner
    value: sfn-ack
  definition: |
      {
      "Comment": "A description of my state machine",
      "StartAt": "input-output-s3",
      "States": {
        "input-output-s3": {
          "Type": "Task",
          "Resource": "arn:aws:states:::emr-containers:startJobRun.sync",
          "Parameters": {
            "VirtualClusterId": "f0u3vt3y4q2r1ot11m7v809y6",  
            "ExecutionRoleArn": "arn:aws:iam::xxxxxxxxxxx:role/event-driven-pipeline-demo-emr-eks-data-team-a",
            "ReleaseLabel": "emr-6.7.0-latest",
            "JobDriver": {
              "SparkSubmitJobDriver": {
                "EntryPoint": "s3://sparkjob-demo-bucket/scripts/pyspark-taxi-trip.py",
                "EntryPointArguments": [
                  "s3://sparkjob-demo-bucket/input/",
                  "s3://sparkjob-demo-bucket/output/"
                ],
                "SparkSubmitParameters": "--conf spark.executor.instances=10"
              }
            },
            "ConfigurationOverrides": {
              "ApplicationConfiguration": [
                {
                 "Classification": "spark-defaults",
                "Properties": {
                  "spark.driver.cores":"1",
                  "spark.executor.cores":"1",
                  "spark.driver.memory": "10g",
                  "spark.executor.memory": "10g",
                  "spark.kubernetes.driver.podTemplateFile":"s3://sparkjob-demo-bucket/scripts/driver-pod-template.yaml",
                  "spark.kubernetes.executor.podTemplateFile":"s3://sparkjob-demo-bucket/scripts/executor-pod-template.yaml",
                  "spark.local.dir" : "/data1,/data2"
                }
              }
              ]
            }...

You can get your virtual cluster ID from the Amazon EMR console or with the following command:

kubectl get virtualcluster -o jsonpath={.items..status.id}
# result:
f0u3vt3y4q2r1ot11m7v809y6  # VirtualClusterId

Then apply the manifest to create the Step Functions state machine:

kubectl apply -f ack-yamls/sfn.yaml

Create an EventBridge rule

The last step is to create an EventBridge rule, which is used as an event broker to receive event notifications from Amazon S3. Whenever a new file, such as a new Spark script, is created in the S3 bucket, the EventBridge rule will evaluate (filter) the event and invoke the Step Functions state machine if it matches the specified rule pattern, triggering the configured Spark job.

Let’s use the following command to get the ARN of the Step Functions state machine we created earlier:

kubectl get StateMachine -o jsonpath={.items..status.ackResourceMetadata.arn}
# result
arn:aws:states:us-west-2:xxxxxxxxxx:stateMachine:run-spark-job-ack # sfn_arn

Then, update eventbridge.yaml with the following values:

  • Under targets, replace the value for roleARN with eventbridge_role_arn
  • Under targets, replace arn with your sfn_arn
  • Optionally, in eventPattern, replace sparkjob-demo-bucket with your bucket name

See the following code:

apiVersion: eventbridge.services.k8s.aws/v1alpha1
kind: Rule
metadata:
  name: eb-rule-ack
spec:
  name: eb-rule-ack
  description: "ACK EventBridge Filter Rule to sfn using event bus reference"
  eventPattern: | 
    {
      "source": ["aws.s3"],
      "detail-type": ["Object Created"],
      "detail": {
        "bucket": {
          "name": ["sparkjob-demo-bucket"]    
        },
        "object": {
          "key": [{
            "prefix": "scripts/"
          }]
        }
      }
    }
  targets:
    - arn: arn:aws:states:us-west-2:xxxxxxxxxx:stateMachine:run-spark-job-ack # replace with your sfn arn
      id: sfn-run-spark-job-target
      roleARN: arn:aws:iam::xxxxxxxxx:role/event-driven-pipeline-demo-eb-execution-role # replace your eventbridge_role_arn
      retryPolicy:
        maximumRetryAttempts: 0 # no retries
  tags:
    - key: owner
      value: eb-ack

By applying the EventBridge configuration file, an EventBridge rule is created to monitor the folder scripts in the S3 bucket sparkjob-demo-bucket:

kubectl apply -f ack-yamls/eventbridge.yaml

For simplicity, the dead-letter queue is not set and maximum retry attempts is set to 0. For production usage, set them based on your requirements. For more information, refer to Event retry policy and using dead-letter queues.
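Conceptually, the filtering the eventPattern performs is equivalent to the following sketch. EventBridge evaluates patterns natively on its side; this Python function is only an illustration of the matching logic for the specific rule above:

```python
# Sketch of the filtering performed by the EventBridge rule above:
# accept only "Object Created" events from aws.s3 for keys under
# scripts/ in the sparkjob-demo-bucket bucket. EventBridge does this
# matching itself; this function just illustrates the logic.
def matches_rule(event: dict) -> bool:
    detail = event.get("detail", {})
    return (
        event.get("source") == "aws.s3"
        and event.get("detail-type") == "Object Created"
        and detail.get("bucket", {}).get("name") == "sparkjob-demo-bucket"
        and detail.get("object", {}).get("key", "").startswith("scripts/")
    )

event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "sparkjob-demo-bucket"},
        "object": {"key": "scripts/pyspark-taxi-trip.py"},
    },
}
print(matches_rule(event))  # True
```

Note that an upload to input/ in the same bucket would not match, because the "prefix": "scripts/" condition restricts the rule to the scripts folder.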

Test the data pipeline

To test the data pipeline, we trigger it by uploading a Spark script to the S3 bucket scripts folder using the following command:

bash spark-scripts-data/upload-spark-scripts.sh

The upload event triggers the EventBridge rule and then calls the Step Functions state machine. You can go to the State machines page on the Step Functions console and choose the job run-spark-job-ack to monitor its status.

For the Spark job details, on the Amazon EMR console, choose Virtual clusters in the navigation pane, and then choose my-ack-vc. You can review all the job run history for this virtual cluster. If you choose Spark UI in any row, you’re redirected to the Spark history server for more Spark driver and executor logs.

Clean up

To clean up the resources created in the post, use the following code:

aws s3 rm s3://sparkjob-demo-bucket --recursive # clean up data in S3
kubectl delete -f ack-yamls/. #Delete aws resources created by ACK
terraform destroy -target="module.eks_blueprints_kubernetes_addons" -target="module.eks_ack_addons" -auto-approve -var region=$region
terraform destroy -target="module.eks_blueprints" -auto-approve -var region=$region
terraform destroy -auto-approve -var region=$region

Conclusion

This post showed how to build an event-driven data pipeline purely with native Kubernetes API and tooling. The pipeline uses EMR on EKS as compute and uses serverless AWS resources Amazon S3, EventBridge, and Step Functions as storage and orchestration in an event-driven architecture. With EventBridge, AWS and custom events can be ingested, filtered, transformed, and reliably delivered (routed) to more than 20 AWS services and public APIs (webhooks), using human-readable configuration instead of writing undifferentiated code. EventBridge helps you decouple applications and achieve more efficient organizations using event-driven architectures, and has quickly become the event bus of choice for AWS customers for many use cases, such as auditing and monitoring, application integration, and IT automation.

By using ACK controllers to create and configure different AWS services, developers can perform all data plane operations without leaving the Kubernetes platform. Also, developers only need to maintain the EKS cluster because all the other components are serverless.

As a next step, clone the GitHub repository to your local machine and test the data pipeline in your own AWS account. You can modify the code in this post and customize it for your own needs by using different EventBridge rules or adding more steps in Step Functions.


About the authors

Victor Gu is a Containers and Serverless Architect at AWS. He works with AWS customers to design microservices and cloud native solutions using Amazon EKS/ECS and AWS serverless services. His specialties are Kubernetes, Spark on Kubernetes, MLOps and DevOps.

Michael Gasch is a Senior Product Manager for AWS EventBridge, driving innovations in event-driven architectures. Prior to AWS, Michael was a Staff Engineer at the VMware Office of the CTO, working on open-source projects, such as Kubernetes and Knative, and related distributed systems research.

Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting variety of customer workloads.

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

Post Syndicated from Nith Govindasivan original https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/

In a data warehouse, a dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. For example, in a typical sales domain, customer, time, and product are dimensions, and a sales transaction is a fact. Attributes within the dimension can change over time—a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions to it. A slowly changing dimension (SCD) is a data warehousing concept that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides an ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe are modernizing their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake and makes the data processing highly complex if the data source happens to be semi-structured instead of a database. The key objective while handling Type 2 SCDs is to define the start and end dates to the dataset accurately to track the changes within the data lake, because this provides the point-in-time reporting capability for the consuming applications.
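To make the Type 2 idea concrete, the following minimal sketch (an illustration only, not the AWS Glue/Delta job built in this post) shows how a changed attribute closes out the current record with an end date and opens a new record with a start date:

```python
from datetime import date

# Minimal SCD Type 2 sketch: when an attribute changes, close the
# current version of the record (set end_date) and append a new
# current version (end_date=None means "current").
def apply_change(history, emp_id, new_address, as_of):
    for row in history:
        if row["emp_id"] == emp_id and row["end_date"] is None:
            if row["address"] == new_address:
                return history  # no change; nothing to do
            row["end_date"] = as_of  # close the current version
    history.append({"emp_id": emp_id, "address": new_address,
                    "start_date": as_of, "end_date": None})
    return history

history = [{"emp_id": 1, "address": "19892 Williamson Causeway",
            "start_date": date(2023, 1, 1), "end_date": None}]
apply_change(history, 1, "682 Pace Springs Apt. 011", date(2023, 6, 1))
print(len(history))  # 2: one closed historical row, one current row
```

Because every version carries its validity window, a consuming query can ask "what was this customer's address on a given date," which is exactly the point-in-time reporting capability mentioned above.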

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor or not, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer
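Since the source sends full snapshots with no CDC, changes must be derived by comparing consecutive snapshots. The following simplified sketch (an illustration only; the actual job in this post uses AWS Glue with Delta merge operations) classifies records keyed by emp_id:

```python
# Sketch of change detection between two full snapshots keyed by emp_id:
# keys only in today's snapshot are inserts, keys only in yesterday's
# are deletes, and shared keys with different attributes are updates.
def diff_snapshots(previous: dict, current: dict):
    inserts = [k for k in current if k not in previous]
    deletes = [k for k in previous if k not in current]
    updates = [k for k in current
               if k in previous and current[k] != previous[k]]
    return inserts, updates, deletes

prev = {1: {"name": "Melissa"}, 2: {"name": "Laura"}}
curr = {1: {"name": "Melissa P."}, 3: {"name": "Luis"}}
print(diff_snapshots(prev, curr))  # ([3], [1], [2])
```

The deletes found this way become logical deletes in the data lake (flagged rather than physically removed), which preserves the full history for Type 2 reporting.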

The architecture is implemented as follows:

  • Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack:

  1. Enter a stack name.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to AWS CloudFormation console to note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda functionSampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog databasedeltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job<CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test out the overall solution design and query historical records from the employee dataset. This post is designed to be implemented for a real customer use case, where you get full snapshot data on a daily basis. We test the following aspects of SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, you first need to generate source data for the initial ingestion. To simplify that step, the CloudFormation stack you just deployed includes a Lambda function that generates a sample dataset.

Open the function and configure a test event using the default hello-world template event JSON, as shown in the following screenshot. Provide an event name, leave the template unchanged, and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.
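
The deployed Lambda function’s code isn’t shown in this post; a minimal stand-in that produces records with the same fields might look like the following sketch (the name lists and value formats here are illustrative assumptions, not the function’s actual implementation):

```python
import json
import random

FIRST_NAMES = ["Melissa", "Laura", "Luis", "Jonathan", "Kelly"]
LAST_NAMES = ["Parks", "Delgado", "Barnes", "Wilson", "Gomez"]

def generate_employees(n: int) -> list[dict]:
    """Produce sample records with the same fields as the post's dataset.

    This is a stand-in for the deployed Lambda, whose source is not shown.
    """
    return [
        {
            "emp_id": i,
            "first_name": random.choice(FIRST_NAMES),
            "last_name": random.choice(LAST_NAMES),
            "Address": f"{random.randint(100, 99999)} Main Street\nAnytown, NY {random.randint(10000, 99999)}",
            "phone_number": f"{random.randint(100, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
            "isContractor": random.choice([True, False]),
        }
        for i in range(1, n + 1)
    ]

# The landing file is JSON Lines: one JSON object per line, as shown next.
lines = [json.dumps(r) for r in generate_employees(25)]
```

Writing each record as a separate JSON line matches the format of the dataset shown in the next section.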

Run the AWS Glue job

Confirm if you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it handy, because you will modify it in later steps to simulate inserts, updates, and deletes. The sample dataset generated in your account will be entirely different from the preceding example.

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.

  1. Name your crawler delta-lake-crawler, then choose Next.

  1. Select Not yet for data already mapped to AWS Glue tables.
  2. Choose Add a data source.

  1. On the Data source drop-down menu, choose Delta Lake.
  2. Enter the path to the Delta table.
  3. Select Create Native tables.
  4. Choose Add a Delta Lake data source.

  1. Choose Next.

  1. Choose the role that was created by the CloudFormation template, then choose Next.

  1. Choose the database that was created by the CloudFormation template, then choose Next.

  1. Choose Create crawler.

  1. Select your crawler and choose Run.

Query the data

After the crawler is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You’re redirected to the Athena console. If you don’t have the latest Athena engine, create a new Athena workgroup with the latest Athena engine.

  1. Under Administration in the navigation pane, choose Workgroups.

  1. Choose Create workgroup.

  1. Provide a name for the workgroup, such as DeltaWorkgroup.
  2. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.

  1. Choose Create workgroup.

  1. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.

  1. Run the following query on the employee table:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key, which is unique to every change and is used to track the changes. An emp_key is computed for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

from pyspark.sql.functions import sha2, concat_ws, col

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))
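
The same key can be reproduced outside Spark with Python’s hashlib, which is handy for spot-checking individual rows; this sketch assumes Spark renders the boolean column as lowercase true/false when it is concatenated as a string:

```python
import hashlib

def emp_key(emp_id, first_name, last_name, address, phone_number, is_contractor):
    """Reproduce the Spark sha2(concat_ws("||", ...), 256) expression in plain Python.

    Assumption: Spark casts booleans to the strings "true"/"false" in concat_ws.
    """
    parts = [
        str(emp_id),
        first_name,
        last_name,
        address,
        phone_number,
        "true" if is_contractor else "false",
    ]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()

key = emp_key(8, "Teresa", "Estrada", "339 Scott Valley\nGonzalesfort, PA 18212",
              "435-600-3162", False)
# An unchanged record always hashes to the same key, which is why a deleted
# record that is reinserted with identical attributes keeps its emp_key.
```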

Perform inserts, updates, and deletes

Before making changes to the dataset, let’s run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn’t make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values:

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  1. Now, upload the changed fake_emp_data.json file to the same source prefix.

  1. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  2. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

  1. Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may differ from what is provided here as an example.

  1. For the deletes, we inner join the base table with the new source file on both emp_id and emp_key.
  2. If the join matches, we verify that the base table row is the current record (isCurrent=true) and that the incoming record is flagged as a delete (delete_flag=true).
  3. We merge the delete changes from the new file with the base table for all the matching delete condition rows and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()
  1. For both the updates and the inserts, we match the base table and the new changes on emp_id and emp_key, considering only the current records (employee.isCurrent=true).
  2. If this condition evaluates to true, we then get the current, undeleted record (isCurrent=true and delete_flag=false).
  3. We merge the changes by updating the following:
    1. If the second condition evaluates to true:
      1. isCurrent=false
      2. end_date=current_date
    2. Or we insert the entire row as follows if the second condition evaluates to false:
      1. emp_id=new record’s emp_id
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(
    values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify our changed dataset from the previous step and make the following changes.

  1. Add the deleted emp_id=8 back to the dataset.

After making this change, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Upload the changed employee dataset file to the same source prefix.
  2. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  3. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

  1. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • deleted_flag=true
    • end_date=’2023-03-02’
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • deleted_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may differ from what is provided here as an example. Also note that because the same deleted record was reinserted in the subsequent load without any changes, the emp_key remains unchanged.

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.
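
If you prefer to run these reporting queries programmatically rather than in the Athena console, a small helper along the following lines could work. The database name comes from your CloudFormation outputs, and the workgroup and output-location parameters are assumptions you would supply from your own setup:

```python
def month_leavers_query(database: str, year_month: str) -> str:
    """Build the post's Query 1 (employees who left in a given 'YYYY/MM' month).

    database is the Data Catalog database from the CloudFormation outputs.
    """
    return (
        f'SELECT * FROM "{database}"."employee" '
        f"WHERE delete_flag = true "
        f"AND date_format(CAST(end_date AS date), '%Y/%m') = '{year_month}'"
    )

def run_athena_query(sql: str, workgroup: str, output_s3: str) -> dict:
    """Submit the query through Athena (requires AWS credentials; not run here).

    workgroup (e.g., the DeltaWorkgroup created earlier) and output_s3 are
    values from your own environment.
    """
    import boto3  # imported lazily so the pure helper above works without AWS

    athena = boto3.client("athena")
    return athena.start_query_execution(
        QueryString=sql,
        WorkGroup=workgroup,
        ResultConfiguration={"OutputLocation": output_s3},
    )
```

For example, `run_athena_query(month_leavers_query("deltalake_2438fbd0", "2023/03"), "DeltaWorkgroup", "s3://your-results-bucket/athena/")` submits Query 1 and returns the query execution ID to poll for results.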

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) on an S3 Delta Lake, when source systems are unable to provide the change data capture capability, with AWS Glue. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or other commonly used orchestrators your organization is familiar with. You can also extend this solution by adding partitions where appropriate. You can also maintain the delta table by compacting the small files.


About the authors

Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey by implementing big data and analytics solutions. Outside of work, Nith is an avid cricket fan, watching almost any cricket during his spare time, and enjoys long drives and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers with data platform transformations across industry verticals. His core areas of expertise include technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing tabla.

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we have a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the databases and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema will be registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating the schema. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to the dead-letter queue.
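
To build intuition for what backward compatibility mode enforces, the core rule for record schemas can be sketched as follows. This is a deliberate simplification: consumers on the new schema must still be able to read data written with the old schema, so any field that exists only in the new version needs a default; the registry’s real check also handles type promotions, unions, aliases, and more:

```python
def is_backward_compatible(old_fields: list, new_fields: list) -> bool:
    """Simplified backward-compatibility check for Avro-style record schemas.

    A field present only in the new schema must declare a default, otherwise
    a reader using the new schema cannot decode records written with the old
    one. (Illustrative only; the actual registry check is richer.)
    """
    old_names = {f["name"] for f in old_fields}
    return all(f["name"] in old_names or "default" in f for f in new_fields)

old = [{"name": "movie_id", "type": "int"}, {"name": "title", "type": "string"}]

# Adding an optional field (with a default) is backward compatible...
compatible = is_backward_compatible(old, old + [{"name": "genre", "type": "string", "default": ""}])
# ...adding a required field (no default) is not.
incompatible = is_backward_compatible(old, old + [{"name": "genre", "type": "string"}])
```

In the incompatible case, the registry would reject the new version, and Kafka Connect can be configured to route the affected messages to the dead-letter queue, as described above.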

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{
    "Database Name": "sampledatabase",
    "Table Name": "movies",
    "Fields": [
         {
            "name": "movie_id",
            "type": "INTEGER"
         },
         {
            "name": "title",
            "type": "STRING"
         },
         {
            "name": "release_year",
            "type": "INTEGER"
         }
     ]
}
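
For reference, this table description maps roughly onto an Avro record schema like the one built below. The mapping is illustrative; the schema that Debezium actually registers carries additional envelope fields (operation type, source metadata, and so on):

```python
import json

# Minimal SQL-to-Avro type mapping sufficient for this table (an assumption,
# not Debezium's full mapping).
TYPE_MAP = {"INTEGER": "int", "STRING": "string"}

table = {
    "Database Name": "sampledatabase",
    "Table Name": "movies",
    "Fields": [
        {"name": "movie_id", "type": "INTEGER"},
        {"name": "title", "type": "STRING"},
        {"name": "release_year", "type": "INTEGER"},
    ],
}

avro_schema = {
    "type": "record",
    "name": table["Table Name"],
    "namespace": table["Database Name"],
    "fields": [{"name": f["name"], "type": TYPE_MAP[f["type"]]} for f in table["Fields"]],
}
schema_json = json.dumps(avro_schema)
```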

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK installs the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, we use the open-source Debezium MySQL connector JARs for the source connector, and the Confluent community licensed Amazon S3 sink connector JARs for the destination connector. Both plugins also include the AWS Glue Schema Registry libraries for Avro serialization and deserialization. These custom plugins were already created as part of the CloudFormation template deployed in the previous step.

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then pick the custom plugin with name starting msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs and with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  1. Next, we configure Connector capacity. We can choose Provisioned and leave the other properties as default.
  2. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  3. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  4. Choose Next.
  5. For Security, choose the defaults.
  6. Choose Next.
  7. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  8. Choose Next.
  9. Review the settings and choose Create connector.

After a few minutes, the connector changes to running status.

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin with name starting msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNECTOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false
  8. Next, we configure the Connector capacity. Choose Provisioned and leave the other properties at their defaults.
  9. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  10. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector is running.
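Because a single mistyped property key silently misconfigures the connector, it can help to sanity-check the configuration text before submitting it. The following illustrative Python sketch parses properties-style text like the configuration above and verifies a few required keys; the required set shown is an assumption for illustration, not an MSK Connect API:

```python
# Illustrative sketch: parse Java-properties-style connector configuration
# into a dict and report any missing required keys. The REQUIRED set is an
# assumption chosen for this example.
REQUIRED = {"name", "connector.class", "s3.bucket.name", "s3.region", "topics"}

def parse_properties(text):
    """Parse key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def missing_keys(props):
    return sorted(REQUIRED - props.keys())

sample = """name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=my-bucket
s3.region=us-east-1
topics=db1.sampledatabase.movies
flush.size=10
"""
props = parse_properties(sample)
```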

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and that it contains the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  8. When prompted for a password, enter the password from the CloudFormation template parameters.
  9. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  10. When prompted for a password, enter the password from the CloudFormation template parameters.
  11. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  12. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
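The registered schema is plain Avro JSON, so you can also inspect it programmatically. The following sketch parses the version 1 schema shown above and separates required fields from nullable ones:

```python
import json

# Sketch: inspect the registered Avro schema (version 1 shown above) and
# list which fields are nullable (a union with "null").
schema_json = """
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {"name": "movie_id", "type": "int"},
    {"name": "title", "type": "string"},
    {"name": "release_year", "type": "int"},
    {"name": "__op", "type": ["null", "string"], "default": null},
    {"name": "__source_ts_ms", "type": ["null", "long"], "default": null},
    {"name": "__deleted", "type": ["null", "string"], "default": null}
  ]
}
"""

schema = json.loads(schema_json)
fields = {f["name"]: f["type"] for f in schema["fields"]}
nullable = [name for name, t in fields.items() if isinstance(t, list) and "null" in t]
```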

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about the compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  2. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  3. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  4. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the COUNTRY column that you added:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "COUNTRY",
      "type": "string"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.
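The backward-compatibility rule at work here can be illustrated in a few lines: a consumer on the new schema can read data written with the old schema only if every added field carries a default. This simplified check is a sketch of the idea, not the Schema Registry's full compatibility algorithm:

```python
# Simplified sketch of the backward-compatibility rule for Avro records:
# fields added in the new schema must declare a default, or consumers on
# the new schema cannot decode data written with the old schema.
def added_fields_without_defaults(old_schema, new_schema):
    old_names = {f["name"] for f in old_schema["fields"]}
    return [f["name"] for f in new_schema["fields"]
            if f["name"] not in old_names and "default" not in f]

old = {"fields": [{"name": "movie_id", "type": "int"}]}
# Compatible: the added field is nullable with a default.
new_ok = {"fields": [{"name": "movie_id", "type": "int"},
                     {"name": "country", "type": ["null", "string"], "default": None}]}
# Incompatible: the added field has no default.
new_bad = {"fields": [{"name": "movie_id", "type": "int"},
                      {"name": "country", "type": "string"}]}
```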

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Three ways to boost your email security and brand reputation with AWS

Post Syndicated from Michael Davie original https://aws.amazon.com/blogs/security/three-ways-to-boost-your-email-security-and-brand-reputation-with-aws/

If you own a domain that you use for email, you want to maintain the reputation and goodwill of your domain’s brand. Several industry-standard mechanisms can help prevent your domain from being used as part of a phishing attack. In this post, we’ll show you how to deploy three of these mechanisms, which visually authenticate emails sent from your domain to users and verify that emails are encrypted in transit. It can take as little as 15 minutes to deploy these mechanisms on Amazon Web Services (AWS), and the result can help to provide immediate and long-term improvements to your organization’s email security.

Phishing through email remains one of the most common ways that bad actors try to compromise computer systems. Incidents of phishing and related crimes far outnumber the incidents of other categories of internet crime, according to the most recent FBI Internet Crime Report. Phishing has consistently led to large annual financial losses in the US and globally.

Overview of BIMI, MTA-STS, and TLS reporting

An earlier post has covered how you can use Amazon Simple Email Service (Amazon SES) to send emails that align with best practices, including the IETF internet standards: Sender Policy Framework (SPF), DomainKeys Identified Mail (DKIM), and Domain-based Message Authentication, Reporting, and Conformance (DMARC). This post will show you how to build on this foundation and configure your domains to align with additional email security standards, including the following:

  • Brand Indicators for Message Identification (BIMI) – This standard allows you to associate a logo with your email domain, which some email clients will display to users in their inbox. Visit the BIMI Group’s Where is my BIMI Logo Displayed? webpage to see how logos are displayed in the user interfaces of BIMI-supporting mailbox providers; Figure 1 shows a mock-up of a typical layout that contains a logo.
  • Mail Transfer Agent Strict Transport Security (MTA-STS) – This standard helps ensure that email servers always use TLS encryption and certificate-based authentication when they send messages to your domain, to protect the confidentiality and integrity of email in transit.
  • SMTP TLS reporting – This reporting allows you to receive reports and monitor your domain’s TLS security posture, identify problems, and learn about attacks that might be occurring.
Figure 1: A mock-up of how BIMI enables branded logos to be displayed in email user interfaces

Figure 1: A mock-up of how BIMI enables branded logos to be displayed in email user interfaces

These three standards require your Domain Name System (DNS) to publish specific records, for example by using Amazon Route 53, that point to web pages that have additional information. You can host this information without having to maintain a web server by storing it in Amazon Simple Storage Service (Amazon S3) and delivering it through Amazon CloudFront, secured with a certificate provisioned from AWS Certificate Manager (ACM).

Note: This AWS solution works for BIMI, MTA-STS, and TLS reporting, regardless of what you use to serve the actual email for your domains, which services you use to send email, and where you host DNS. For purposes of clarity, this post assumes that you are using Route 53 for DNS. If you use a different DNS hosting provider, you will manually configure DNS records in your existing hosting provider.

Solution architecture

The architecture for this solution is depicted in Figure 2.

Figure 2: The architecture diagram showing how the solution components interact

Figure 2: The architecture diagram showing how the solution components interact

The interaction points are as follows:

  1. The web content is stored in an S3 bucket, and CloudFront has access to this bucket through an origin access identity, a mechanism of AWS Identity and Access Management (IAM).
  2. As described in more detail in the BIMI section of this blog post, the Verified Mark Certificate is obtained from a BIMI-qualified certificate authority and stored in the S3 bucket.
  3. When an external email system receives a message claiming to be from your domain, it looks up BIMI records for your domain in DNS. As depicted in the diagram, a DNS request is sent to Route 53.
  4. To retrieve the BIMI logo image and Verified Mark Certificate, the external email system will make HTTPS requests to a URL published in the BIMI DNS record. In this solution, the URL points to the CloudFront distribution, which has a TLS certificate provisioned with ACM.

A few important warnings

Email is a complex system of interoperating technologies. It is also brittle: a typo or a missing DNS record can make the difference between whether an email is delivered or not. Pay close attention to your email server and the users of your email systems when implementing the solution in this blog post. The main indicator that something is wrong is the absence of email. Instead of seeing an error in your email server’s log, users will tell you that they’re expecting to receive an email from somewhere and it’s not arriving. Or they will tell you that they sent an email, and their recipient can’t find it.

The DNS uses a lot of caching and time-out values to improve its efficiency. That makes DNS records slow and a little unpredictable as they propagate across the internet. So keep in mind that as you monitor your systems, it can be hours or even more than a day before the DNS record changes have an effect that you can detect.

This solution uses AWS Cloud Development Kit (CDK) custom resources, which are supported by AWS Lambda functions that will be created as part of the deployment. These functions are configured to use CDK-selected runtimes, which will eventually pass out of support and require you to update them.

Prerequisites

You will need permission in an AWS account to create and configure the following resources:

  • An Amazon S3 bucket to store the files and access logs
  • A CloudFront distribution to publicly deliver the files from the S3 bucket
  • A TLS certificate in ACM
  • An origin access identity in IAM that CloudFront will use to access files in Amazon S3
  • Lambda functions, IAM roles, and IAM policies created by CDK custom resources

You might also want to enable these optional services:

  • Amazon Route 53 for setting the necessary DNS records. If your domain is hosted by another DNS provider, you will set these DNS records manually.
  • Amazon SES or an Amazon WorkMail organization with a single mailbox. You can configure either service with a subdomain (for example, [email protected]) such that the existing domain is not disrupted, or you can create new email addresses by using your existing email mailbox provider.

BIMI has some additional requirements:

  • BIMI requires an email domain to have implemented a strong DMARC policy so that recipients can be confident in the authenticity of the branded logos. Your email domain must have a DMARC policy of p=quarantine or p=reject. Additionally, the domain’s policy cannot have sp=none or pct<100.

    Note: Do not adjust the DMARC policy of your domain without careful testing, because this can disrupt mail delivery.

  • You must have your brand’s logo in Scaled Vector Graphics (SVG) format that conforms to the BIMI standard. For more information, see Creating BIMI SVG Logo Files on the BIMI Group website.
  • Purchase a Verified Mark Certificate (VMC) issued by a third-party certificate authority. This certificate attests that the logo, organization, and domain are associated with each other, based on a legal trademark registration. Many email hosting providers require this additional certificate before they will show your branded logo to their users. Others do not currently support BIMI, and others might have alternative mechanisms to determine whether to show your logo. For more information about purchasing a Verified Mark Certificate, see the BIMI Group website.

    Note: If you are not ready to purchase a VMC, you can deploy this solution and validate that BIMI is correctly configured for your domain, but your branded logo will not display to recipients at major email providers.

What gets deployed in this solution?

This solution deploys the DNS records and supporting files that are required to implement BIMI, MTA-STS, and SMTP TLS reporting for an email domain. We’ll look at the deployment in more detail in the following sections.

BIMI

BIMI is described by the Internet Engineering Task Force (IETF) as follows:

Brand Indicators for Message Identification (BIMI) permits Domain Owners to coordinate with Mail User Agents (MUAs) to display brand-specific Indicators next to properly authenticated messages. There are two aspects of BIMI coordination: a scalable mechanism for Domain Owners to publish their desired Indicators, and a mechanism for Mail Transfer Agents (MTAs) to verify the authenticity of the Indicator. This document specifies how Domain Owners communicate their desired Indicators through the BIMI Assertion Record in DNS and how that record is to be interpreted by MTAs and MUAs. MUAs and mail-receiving organizations are free to define their own policies for making use of BIMI data and for Indicator display as they see fit.

If your organization has a trademark-protected logo, you can set up BIMI to have that logo displayed to recipients in their email inboxes. This can have a positive impact on your brand and indicates to end users that your email is more trustworthy. The BIMI Group shows examples of how brand logos are displayed in user inboxes, as well as a list of known email service providers that support the display of BIMI logos.

As a domain owner, you can implement BIMI by publishing the relevant DNS records and hosting the relevant files. To have your logo displayed by most email hosting providers, you will need to purchase a Verified Mark Certificate from a BIMI-qualified certificate authority.

This solution will deploy a valid BIMI record in Route 53 (or tell you what to publish in the DNS if you’re not using Route 53) and will store your provided SVG logo and Verified Mark Certificate files in Amazon S3, to be delivered through CloudFront with a valid TLS certificate from ACM.

To support BIMI, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host:
    default._bimi.<your-domain>. The value of this record is: v=BIMI1; l=<url-of-your-logo>; a=<url-of-verified-mark-certificate>. The value of <your-domain> refers to the domain that is used in the From header of messages that your organization sends.
  2. The logo and optional Verified Mark Certificate are hosted publicly at the HTTPS locations defined by <url-of-your-logo> and <url-of-verified-mark-certificate>, respectively.
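To see the record format concretely, the following sketch assembles and parses a BIMI assertion record; the URLs are placeholders for your own logo and certificate locations:

```python
# Illustrative sketch: assemble and parse a BIMI assertion record of the
# form published at default._bimi.<your-domain>. URLs are placeholders.
def build_bimi_record(logo_url, vmc_url=None):
    record = f"v=BIMI1; l={logo_url}"
    if vmc_url:
        record += f"; a={vmc_url}"
    return record

def parse_tag_value(record):
    """Split a 'tag=value; tag=value' DNS record into a dict."""
    tags = {}
    for part in record.split(";"):
        part = part.strip()
        if part:
            key, _, value = part.partition("=")
            tags[key] = value
    return tags

record = build_bimi_record("https://bimi-assets.example.com/logo.svg",
                           "https://bimi-assets.example.com/vmc.pem")
```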

MTA-STS

MTA-STS is described by the IETF in RFC 8461 as follows:

SMTP (Simple Mail Transport Protocol) MTA Strict Transport Security (MTA-STS) is a mechanism enabling mail service providers to declare their ability to receive Transport Layer Security (TLS) secure SMTP connections and to specify whether sending SMTP servers should refuse to deliver to MX hosts that do not offer TLS with a trusted server certificate.

Put simply, MTA-STS helps ensure that email servers always use encryption and certificate-based authentication when sending email to your domains, so that message integrity and confidentiality are preserved while in transit across the internet. MTA-STS also helps to ensure that messages are only sent to authorized servers.

This solution will deploy a valid MTA-STS policy record in Route 53 (or tell you what value to publish in the DNS if you’re not using Route 53) and will create an MTA-STS policy document to be hosted on S3 and delivered through CloudFront with a valid TLS certificate from ACM.

To support MTA-STS, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _mta-sts.<your-domain>. The value of this record is: v=STSv1; id=<unique value used for cache invalidation>.
  2. The MTA-STS policy document is hosted at and obtained from the following location: https://mta-sts.<your-domain>/.well-known/mta-sts.txt.
  3. The value of <your-domain> in both cases is the domain that is used for routing inbound mail to your organization and is typically the same domain that is used in the From header of messages that your organization sends externally. Depending on the complexity of your organization, you might receive inbound mail for multiple domains, and you might choose to publish MTA-STS policies for each domain.
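The policy document itself is a short plain-text file. The following sketch generates one in testing mode; the MX host is a placeholder, and you would adjust mode and max_age for your environment:

```python
# Illustrative sketch: generate the mta-sts.txt policy document served at
# https://mta-sts.<your-domain>/.well-known/mta-sts.txt. The MX host is a
# placeholder; "testing" mode matches the recommendation in this post.
def mta_sts_policy(mx_hosts, mode="testing", max_age=86400):
    lines = ["version: STSv1", f"mode: {mode}"]
    lines += [f"mx: {mx}" for mx in mx_hosts]
    lines.append(f"max_age: {max_age}")
    return "\n".join(lines) + "\n"

policy = mta_sts_policy(["mail.example.com"])
```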

Is it ever bad to encrypt everything?

In the example MTA-STS policy file provided in the GitHub repository and explained later in this post, the MTA-STS policy mode is set to testing. This means that your email server is advertising its willingness to negotiate encrypted email connections, but it does not require TLS. Servers that want to send mail to you are allowed to connect and deliver mail even if there are problems in the TLS connection, as long as you’re in testing mode. You should expect reports when servers try to connect through TLS to your mail server and fail to do so.

Be fully prepared before you change the MTA-STS policy to enforce. After this policy is set to enforce, servers that follow the MTA-STS policy and that experience an enforceable TLS-related error when they try to connect to your mail server will not deliver mail to your mail server. This is a difficult situation to detect. You will simply stop receiving email from servers that comply with the policy. You might receive reports from them indicating what errors they encountered, but it is not guaranteed. Be sure that the email address you provide in SMTP TLS reporting (in the following section) is functional and monitored by people who can take action to fix issues. If you miss TLS failure reports, you probably won’t receive email. If the TLS certificate that you use on your email server expires, and your MTA-STS policy is set to enforce, this will become an urgent issue and will disrupt the flow of email until it is fixed.

SMTP TLS reporting

SMTP TLS reporting is described by the IETF in RFC 8460 as follows:

A number of protocols exist for establishing encrypted channels between SMTP Mail Transfer Agents (MTAs), including STARTTLS, DNS-Based Authentication of Named Entities (DANE) TLSA, and MTA Strict Transport Security (MTA-STS). These protocols can fail due to misconfiguration or active attack, leading to undelivered messages or delivery over unencrypted or unauthenticated channels. This document describes a reporting mechanism and format by which sending systems can share statistics and specific information about potential failures with recipient domains. Recipient domains can then use this information to both detect potential attacks and diagnose unintentional misconfigurations.

As you gain the security benefits of MTA-STS, SMTP TLS reporting will allow you to receive reports from other internet email providers. These reports contain information that is valuable when monitoring your TLS security posture, identifying problems, and learning about attacks that might be occurring.

This solution will deploy a valid SMTP TLS reporting record on Route 53 (or provide you with the value to publish in the DNS if you are not using Route 53).

To support SMTP TLS reporting, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _smtp._tls.<your-domain>. The value of this record is: v=TLSRPTv1; rua=mailto:<report-receiver-email-address>
  2. The value of <report-receiver-email-address> might be an address in your domain or in a third-party provider. Automated systems that process these reports must be capable of processing GZIP compressed files and parsing JSON.
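As noted, automated processing of these reports requires GZIP decompression and JSON parsing. The following sketch decompresses a synthetic TLS-RPT report and tallies session counts using the summary field names defined in RFC 8460; the sample payload is illustrative, not a real provider report:

```python
import gzip
import json

# Illustrative sketch: TLS-RPT reports arrive as GZIP-compressed JSON
# (RFC 8460). Decompress one report and tally per-policy session counts.
def summarize_tlsrpt(gz_bytes):
    report = json.loads(gzip.decompress(gz_bytes))
    totals = {"success": 0, "failure": 0}
    for policy in report.get("policies", []):
        summary = policy.get("summary", {})
        totals["success"] += summary.get("total-successful-session-count", 0)
        totals["failure"] += summary.get("total-failure-session-count", 0)
    return totals

# Synthetic sample report, compressed the way a provider would deliver it.
sample = {"organization-name": "Example Sender",
          "policies": [{"summary": {"total-successful-session-count": 42,
                                    "total-failure-session-count": 1}}]}
gz = gzip.compress(json.dumps(sample).encode())
```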

Deploy the solution with the AWS CDK

In this section, you’ll learn how to deploy the solution to create the previously described AWS resources in your account.

  1. Clone the following GitHub repository:

    git clone https://github.com/aws-samples/serverless-mail
    cd serverless-mail/email-security-records

  2. Edit CONFIG.py to reflect your desired settings, as follows:
    1. If no Verified Mark Certificate is provided, set VMC_FILENAME = None.
    2. If your DNS zone is not hosted on Route 53, or if you do not want this app to manage Route 53 DNS records, set ROUTE_53_HOSTED = False. In this case, you will need to set TLS_CERTIFICATE_ARN to the Amazon Resource Name (ARN) of a certificate hosted on ACM in us-east-1. This certificate is used by CloudFront and must support two subdomains: mta-sts and your configured BIMI_ASSET_SUBDOMAIN.
  3. Finalize the preparation, as follows:
    1. Place your BIMI logo and Verified Mark Certificate files in the assets folder.
    2. Create an MTA-STS policy file at assets/.well-known/mta-sts.txt to reflect your mail exchange (MX) servers and policy requirements. An example file is provided at assets/.well-known/mta-sts.txt.example.
  4. Deploy the solution, as follows:
    1. Open a terminal in the email-security-records folder.
    2. (Recommended) Create and activate a virtual environment by running the following commands.
      python3 -m venv .venv
      source .venv/bin/activate
    3. Install the Python requirements in your environment with the following command.
      pip install -r requirements.txt
    4. Assume a role in the target account that has the permissions outlined in the Prerequisites section of this post.

      Using AWS CDK version 2.17.0 or later, deploy the bootstrap in the target account by running the following command. To learn more, see Bootstrapping in the AWS CDK Developer Guide.
      cdk bootstrap

    5. Run the following command to synthesize the CloudFormation template. Review the output of this command to verify what will be deployed.
      cdk synth
    6. Run the following command to deploy the CloudFormation template. You will be prompted to accept the IAM changes that will be applied to your account.
      cdk deploy

      Note: If you use Route 53, these records are created and activated in your DNS zones as soon as the CDK finishes deploying. As the records propagate through the DNS, they will gradually start affecting the email in the affected domains.

    7. If you’re not using Route 53 and are instead using a third-party DNS provider, create the CNAME and TXT records as indicated. In this case, your email is not affected by this solution until you create the records in DNS.

Testing and troubleshooting

After you have deployed the CDK solution, you can test it to confirm that the DNS records and web resources are published correctly.

BIMI

  1. Query the BIMI DNS TXT record for your domain by using the dig or nslookup command in your terminal.

    dig +short TXT default._bimi.<your-domain.example>

    Verify the response. For example:

    "v=BIMI1; l=https://bimi-assets.<your-domain.example>/logo.svg"

  2. In your web browser, open the URL from that response (for example, https://bimi-assets.<your-domain.example>/logo.svg) to verify that the logo is available and that the HTTPS certificate is valid.
  3. The BIMI group provides a tool to validate your BIMI configuration. This tool will also validate your VMC if you have purchased one.

MTA-STS

  1. Query the MTA-STS DNS TXT record for your domain.

    dig +short TXT _mta-sts.<your-domain.example>

    The value of this record is as follows:

    v=STSv1; id=<unique value used for cache invalidation>

  2. You can load the MTA-STS policy document using your web browser. For example, https://mta-sts.<your-domain.example>/.well-known/mta-sts.txt
  3. You can also use third-party tools, such as MX Toolbox, to examine your MTA-STS configuration.

TLS reporting

  1. Query the TLS reporting DNS TXT record for your domain.

    dig +short TXT _smtp._tls.<your-domain.example>

    Verify the response. For example:

    "v=TLSRPTv1; rua=mailto:<your email address>"

  2. You can also use third-party tools, such as Easy DMARC, to examine your TLS reporting configuration.

Depending on which domains you communicate with on the internet, you will begin to see TLS reports arriving at the email address that you have defined in the TLS reporting DNS record. We recommend that you closely examine the TLS reports, and use automated analytical techniques over an extended period of time before changing the default testing value of your domain’s MTA-STS policy. Not every email provider will send TLS reports, but examining the reports in aggregate will give you a good perspective for making changes to your MTA-STS policy.

Cleanup

To remove the resources created by this solution:

  1. Open a terminal in the email-security-records folder.
  2. Assume a role in the target account with permission to delete resources.
  3. Run cdk destroy.

Note: The asset and log buckets are automatically emptied and deleted by the cdk destroy command.

Conclusion

When external systems send email to or receive email from your domains they will now query your new DNS records and will look up your domain’s BIMI, MTA-STS, and TLS reporting information from your new CloudFront distribution. By adopting the email domain security mechanisms outlined in this post, you can improve the overall security posture of your email environment, as well as the perception of your brand.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Michael Davie

Michael Davie

Michael is a Senior Industry Specialist with AWS Security Assurance. He works with our customers, their regulators, and AWS teams to help raise the bar on secure cloud adoption and usage. Michael has over 20 years of experience working in the defence, intelligence, and technology sectors in Canada and is a licensed professional engineer.

Jesse Thompson

Jesse Thompson

Jesse is an Email Deliverability Manager with the Amazon Simple Email Service team. His background is in enterprise IT development and operations, with a focus on email abuse mitigation and encouragement of authenticity practices with open standard protocols. Jesse’s favorite activity outside of technology is recreational curling.

Control access to Amazon OpenSearch Service Dashboards with attribute-based role mappings

Post Syndicated from Stefan Appel original https://aws.amazon.com/blogs/big-data/control-access-to-amazon-opensearch-service-dashboards-with-attribute-based-role-mappings/

Federated users of Amazon OpenSearch Service often need access to OpenSearch Dashboards with roles based on their user profiles. OpenSearch Service fine-grained access control maps authenticated users to OpenSearch roles and then evaluates permissions to determine how to handle the user’s actions. However, when an enterprise-wide identity provider (IdP) manages the users, the mapping of users to OpenSearch Service roles often needs to happen dynamically based on IdP user attributes. One option to map users is to use OpenSearch Service SAML integration and pass user group information to OpenSearch Service. Another option is Amazon Cognito role-based access control, which supports rule-based or token-based mappings. But neither approach supports arbitrary role mapping logic, such as interpreting multivalued user attributes to identify a target role.

This post shows how you can implement custom role mappings with an Amazon Cognito pre-token generation AWS Lambda trigger. For our example, we use a multivalued attribute provided over OpenID Connect (OIDC) to Amazon Cognito. We show how you are in full control of the mapping logic and process of such a multivalued attribute for AWS Identity and Access Management (IAM) role lookups. Our approach is generic for OIDC-compatible IdPs. To make this post self-contained, we use the Okta IdP as an example to walk through the setup.

Overview of solution

The provided solution intercepts the OIDC-based login process to OpenSearch Dashboards with a pre-token generation Lambda function. The login to OpenSearch Dashboards with a third-party IdP and Amazon Cognito as an intermediary consists of several steps:

  1. First, the initial user request to OpenSearch Dashboards is redirected to Amazon Cognito.
  2. Amazon Cognito redirects the request to the IdP for authentication.
  3. After the user authenticates, the IdP sends the identity token (ID token) back to Amazon Cognito.
  4. Amazon Cognito invokes a Lambda function that modifies the obtained token. We use an Amazon DynamoDB table to perform role mapping lookups. The modified token now contains the IAM role mapping information.
  5. Amazon Cognito uses this role mapping information to map the user to the specified IAM role and provides the role credentials.
  6. OpenSearch Service maps the IAM role credentials to OpenSearch roles and applies fine-grained permission checks.

The following architecture outlines the login flow from a user’s perspective.

Scope of solution

On the backend, OpenSearch Dashboards integrates with an Amazon Cognito user pool and an Amazon Cognito identity pool during the authentication flow. The steps are as follows:

  1. Authenticate and get tokens.
  2. Look up the token attribute and IAM role mapping and overwrite the Amazon Cognito attribute.
  3. Exchange tokens for AWS credentials used by OpenSearch Dashboards.

The following architecture shows the authentication process from this backend perspective.

Backend authentication flow

In the remainder of this post, we walk through the configurations necessary for an authentication flow in which a Lambda function implements custom role mapping logic. We provide sample Lambda code for the mapping of multivalued OIDC attributes to IAM roles based on a DynamoDB lookup table with the following structure.

OIDC Attribute Value              IAM Role
["attribute_a","attribute_b"]    arn:aws:iam::<aws-account-id>:role/<role-name-01>
["attribute_a","attribute_x"]    arn:aws:iam::<aws-account-id>:role/<role-name-02>

The high-level steps of the solution presented in this post are as follows:

  1. Configure Amazon Cognito authentication for OpenSearch Dashboards.
  2. Add IAM roles for mappings to OpenSearch Service roles.
  3. Configure the Okta IdP.
  4. Add a third-party OIDC IdP to the Amazon Cognito user pool.
  5. Map IAM roles to OpenSearch Service roles.
  6. Create the DynamoDB attribute-role mapping table.
  7. Deploy and configure the pre-token generation Lambda function.
  8. Configure the pre-token generation Lambda trigger.
  9. Test the login to OpenSearch Dashboards.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account with an OpenSearch Service domain.
  • A third-party IdP that supports OpenID Connect and adds a multivalued attribute in the authorization token. For this post, we use attributes_array as this attribute’s name and Okta as an IdP provider. You can create an Okta Developer Edition free account to test the setup.

Configure Amazon Cognito authentication for OpenSearch Dashboards

The modification of authentication tokens requires you to configure the OpenSearch Service domain to use Amazon Cognito for authentication. For instructions, refer to Configuring Amazon Cognito authentication for OpenSearch Dashboards.

The Lambda function implements custom role mappings by setting the cognito:preferred_role claim (for more information, refer to Role-based access control). For the correct interpretation of this claim, set the Amazon Cognito identity pool to Choose role from token. The Amazon Cognito identity pool then uses the value of the cognito:preferred_role claim to select the correct IAM role. The following screenshot shows the required settings in the Amazon Cognito identity pool that is created during the configuration of Amazon Cognito authentication for OpenSearch Service.

Cognito role mapping configuration

Add IAM roles for mappings to OpenSearch roles

IAM roles used for mappings to OpenSearch roles require a trust policy so that authenticated users can assume them. The trust policy needs to reference the Amazon Cognito identity pool created during the configuration of Amazon Cognito authentication for OpenSearch Service. Create at least one IAM role with a custom trust policy. For instructions, refer to Creating a role using custom trust policies. The IAM role doesn’t require the attachment of a permission policy. For a sample trust policy, refer to Role-based access control.
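For orientation, such a trust policy generally follows the shape below. This is a sketch based on the standard Amazon Cognito identity pool federation pattern, with the identity pool ID as a placeholder; the Role-based access control documentation linked above remains the authoritative reference:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "cognito-identity.amazonaws.com"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringEquals": {
                    "cognito-identity.amazonaws.com:aud": "<identity-pool-id>"
                },
                "ForAnyValue:StringLike": {
                    "cognito-identity.amazonaws.com:amr": "authenticated"
                }
            }
        }
    ]
}
```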

Configure the Okta IdP

In this section, we describe the configuration steps to include a multivalued attributes_array attribute in the token provided by Okta. For more information, refer to Customize tokens returned from Okta with custom claims. We use the Okta UI to perform the configurations. Okta also provides an API that you can use to script and automate the setup.

The first step is adding the attributes_array attribute to the Okta user profile.

  1. Use Okta’s Profile Editor under Directory, Profile Editor.
  2. Select User (default) and then choose Add Attribute.
  3. Add an attribute with a display name and variable name attributes_array of type string array.

The following screenshot shows the Okta default user profile after the custom attribute has been added.

Okta user profile editor

  1. Next, add attributes_array attribute values to users using Okta’s user management interface under Directory, People.
  2. Select a user and choose Profile.
  3. Choose Edit and enter attribute values.

The following screenshot shows an example of attributes_array attribute values within a user profile.

Okta user attributes array

The next step is adding the attributes_array attribute to the ID token that is generated during the authentication process.

  1. On the Okta console, choose Security, API and select the default authorization server.
  2. Choose Claims and choose Add Claim to add the attributes_array attribute as part of the ID token.
  3. For the scope, enter openid; for the attribute value, enter user.attributes_array.

This references the previously created attribute in a user’s profile.

Add claim to ID token

  1. Next, create an application for the federation with Amazon Cognito. For instructions, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

The last step assigns the Okta application to Okta users.

  1. Navigate to Directory, People, select a user, and choose Assign Applications.
  2. Select the application you created in the previous step.

Add a third-party OIDC IdP to the Amazon Cognito user pool

We are implementing the role mapping based on the information provided in a multivalued OIDC attribute. The authentication token needs to include this attribute. If you followed the previously described Okta configuration, the attribute is automatically added to the ID token of a user. If you used another IdP, you might have to request the attribute explicitly. For this, add the attribute name to the Authorized scopes list of the IdP in Amazon Cognito.

For instructions on how to set up the federation between a third-party IdP and an Amazon Cognito user pool and how to request additional attributes, refer to Adding OIDC identity providers to a user pool. For a detailed walkthrough for Okta, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

After requesting the token via OIDC, you need to map the attribute to an Amazon Cognito user pool attribute. For instructions, refer to Specifying identity provider attribute mappings for your user pool. The following screenshot shows the resulting configuration on the Amazon Cognito console.

Amazon Cognito user pool attribute mapping

Map IAM roles to OpenSearch Service roles

Upon login, OpenSearch Service maps users to an OpenSearch Service role based on the IAM role ARN set in the cognito:preferred_role claim by the pre-token generation Lambda trigger. This requires a role mapping in OpenSearch Service. To add such role mappings to IAM backend roles, refer to Mapping roles to users. The following screenshot shows a role mapping on the OpenSearch Dashboards console.

Amazon OpenSearch Service role mappings

Create the attribute-role mapping table

For this solution, we use DynamoDB to store mappings of users to IAM roles. For instructions, refer to Create a table and define a partition key named Key of type String. You need the table name in the subsequent step to configure the Lambda function.

The next step is writing the mapping information into the table. A mapping entry consists of the following attributes:

  • Key – A string that contains attribute values in comma-separated alphabetical order
  • RoleArn – A string with the IAM role ARN to which the attribute value combination should be mapped

For details on how to add data to a DynamoDB table, refer to Write data to a table using the console or AWS CLI.

For example, if the previously configured OIDC attribute attributes_array contains three values, attribute_a, attribute_b, and attribute_c, the entry in the mapping table looks like table line 1 in the following screenshot.

Amazon DynamoDB table with attribute-role mappings
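Because the pre-token generation Lambda function shown later in this post builds its lookup key by sorting the attribute values and joining them with commas, items in the table must follow the same convention. The following is a minimal Python sketch of that normalization; the make_lookup_key helper name is ours, not part of the solution:

```python
def make_lookup_key(attribute_values):
    """Normalize a list of OIDC attribute values into the DynamoDB lookup key:
    comma-separated values in alphabetical order."""
    return ",".join(sorted(attribute_values))


# The mapping item for the three-value example above; the role ARN is a placeholder.
item = {
    "Key": {"S": make_lookup_key(["attribute_c", "attribute_a", "attribute_b"])},
    "RoleArn": {"S": "arn:aws:iam::<aws-account-id>:role/<role-name>"},
}
print(item["Key"]["S"])  # attribute_a,attribute_b,attribute_c
```

If an item's Key doesn't match this sorted, comma-separated form, the Lambda lookup finds no entry and falls back to the unauthorized role.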

Deploy and configure the pre-token generation Lambda function

A Lambda function implements the custom role mapping logic. The Lambda function receives an Amazon Cognito event as input and extracts attribute information out of it. It uses the attribute information for a lookup in a DynamoDB table and retrieves the value for cognito:preferred_role. Follow the steps in Getting started with Lambda to create a Node.js Lambda function and insert the following source code:

const AWS = require("aws-sdk");
const tableName = process.env.TABLE_NAME;
const unauthorizedRoleArn = process.env.UNAUTHORIZED_ROLE;
const userAttributeArrayName = process.env.USER_POOL_ATTRIBUTE;
const dynamodbClient = new AWS.DynamoDB({apiVersion: "2012-08-10"});
exports.lambdaHandler = handlePreTokenGenerationEvent;

async function handlePreTokenGenerationEvent (event, context) {
    var sortedAttributeList = getSortedAttributeList(event);
    var lookupKey = sortedAttributeList.join(',');
    var roleArn = await lookupIAMRoleArn(lookupKey);
    appendResponseWithPreferredRole(event, roleArn);
    return event;
}

function getSortedAttributeList(event) {
    return JSON.parse(event['request']['userAttributes'][userAttributeArrayName]).sort();
}

async function lookupIAMRoleArn(key) {
    var params = {
        TableName: tableName,
        Key: {
          'Key': {S: key}
        },
        ProjectionExpression: 'RoleArn'
      };
    try {
        let item = await dynamodbClient.getItem(params).promise();
        return item['Item']['RoleArn']['S'];
    } catch (e){
        console.log(e);
        return unauthorizedRoleArn; 
    }
}

function appendResponseWithPreferredRole(event, roleArn){
    event.response = {
        'claimsOverrideDetails': {
            'groupOverrideDetails': {
                'preferredRole': roleArn
            }
        }
    };
}

The Lambda function expects three environment variables. Refer to Using AWS Lambda environment variables for instructions to add the following entries:

  • TABLE_NAME – The name of the previously created DynamoDB table. This table is used for the lookups.
  • UNAUTHORIZED_ROLE – The ARN of the IAM role that is used when no mapping is found in the lookup table.
  • USER_POOL_ATTRIBUTE – The Amazon Cognito user pool attribute used for the IAM role lookup. In our example, this attribute is named custom:attributes_array.

The following screenshot shows the final configuration.

AWS Lamba function configuration

The Lambda function needs permissions to access the DynamoDB lookup table. Attach the following policy to the Lambda execution role (for instructions, refer to Lambda execution role), providing the Region, AWS account number, and DynamoDB table name:

{
    "Statement": [
        {
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:Scan",
                "dynamodb:Query",
                "dynamodb:BatchGetItem",
                "dynamodb:DescribeTable"
            ],
            "Resource": [
                "arn:aws:dynamodb:<region>:<accountid>:table/<table>",
                "arn:aws:dynamodb:<region>:<accountid>:table/<table>/index/*"
            ],
            "Effect": "Allow"
        }
    ]
}

The configuration of the Lambda function is now complete.

Configure the pre-token generation Lambda trigger

As a final step, add a pre-token generation trigger to the Amazon Cognito user pool and reference the newly created Lambda function. For details, refer to Customizing user pool workflows with Lambda triggers. The following screenshot shows the configuration.

Amazon Cognito pre-token generation trigger configuration

This step completes the setup; Amazon Cognito now maps users to OpenSearch Service roles based on the values provided in an OIDC attribute.

Test the login to OpenSearch Dashboards

The following diagram shows an exemplary login flow and the corresponding screenshots for an Okta user user1 with the user profile attribute attributes_array set to ["attribute_a", "attribute_b", "attribute_c"].

Testing of solution

Clean up

To avoid incurring future charges, delete the OpenSearch Service domain, Amazon Cognito user pool and identity pool, Lambda function, and DynamoDB table created as part of this post.

Conclusion

In this post, we demonstrated how to set up a custom mapping to OpenSearch Service roles using values provided via an OIDC attribute. We dynamically set the cognito:preferred_role claim using an Amazon Cognito pre-token generation Lambda trigger and a DynamoDB table for lookup. The solution is capable of handling dynamic multivalued user attributes, but you can extend it with further application logic that goes beyond a simple lookup. The steps in this post are a proof of concept. If you plan to develop this into a productive solution, we recommend implementing Okta and AWS security best practices.

The post highlights just one use case of how you can use Amazon Cognito support for Lambda triggers to implement custom authentication needs. If you’re interested in further details, refer to How to Use Cognito Pre-Token Generation trigger to Customize Claims In ID Tokens.


About the Authors

Stefan Appel is a Senior Solutions Architect at AWS. For more than 10 years, he has supported enterprise customers in adopting cloud technologies. Before joining AWS, Stefan held positions in software architecture, product management, and IT operations departments. He began his career in research on event-based systems. In his spare time, he enjoys hiking and has walked the length of New Zealand following Te Araroa.

Modood Alvi is a Senior Solutions Architect at Amazon Web Services (AWS). Modood is passionate about digital transformation and is committed to helping large enterprise customers across the globe accelerate their adoption of and migration to the cloud. Modood brings more than a decade of experience in software development, having held various technical roles within companies like SAP and Porsche Digital. Modood earned his Diploma in Computer Science from the University of Stuttgart.

Build highly available streams with Amazon Kinesis Data Streams

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/build-highly-available-streams-with-amazon-kinesis-data-streams/

Many use cases are moving towards a real-time data strategy due to demand for real-time insights, low-latency response times, and the ability to adapt to the changing needs of end-users. For this type of workload, you can use Amazon Kinesis Data Streams to seamlessly provision, store, write, and read data in a streaming fashion. With Kinesis Data Streams, there are no servers to manage, and you can scale your stream to handle any additional throughput as it comes in.

Kinesis Data Streams offers 99.9% availability in a single AWS Region. For even higher availability, there are several strategies to explore within the streaming layer. This post compares and contrasts different strategies for creating a highly available Kinesis data stream in case of service interruptions, delays, or outages in the primary Region of operation.

Considerations for high availability

Before we dive into our example use case, there are several considerations to keep in mind when designing a highly available Kinesis Data Streams workload that relate to the business need for a particular pipeline:

  • Recovery Time Objective (RTO) is defined by the organization. RTO is the maximum acceptable delay between the interruption of service and restoration of service. This determines what is considered an acceptable time window when service is unavailable.
  • Recovery Point Objective (RPO) is defined by the organization. RPO is the maximum acceptable amount of time since the last data recovery point. This determines what is considered an acceptable loss of data between the last recovery point and the interruption of service.

In a general sense, the lower your values for RPO and RTO, the more expensive the overall solution becomes. This is because the solution needs to account for minimizing both data loss and service unavailability by having multiple instances of the service up and running in multiple Regions. This is why a big piece of high availability is the replication of data flowing through a workload. In our case, the data is replicated across Regions of Kinesis Data Streams. Conversely, the higher the RPO and RTO values are, the more complexity you introduce into your failover mechanism. This is due to the fact that the cost savings you realize by not standing up multiple instances across multiple Regions are offset by the orchestration needed to spin up these instances in the event of an outage.

In this post, we are only covering failover of a Kinesis data stream. In use cases where higher availability is required across the entire data pipeline, having failover architectures for every component (Amazon API Gateway, AWS Lambda, Amazon DynamoDB) is strongly encouraged.

The simplest approach to high availability is to start a new instance of producers, consumers, and data streams in a new Region upon service unavailability detection. The benefit here is primarily cost, but your RPO and RTO values will be higher as a result.

We cover the following strategies for highly available Kinesis Data Streams:

  • Warm standby – An architecture in which there is active replication of data from the Kinesis data stream in Region A to Region B. Consumers of the data stream are running in both Regions at all times. Recommended for use cases that can’t withstand extended downtime past their replication lag.
  • Cold standby – Active replication of data from the data stream in Region A to Region B, but consumers of the data stream in Region B are spun up only when an outage in Region A is detected. Recommended for use cases that can afford some downtime while infrastructure is spun up in the secondary Region. In this scenario, RPO will be similar to the warm standby strategy; however, RTO will increase.

For high availability purposes, these use cases need to replicate the data across Regions in a way that allows consumers and producers of the data stream to fail over quickly upon detection of a service unavailability and utilize the secondary Region’s stream. Let’s take an example architecture to further explain these DR strategies. We use API Gateway and Lambda to publish stock ticker information to a Kinesis data stream. The data is then retrieved by another Lambda consumer to save durably into DynamoDB for querying, alerting, and reporting. The following diagram illustrates this architecture.

the primary architecture for the post--showcasing data coming from a mobile phone to API Gateway, then AWS Lambda, then Kinesis Data Streams, Lambda again and finally publishing to a DynamoDB Table

We use this architecture with an example use case requiring the streaming workload to be highly available in the event of a Region outage. The customer can withstand an RTO of 15 minutes during an outage, because they refresh end-users’ dashboards on a 15-minute interval. The customer is sensitive to downtime and data loss, because their data will be used for historical auditing purposes, operational metrics, and dashboards for end-users. Downtime for this customer means that data isn’t able to be persisted in their database from their streaming layer, and therefore unavailable to any consuming application. For this use case, data can be retried up to 5 minutes from our Lambda function before failing over to the new Region. Consumers are considered unavailable when the stream is unavailable, and can scale up in the secondary Region to account for any backlog of events.
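The 5-minute retry budget described above can be sketched as follows. This is illustrative logic rather than the post's actual client code: the Kinesis clients for the two Regions are passed in as parameters, and the function name, the RETRY_WINDOW_SECONDS constant, and the fixed 1-second backoff are our assumptions.

```python
import time

RETRY_WINDOW_SECONDS = 300  # assumed 5-minute retry budget before failing over


def put_with_failover(primary_client, failover_client, stream_name, data, partition_key,
                      now=time.monotonic, sleep=time.sleep):
    """Try the primary Region's stream for up to RETRY_WINDOW_SECONDS,
    then fall back to the failover Region's stream."""
    deadline = now() + RETRY_WINDOW_SECONDS
    while True:
        try:
            return primary_client.put_record(
                StreamName=stream_name, Data=data, PartitionKey=partition_key)
        except Exception:
            if now() >= deadline:
                break
            sleep(1)  # simple fixed backoff; prefer exponential backoff with jitter
    return failover_client.put_record(
        StreamName=stream_name, Data=data, PartitionKey=partition_key)
```

Injecting now and sleep keeps the sketch testable without waiting out the full retry window.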

How might we approach making a Kinesis data stream highly available for this use case?

Warm standby pattern

The following architecture diagram illustrates the warm standby high availability pattern for Kinesis Data Streams.

warm standby pattern showcasing data being replicated between a kinesis data stream in one region to another

image showcasing the warm standby failover--where data from first lambda begins replicating to secondary region KDA

The warm standby architectural pattern involves running a Kinesis data stream both in the primary and secondary Region, along with consumers and downstream destinations of the primary Region’s streaming layer being replicated as well. Sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. We dive into details of how to achieve this in the client failover section of this post. Data is replicated across Regions from the data stream in the primary Region to the secondary Region. This is done instead of having the sources publish to both Regions to avoid any consistency issues between the streams in the two Regions.

Although this architectural pattern gives very high availability, it’s also the most expensive option because we’re duplicating virtually the entire streaming layer across two Regions. For business use cases that can’t withstand extended data loss or withstand downtime, this may be the best option for them. From an RTO perspective, this architectural pattern ensures there will be no downtime. There is some nuance in the RPO metric in that it depends heavily on the replication lag. In the event of the primary stream becoming unavailable, whatever data hasn’t yet been replicated may be unavailable in the secondary Region. This data won’t be considered lost, but may be unavailable for consumption until the primary stream becomes available again. This method also can result in events being out of order.

For business needs that can’t tolerate this level of record unavailability, consider retaining data on the producer for the purposes of publishing to an available stream when available, or rewinding against the source for the producer if possible so that data stuck in the primary Region can be resent to the secondary stream upon failover. We cover this consideration in the client failover section of this post.

Cold standby pattern

The following architecture diagram illustrates the cold standby high availability pattern for Kinesis Data Streams.

active passive pattern for kinesis data streams

The cold standby architectural pattern involves running a data stream both in the primary and secondary Region, and spinning up the downstream resources like a stream consumer and destination for streams when a service interruption is detected—passive mode. Just like the warm standby pattern, sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. Likewise, data is replicated across Regions from the data stream in the primary Region to the secondary Region.

The primary benefit this architectural pattern provides is cost efficiency. By not running consumers at all times, this effectively reduces your costs significantly compared to the warm standby pattern. However, this pattern may introduce some data unavailability for downstream systems while the secondary Region infrastructure is provisioned. Additionally, depending on replication lag, some records may be unavailable, as discussed in the warm standby pattern. It should be noted that depending on how long it takes to spin up resources, it may take some time for consumers to reprocess the data in the secondary Region, and latency can be introduced when failing over. Our implementation assumes a minimal replication lag and that downstream systems have the ability to reprocess a configurable amount of data to catch up to the tip of the stream. We discuss approaches to spinning these resources up in the client failover section, but one possible approach to this would be using an AWS CloudFormation template that spins these resources up on service unavailability detection.
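One possible shape for that CloudFormation-based spin-up, assuming a pre-staged template reachable at a URL of your choosing; the stack name, template URL, and parameter names here are placeholders, not values from this post's template:

```python
def build_failover_stack_request(stack_name, template_url, failover_region):
    """Build the parameters for a CloudFormation CreateStack call that provisions
    the consumer and downstream resources in the secondary Region."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        "Parameters": [
            {"ParameterKey": "FailoverRegion", "ParameterValue": failover_region},
        ],
        "Capabilities": ["CAPABILITY_IAM"],  # the template creates IAM resources
    }


def spin_up_failover(cfn_client, request):
    # cfn_client is a CloudFormation client for the secondary Region,
    # injected so the sketch can be exercised without AWS credentials.
    return cfn_client.create_stack(**request)
```

A monitoring component would call spin_up_failover once service unavailability is detected.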

For business needs that can tolerate some level of data unavailability and can accept interruptions while the new infrastructure in the secondary Region is spun up, this is an option to consider both from a cost perspective and an RPO/RTO perspective. The complexity of spinning up resources upon detecting service unavailability is offset by the lower cost of the overall solution.

Which pattern makes sense for our use case?

Let’s revisit the use case described earlier to identify which of the strategies best meets our needs. We can extract the pieces of information from the customer’s problem statement to identify that they need a high availability architecture that:

  • Can’t withstand extended amounts of data loss
  • Must resume operations within 15 minutes of service interruption identification

These criteria tell us that their RPO is close to zero, and their RTO is 15 minutes. From here, we can determine that the cold standby architecture with data replication provides limited data loss, and the maximum downtime will be determined by the time it takes to provision consumers and downstream destinations in the secondary Region.

Let’s dive deeper into the implementation details of each of the core phases of high availability, including an implementation guide for our use case.

Launch AWS CloudFormation resources

If you want to follow along with our code samples, you can launch the following CloudFormation stack and follow the instructions in order to simulate the cold standby architecture referenced in this post.

Launch Stack

For purposes of the Kinesis Data Streams high availability setup demo, we use us-west-2 as the primary Region and us-east-2 as the failover Region. While deploying this solution in your own account, you can choose your own primary and failover Regions.

  1. Deploy the supplied CloudFormation template in failover Region us-east-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

  2. Deploy the supplied CloudFormation template in primary Region us-west-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

In steps 1 and 2, we deployed the following resources in the primary and failover Regions:

  1. KDS-HA-Stream – AWS::Kinesis::Stream (primary and failover Region)
  2. KDS-HA-ProducerLambda – AWS::Lambda::Function (primary Region)
  3. KDS-HA-ConsumerLambda – AWS::Lambda::Function (primary and failover Region)
  4. KDS-HA-ReplicationAgentLambda – AWS::Lambda::Function (primary Region)
  5. KDS-HA-FailoverLambda – AWS::Lambda::Function (primary Region)
  6. ticker-prices – AWS::DynamoDB::GlobalTable (primary and failover Region)

The KDS-HA-Stream Kinesis data stream is deployed in both Regions. An enhanced fan-out consumer of the KDS-HA-Stream stream, KDS-HA-ReplicationAgentLambda, in the primary Region is responsible for replicating messages to the data stream in the failover Region.

KDS-HA-ConsumerLambda is a Lambda function consuming messages out of the KDS-HA-Stream stream and persisting data into a DynamoDB table after preprocessing.

You can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-west-2 as its value because it’s the primary Region.

Replication

When deciding how to replicate data from a data stream in Region A to a data stream in Region B, there are several strategies that involve a consumer reading data off of the primary stream and sending that data cross-Region to the secondary data stream. This would act as a replicator service, responsible for copying the data between the two streams, maintaining a relatively low latency to replicate and ensuring data isn’t lost during this replication.

Because replication off of a shared throughput data stream could impact the flow of data in a production workload, we recommend using the enhanced fan-out feature of Kinesis Data Streams consumers to ensure replication doesn’t have an impact on consumption latency.

The replication strategy implemented in this post features asynchronous replication, meaning that the replication process doesn’t block any standard data flow in the primary stream. Synchronous replication would be a safer approach to guarantee replication and avoid data loss; however, this isn’t possible without a service-side implementation.

The following image shows a timeline of data flow for the cold standby architecture, with data being replicated as soon as it’s published.

Lambda replication

Lambda can treat a Kinesis data stream as an event source, which funnels events from your data stream into a Lambda function. This Lambda function then receives and forwards these events across Regions to your data stream in a secondary Region. Lambda functions let you apply streaming best practices such as retrying records that encounter errors, bisecting a batch on error, and using the Lambda parallelization factor; running more concurrent invocations of your Lambda function than you have shards can help process records faster.

This Lambda function is at the crux of the architecture for high availability; it’s responsible solely for sending data across Regions, and it also has the best capability to monitor the replication progress. Important metrics to monitor for Lambda replication include IteratorAge, which indicates how old the last record in the batch was when it finished processing. A high IteratorAge value indicates that the Lambda function is falling behind and therefore is not keeping up with data ingestion for replication purposes. A high IteratorAge can lead to a higher RPO and the higher likelihood of data unavailability when a passive failover happens.
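As an illustration, an IteratorAge alarm on the replication function could be defined with parameters like the following; the alarm name and the five-minute threshold are our assumptions, not values prescribed by the solution:

```python
def build_iterator_age_alarm(function_name, threshold_ms=300_000):
    """Build CloudWatch PutMetricAlarm parameters that fire when the replication
    Lambda function's IteratorAge (milliseconds) exceeds the given threshold."""
    return {
        "AlarmName": f"{function_name}-iterator-age",
        "Namespace": "AWS/Lambda",
        "MetricName": "IteratorAge",
        "Dimensions": [{"Name": "FunctionName", "Value": function_name}],
        "Statistic": "Maximum",
        "Period": 60,
        "EvaluationPeriods": 5,
        "Threshold": float(threshold_ms),
        "ComparisonOperator": "GreaterThanThreshold",
    }


# With a boto3 CloudWatch client, you could then call:
# cloudwatch.put_metric_alarm(**build_iterator_age_alarm("KDS-HA-ReplicationAgentLambda"))
```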

We use the following sample Lambda function in our CloudFormation template to replicate data across Regions:

import json
import boto3
import random
import os
import base64


def lambda_handler(event, context):
    client = boto3.client("kinesis", region_name=os.environ["FAILOVER_REGION"])
    records = []

    for record in event["Records"]:
        records.append(
            {
                "PartitionKey": record["kinesis"]["partitionKey"],
                "Data": base64.b64decode(record["kinesis"]["data"]).decode("utf-8"),
            }
        )
    response = client.put_records(Records=records, StreamName="KDS-HA-Stream")
    if response["FailedRecordCount"] > 0:
        print("Failed replicating data: " + json.dumps(response))
        raise Exception("Failed replicating data!")

The Lambda replicator in the CloudFormation template is configured to read from the data stream in the primary Region.

The following code contains the necessary AWS Identity and Access Management (IAM) permissions for the Lambda function, including the trust policy that allows the Lambda service to assume this role. Following the principle of least privilege, the Kinesis actions are scoped to the KDS-HA-Stream data stream and its enhanced fan-out consumer; it's recommended to keep permissions similarly restricted to only the necessary streams in a production environment.

      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:ListShards'
                  - 'kinesis:ListStreams'
                  - 'kinesis:SubscribeToShard'
                  - 'kinesis:PutRecords'
                Resource:
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream'
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream/consumer/KDS-HA-Stream-EFO-Consumer:*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'
 

Health check

A generalized strategy for determining when to consider our data stream unavailable involves the use of Amazon CloudWatch metrics. We use the metrics coming off of our Lambda producer and consumer in order to assess the availability of our data stream. While producing to a data stream, an error might surface as PutRecord or PutRecords returning an AmazonKinesisException 500 or AmazonKinesisException 503 error. When consuming from a data stream, an error might surface as SubscribeToShard or GetRecords returning an AmazonKinesisException 500 or AmazonKinesisException 503 error.

We can calculate our effective error rate based on the PutRecord.Success and GetRecords.Success CloudWatch metrics. An average error rate of 1% or higher over a time window of 5 minutes, for example, could indicate that there is an issue with the data stream, and we may want to fail over. In our CloudFormation template, this error rate threshold as well as the time window are configurable, but by default we check for an error rate of 1% in the last 5 minutes to trigger a failover of our clients.
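The error-rate arithmetic can be sketched as a small helper. The 1% default mirrors the template's default; how you fetch the success and error counts from CloudWatch over the evaluation window is left out of the sketch.

```python
# Sketch: the health check math described above. success_count and error_count
# would come from CloudWatch (e.g. PutRecords successes vs. 500/503 errors)
# aggregated over the evaluation window; the 1% threshold is the template default.

def should_fail_over(success_count, error_count, threshold=0.01):
    total = success_count + error_count
    if total == 0:
        return False  # no traffic in the window; nothing to judge
    return error_count / total >= threshold

print(should_fail_over(9_990, 10))   # 0.1% errors
print(should_fail_over(9_800, 200))  # 2% errors
```

In the provided solution this decision runs on a schedule; the same predicate could equally back a CloudWatch metric-math alarm.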

Client failover

When a data stream is deemed to be unreachable, we must now take action to keep our system available and reachable for clients on both ends of the interaction. This means for producers following the cold standby high availability architecture, we change the destination stream where the producer was writing. If high availability and failover of data producers and consumers isn’t a requirement of a given use case, a different architecture would be a better fit.

Prior to failover, the producer may have been delivering data to a stream in Region A, but we now automatically update the destination to be the stream in Region B. For different clients, the methodology of updating the producer will be different, but for ours, we store the active destination for producers in the Lambda environment variables from AWS CloudFormation and update our Lambda functions dynamically on health check failure scenarios.

For our use case, we use the maximum consumer lag time (iteratorAge) plus some buffer to influence the starting position of the failover consumer. This allows us to ensure that the consumer in the secondary Region doesn't skip records that haven't been processed in the originating Region, although some data overlap may occur. Because this can introduce duplicates into the downstream system, an idempotent sink or some other method of handling duplicates must be implemented to avoid duplicate-related issues.
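That starting-position calculation can be sketched as follows; the 60-second buffer is an illustrative assumption, and the resulting timestamp would feed a Kinesis AT_TIMESTAMP starting position.

```python
# Sketch: compute the failover consumer's starting position as "now" minus the
# primary consumer's max iterator age, minus a safety buffer. The 60-second
# buffer is an assumption for illustration.

from datetime import datetime, timedelta, timezone

def failover_start_position(max_iterator_age_ms, buffer_seconds=60, now=None):
    now = now or datetime.now(timezone.utc)
    return now - timedelta(milliseconds=max_iterator_age_ms, seconds=buffer_seconds)

t = failover_start_position(300_000,
                            now=datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc))
print(t.isoformat())  # 2023-01-01T11:54:00+00:00
```

Starting earlier than strictly necessary trades some reprocessing (duplicates) for the guarantee that no record is skipped.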

In the case where data is successfully written to a data stream but is unable to be consumed from the stream, the data will not be replicated and therefore be unavailable in the second Region. The data will be durably stored in the primary data stream until it comes back online and can be read from. Note that if the stream is unavailable for a longer period of time than your total data retention period on the data stream, this data will be lost. Data retention for Kinesis Data Streams can be retrospectively increased up to 1 year.

For consumers in a cold standby architecture, upon failure detection, the consumer will be disabled or shut down, and the same consumer instance will be spun up in the secondary Region to consume from the secondary data stream. On the consumer side, we assume that the consumer application is stateless in our provided solution. If your application requires state, you can migrate or preload the application state via Amazon Simple Storage Service (Amazon S3) or a database. For a stateless application, the most important aspect of failover is the starting position.

In the following timeline, we can see that at some point, the stream in Region A was deemed unreachable.

The consumer application in Region A was reading data at time t10, and when it fails over to the secondary Region (B), it reads starting at t5 (5 minutes before the current iteratorAgeMilliseconds). This ensures that data isn’t skipped by the consumer application. Keep in mind that there may be some overlap in records in the downstream destinations.

In the provided cold standby AWS CloudFormation example, we can manually trigger a failover with the AWS Command Line Interface (AWS CLI). In the following code, we manually fail over to us-east-2:

aws lambda invoke --function-name KDS-HA-FailoverLambda --cli-binary-format raw-in-base64-out --payload '{}' response.json --region us-west-2

After a few minutes, you can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-east-2 as its value because it’s failed over to the us-east-2 Region.

Failback

After an outage or service unavailability is deemed to be resolved, the next logical step is to reorient your clients back to their original operating Regions. Although it may be tempting to automate this procedure, a manual failback performed during off-business hours, when it will cause minimal production disruption, makes more sense.

In the following images, we can visualize the timeline with which consumer applications are failed back to the original Region.

The producer switches back to the original Region, and we wait for the consumer in Region B to reach 0 lag. At this point, the consumer application in Region B is disabled, and replication to Region B is resumed. We have now returned to our normal state of processing messages as shown in the replication section of this post.

In our AWS CloudFormation setup, we perform a failback with the following steps:

  1. Re-enable the event source mapping and start consuming messages from the primary Region at the latest position:
aws lambda create-event-source-mapping --function-name KDS-HA-ConsumerLambda --batch-size 100 --event-source-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --starting-position LATEST --region us-west-2
  2. Switch the producer back to the primary Region:
aws lambda update-function-configuration --function-name KDS-HA-ProducerLambda --environment "Variables={INPUT_STREAM=KDS-HA-Stream,PRODUCING_TO_REGION=us-west-2}" --region us-west-2
  3. In the failover Region (us-east-2), wait for your data stream's GetRecords max iterator age (in milliseconds) CloudWatch metric to report 0 as a value. We're waiting for the consumer Lambda function to catch up with all produced messages.
  4. Stop consuming messages from the failover Region: run the following AWS CLI commands, grabbing the UUID from the first response to delete the existing event source mapping. Make sure you're picking the event source mapping for the Lambda function KDS-HA-ConsumerLambda.
aws lambda list-event-source-mappings --region us-east-2
aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-east-2
  5. Restart the replication agent in the primary Region: run the following AWS CLI commands, capturing ConsumerARN from the first response:
aws kinesis list-stream-consumers --stream-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --region us-west-2
aws lambda create-event-source-mapping --function-name KDS-HA-ReplicationAgentLambda --batch-size 100 --event-source-arn {{ConsumerARN}} --starting-position LATEST --region us-west-2

When this is complete, you can observe the same data stream metrics—the number of records in and out per second, consumer lag metrics, and number of errors as described in the health check section of this post—to ensure that each of the components has resumed processing data in the original Region. We can also take note of the data landing in DynamoDB, which displays which Region data is being updated from in order to determine the success of our failback procedure.

We recommend for any streaming workload that can’t withstand extended data loss or downtime to implement some form of cross-Region high availability in the unlikely event of service unavailability. These recommendations can help you determine which pattern is right for your use case.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack from primary Region us-west-2.
  2. Delete the CloudFormation stack from failover Region us-east-2.
  3. List all event source mappings in primary Region us-west-2 using the aws lambda list-event-source-mappings --region us-west-2 command and note the UUIDs of the event source mappings tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions.
  4. Delete event source mappings in primary Region us-west-2 tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions using the aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-west-2 command and UUIDs noted in the previous step.

Conclusion

Building highly available Kinesis data streams across multiple Regions is multi-faceted, and all aspects of your RPO, RTO, and operational costs need to be carefully considered. The code and architecture discussed in this post is one of many different architectural patterns you can choose for your workloads, so make sure to choose the appropriate architecture based on the criteria for your specific requirements.

To learn more about Kinesis Data Streams, we have a getting started guide as well as a workshop to walk through all the integrations with Kinesis Data Streams. You can also contact your AWS Solutions Architects, who can assist you along your high availability journey.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 7 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day, and process complex machine learning algorithms in real time. At AWS, he is a Senior Streaming Specialist Solutions Architect supporting both Amazon MSK and Amazon Kinesis.

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices, and proactively helps keep customers' AWS environments operationally healthy.

Run a popular benchmark on Amazon Redshift Serverless easily with AWS Data Exchange

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/run-a-popular-benchmark-on-amazon-redshift-serverless-easily-with-aws-data-exchange/

Amazon Redshift is a fast, easy, secure, and economical cloud data warehousing service designed for analytics. AWS announced Amazon Redshift Serverless general availability in July 2022, providing an easier experience to operate Amazon Redshift. Amazon Redshift Serverless makes it simple to run and scale analytics without having to manage your data warehouse infrastructure. Amazon Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use.

Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access external data in open file formats like CSV and Parquet stored in Amazon S3. For more information on RPU pricing, refer to Amazon Redshift pricing.

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. With AWS Data Exchange for Amazon Redshift, customers can start querying, evaluating, analyzing, and joining third-party data with their own first-party data without requiring any extracting, transforming, and loading (ETL). Data providers can list and offer products containing Amazon Redshift datasets in the AWS Data Exchange catalog, granting subscribers direct, read-only access to the data stored in Amazon Redshift. This feature empowers customers to quickly query, analyze, and build applications with these third-party datasets.

TPC-DS is a commonly used benchmark for measuring the query performance of data warehouse solutions such as Amazon Redshift. The benchmark is useful for demonstrating a system's ability to execute simple to complex queries in a timely manner. It is also used to compare the performance of different database configurations, different concurrent workloads, and other database products.

This blog post walks you through the steps you’ll need to set up Amazon Redshift Serverless and run the SQL queries derived from the TPC-DS benchmark against data from the AWS Data Exchange.

Solution overview

We will get started by creating an Amazon Redshift Serverless workgroup and namespace. A namespace is a collection of database objects and users while a workgroup is a collection of compute resources. To simplify executing the benchmark queries, a Linux EC2 instance will also be deployed.

Next, a GitHub repo containing the TPC-DS derived queries will be used. The TPC-DS benchmark is frequently used for evaluating the performance and functionality of cloud data warehouses. The TPC-DS benchmark includes additional steps and requirements to be considered official, but for this blog post, we are focused on only executing the SQL SELECT queries from this benchmark.

The last component of the solution is data. The TPC-DS benchmark includes binaries for generating data, but this is time-consuming to run. We have avoided this problem by generating the data ahead of time and making it freely available in AWS Data Exchange.

Automated setup: CloudFormation

BDB-2063-launch-cloudformation-stack

Click the Launch Stack link above to launch the CloudFormation stack, which will automate the deployment of resources needed for the demo. The template deploys the following resources in your default VPC:

  • Amazon Elastic Compute Cloud (Amazon EC2) instance with the latest version of Amazon Linux
  • Amazon Redshift Serverless workgroup and namespace
  • IAM role with redshift-serverless:GetWorkgroup action granted; this is attached to the EC2 instance so that a command line interface (CLI) command can run to complete the instance configuration
  • Security group with inbound port 22 (ssh) and connectivity between the EC2 instance and the Amazon Redshift Serverless workgroup
  • The GitHub repo is downloaded in the EC2 instance

Template parameters

  • Stack: CloudFormation term used to define all of the resources created by the template.
  • KeyName: This is the name of an existing key pair. If you don’t have one already, create a key pair that is used to connect by SSH to the EC2 instance. For more information, see Amazon EC2 key pairs.
  • SSHLocation: The CIDR mask for incoming connections to the EC2 instance. The default is 0.0.0.0/0, which means any IP address is allowed to connect by SSH to the EC2 instance, provided the client has the key pair private key. The best practice for security is to limit this to a smaller range of IP addresses. For example, you can use sites like www.whatismyip.com to get your IP address.
  • RedshiftCapacity: This is the number of RPUs for the Amazon Redshift Serverless workgroup. The default is 128 and is recommended for analyzing the larger TPC-DS datasets. You can update the RPU capacity after deployment if you like or redeploy it with a different capacity value.

Manual setup

If you choose not to use the CloudFormation template, deploy the EC2 instance and Amazon Redshift Serverless with the following instructions. The following steps are only needed if you are manually provisioning the resources rather than using the provided CloudFormation template.

Amazon Redshift setup

Here are the high-level steps to create an Amazon Redshift Serverless workgroup. You can get more detailed information from this News Blog post.

To create your workgroup, complete the following steps:

  1. On the Amazon Redshift console, navigate to the Amazon Redshift Serverless dashboard.
  2. Choose Create workgroup.
  3. For Workgroup name, enter a name.
  4. Choose Next.
  5. For Namespace, enter a unique name.
  6. Choose Next.
  7. Choose Create.

These steps create an Amazon Redshift Serverless workgroup with 128 RPUs. This is the default; you can easily adjust this up or down based on your workload and budget constraints.

Linux EC2 instance setup

  • Deploy a virtual machine in the same AWS Region as your Amazon Redshift database using the Amazon Linux 2 AMI.
  • The Amazon Linux 2 AMI (64-bit x86) with the t2.micro instance type is an inexpensive and tested configuration.
  • Add the security group configured for your Amazon Redshift database to your EC2 instance.
  • Install psql with sudo yum install postgresql.x86_64 -y
  • Download this GitHub repo.
    git clone --depth 1 https://github.com/aws-samples/redshift-benchmarks /home/ec2-user/redshift-benchmarks

  • Set the following environment variables for Amazon Redshift:
    • PGHOST: This is the endpoint for the Amazon Redshift database.
    • PGPORT: This is the port the database listens on. The Amazon Redshift default is 5439.
    • PGUSER: This is your Amazon Redshift database user.
    • PGDATABASE: This is the database name where your external schemas are created. This is NOT the database created for the data share. We suggest using the default “dev” database.

Example:

export PGUSER="awsuser" 
export PGHOST="default.01.us-east-1.redshift-serverless.amazonaws.com" 
export PGPORT="5439" 
export PGDATABASE="dev"

Configure the .pgpass file to store your database credentials. The format for the .pgpass file is: hostname:port:database:user:password

Example:

default.01.us-east-1.redshift-serverless.amazonaws.com:5439:*:user1:user1P@ss
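If you prefer to script this step, here is a minimal sketch that writes the .pgpass entry and sets the 0600 permissions psql requires (libpq ignores the file if its permissions are more permissive). The host and credentials below are the illustrative values from the example.

```python
# Sketch: write a .pgpass entry in the hostname:port:database:user:password
# format shown above, with the 0600 permissions psql requires.

import os
import stat
import tempfile
from pathlib import Path

def write_pgpass(path, host, port, database, user, password):
    entry = f"{host}:{port}:{database}:{user}:{password}\n"
    p = Path(path)
    p.write_text(entry)
    p.chmod(stat.S_IRUSR | stat.S_IWUSR)  # 0600; psql ignores the file otherwise
    return entry

# Demo against a throwaway directory so nothing real is overwritten
with tempfile.TemporaryDirectory() as d:
    entry = write_pgpass(os.path.join(d, "pgpass"),
                         "default.01.us-east-1.redshift-serverless.amazonaws.com",
                         5439, "*", "user1", "user1P@ss")
    print(entry.strip())
```

In real use, point the path at ~/.pgpass (or set the PGPASSFILE environment variable).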

AWS Data Exchange setup

AWS Data Exchange provides third-party data in multiple data formats, including Amazon Redshift. You can subscribe to catalog listings in multiple storage locations like Amazon S3 and Amazon Redshift data shares. We encourage you to explore the AWS Data Exchange catalog on your own because there are many datasets available that can be used to enhance your data in Amazon Redshift.

First, subscribe to the AWS Marketplace listing for TPC-DS Benchmark Data. Select the Continue to subscribe button from the AWS Data Exchange catalog listing. After you review the offer and Terms and Conditions of the data product, choose Subscribe. Note that you will need the appropriate IAM permissions to subscribe to AWS Data Exchange on Marketplace. More information can be found at AWS managed policies for AWS Data Exchange.

TPC-DS uses 24 tables in a dimensional model that simulates a decision support system. It has store, catalog, and web sales as well as store, catalog, and web returns fact tables. It also has the dimension tables to support these fact tables.

TPC-DS includes a utility to generate data for the benchmark at a given scale factor. The smallest scale factor is 1 GB (uncompressed). Most benchmark tests for cloud warehouses are run with 3–10 TB of data because the dataset is large enough to stress the system but also small enough to complete the entire test in a reasonable amount of time.

There are six database schemas provided in the TPC-DS Benchmark Data subscription with 1; 10; 100; 1,000; 3,000; and 10,000 scale factors. The scale factor refers to the uncompressed data size measured in GB. Each schema refers to a dataset with the corresponding scale factor.

Scale factor (GB) | ADX schema  | Amazon Redshift Serverless external schema
1                 | tpcds1      | ext_tpcds1
10                | tpcds10     | ext_tpcds10
100               | tpcds100    | ext_tpcds100
1,000             | tpcds1000   | ext_tpcds1000
3,000             | tpcds3000   | ext_tpcds3000
10,000            | tpcds10000  | ext_tpcds10000

The following steps will create external schemas in your Amazon Redshift Serverless database that maps to schemas found in the AWS Data Exchange.

  • Log in to the EC2 instance and create a database connection.
    psql

  • Run the following query:
    select share_name, producer_account, producer_namespace from svv_datashares;

  • Use the output of this query to run the next command:
    create database tpcds_db from datashare <share_name> of account '<producer_account>' namespace '<producer_namespace>';

  • Last, you create the external schemas in Amazon Redshift:
    create external schema ext_tpcds1 from redshift database tpcds_db schema tpcds1;
    create external schema ext_tpcds10 from redshift database tpcds_db schema tpcds10;
    create external schema ext_tpcds100 from redshift database tpcds_db schema tpcds100;
    create external schema ext_tpcds1000 from redshift database tpcds_db schema tpcds1000;
    create external schema ext_tpcds3000 from redshift database tpcds_db schema tpcds3000;
    create external schema ext_tpcds10000 from redshift database tpcds_db schema tpcds10000;

  • You can now exit psql with this command:
    \q
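The six create external schema statements above follow one pattern, so they can also be generated from the scale-factor list. This is just a convenience sketch; the schema names match the table above.

```python
# Sketch: generate the six "create external schema" statements shown above
# from the scale-factor list. Names follow the tpcds<sf> / ext_tpcds<sf>
# convention from the table earlier in the post.

SCALE_FACTORS = [1, 10, 100, 1000, 3000, 10000]

def external_schema_ddl(scale_factors=SCALE_FACTORS, db="tpcds_db"):
    return [
        f"create external schema ext_tpcds{sf} from redshift "
        f"database {db} schema tpcds{sf};"
        for sf in scale_factors
    ]

for stmt in external_schema_ddl():
    print(stmt)
```

You could pipe the generated statements into psql instead of typing each one.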

TPC-DS derived benchmark

The TPC-DS derived benchmark consists of 99 queries in four broad categories:

  • Reporting queries
  • Ad hoc queries
  • Iterative OLAP queries
  • Data mining queries

In addition to running the 99 queries, the benchmark tests concurrency. During the concurrency portion of the test, there are n sessions (default of 5) that run the queries. Each session runs the 99 queries with different parameters and in slightly different order. This concurrency test stresses the resources of the database and generally takes longer to complete than just a single session running the 99 queries.

Some data warehouse products are configured to optimize single-user performance, whereas others may not have the ability to manage the workload effectively. This is a great way to demonstrate the workload management and stability of Amazon Redshift.
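The shape of that concurrency test can be sketched in a few lines of Python; `run_session` below is a stand-in that only records each session's query order rather than actually submitting SQL to the database.

```python
# Sketch: n concurrent sessions, each running the same 99 queries in a
# different order, as in the multi-user portion of the benchmark. Query
# execution is replaced by recording the order.

import random
import threading

def run_session(session_id, queries, results, seed):
    order = queries[:]                    # each session gets its own order
    random.Random(seed).shuffle(order)
    results[session_id] = [f"q{q}" for q in order]  # stand-in for execution

def run_concurrency_test(num_sessions=5, num_queries=99):
    queries = list(range(1, num_queries + 1))
    results = {}
    threads = [threading.Thread(target=run_session, args=(i, queries, results, i))
               for i in range(num_sessions)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

results = run_concurrency_test()
print(len(results), len(results[0]))  # 5 99
```

The actual benchmark scripts use query orders produced by dsqgen rather than a random shuffle, but the concurrency structure is the same.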

Since the data for each scale factor is located in a different schema, running the benchmark against each scale factor requires changing the schema you are referencing. The search_path defines which schemas to search for tables when a query contains objects without a schema included. For example:

ALTER USER <username> SET search_path=ext_tpcds3000,public;

The benchmark scripts set the search_path automatically.

Note: The scripts create a schema called tpcds_reports which will store the detailed output of each step of the benchmark. Each time the scripts are run, this schema will be recreated, and the latest results will be stored. If you happen to already have a schema named tpcds_reports, these scripts will drop the schema.

Running the TPC-DS derived queries

  • Connect by SSH to the EC2 instance with your key pair.
  • Change directory:
    cd ~/redshift-benchmarks/adx-tpc-ds/

  • Optionally configure the variables for the scripts in tpcds_variables.sh

Here are the default values for the variables you can set:

  • EXT_SCHEMA="ext_tpcds3000": This is the name of the external schema created that has the TPC-DS dataset. The “3000” value means the scale factor is 3000 or 3 TB (uncompressed).
  • EXPLAIN="false": If set to false, queries will run. If set to true, queries will generate explain plans rather than actually running. Each query will be logged in the log directory. Default is false.
  • MULTI_USER_COUNT="5": The number of concurrent users (0 to 20) that run the queries in the multi-user test. The order of the queries was set with dsqgen. Setting this to 0 skips the multi-user test. Default is 5.

  • Run the benchmark:
    ./rollout.sh > rollout.log 2>&1 &

TPC-DS derived benchmark results

We performed a test with the 3 TB ADX TPC-DS dataset on an Amazon Redshift Serverless workgroup with 128 RPUs. Additionally, we disabled query caching so that query results aren’t cached. This allows us to measure the performance of the database as opposed to its ability to serve results from cache.

The test comprises two sections. The first will run the 99 queries serially using one user while the second section will start multiple sessions based on the configuration file you set earlier. Each session will run concurrently, and each will run all 99 queries but in a different order.

The total runtime for the single-user queries was 15 minutes and 11 seconds. As shown in the following graph, the longest-running query was query 67, with an elapsed time of only 101 seconds. The average runtime was only 9.2 seconds.

1 Session TPC-DS 3TB 128 RPUs

With five concurrent users, the runtime was 28 minutes and 35 seconds, which demonstrates how Amazon Redshift Serverless performs well for single-user and concurrent-user workloads.

5 Concurrent Sessions TPC-DS 3TB 128 RPUs

As you can see, it was pretty easy to deploy Amazon Redshift Serverless, subscribe to an AWS Data Exchange product listing, and run a fairly complex benchmark in a short amount of time.

Next steps

You can run the benchmark scripts again but with different dataset sizes or a different number of concurrent users by editing the tpcds_variables.sh file. You can also try resizing your Amazon Redshift Serverless workgroup to see the performance difference with more or fewer RPUs. You can also run individual queries to see the results firsthand.

Another thing to try is to subscribe to other AWS Data Exchange products and query this data from Amazon Redshift Serverless. Be curious and explore using Amazon Redshift Serverless and the AWS Data Exchange!

Clean up

If you deployed the resources with the automated solution, you just need to delete the stack created in CloudFormation. All resources created by the stack will be deleted automatically.

If you deployed the resources manually, you need to delete the following:

  • The Amazon Redshift database created earlier.
    • If you deployed Amazon Redshift Serverless, you will need to delete both the workgroup and the namespace.
  • The Amazon EC2 instance.

Optionally, you can unsubscribe from the TPC-DS data by going to your AWS Data Exchange Subscriptions and then turning Renewal to Off.

Conclusion

This blog post covered deploying Amazon Redshift Serverless, subscribing to an AWS Data Exchange product, and running a complex benchmark in a short amount of time. Amazon Redshift Serverless can handle high levels of concurrency with very little effort and excels in price-performance.

If you have any questions or feedback, please leave them in the comments section.


About the author

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio has recently added the ability to create custom visual transforms that you can use in visual jobs, in combination with the AWS Glue Studio components provided out of the box. You can now define a custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which define the component and the processing logic, respectively.

Custom visual transforms let you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up to date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is what the component definition looks like. Save it as salesdata_generator.json. For convenience, we'll match the name of the Python file, so it's important to choose a name that doesn't conflict with an existing Python module. If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}
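The validationRule above is an ordinary regular expression (the doubled backslash is just JSON escaping). A quick Python sketch, equivalent to the check applied to the user-supplied year value:

```python
# Sketch: the "^\d{4}$" validation rule from the component definition,
# reproduced in Python. The rule requires exactly four digits.

import re

YEAR_RULE = re.compile(r"^\d{4}$")

def is_valid_year(value):
    return YEAR_RULE.match(value) is not None

print(is_valid_year("2022"))   # four digits
print(is_valid_year("22"))     # too short
```

If the value fails the rule, AWS Glue Studio shows the validationMessage from the definition.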

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice that “year” is an optional parameter, so it’s given a default value of None in the signature; when no year is provided, the function substitutes the previous year. The function returns the transformed DynamicFrame. In this case, it isn’t derived from the source one (which is the common case) but replaced by a new one.

The transform uses Spark functions as well as Python libraries to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.
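The registration line at the end (DynamicFrame.salesdata_generator = salesdata_generator) is plain Python monkey-patching: a free function is attached to a class so the generated job script can call it as a method on the source DynamicFrame. Here is a minimal, self-contained sketch of the pattern, using a stand-in class rather than the real awsglue DynamicFrame:

```python
# Sketch of the registration pattern used above: a free function becomes a
# method, so jobs can call source_dyf.salesdata_generator(...).
# DynamicFrame here is a stand-in, not the real awsglue class.

class DynamicFrame:
    def __init__(self, name):
        self.name = name

def salesdata_generator(self, numSamples, year=None):
    # `self` is the source DynamicFrame, exactly as in the real transform
    if not year:
        year = 2023  # the real code defaults to the previous year
    return f"{self.name}: {numSamples} samples for {year}"

# Registration, same shape as the last line of salesdata_generator.py
DynamicFrame.salesdata_generator = salesdata_generator

src = DynamicFrame("placeholder_source")
print(src.salesdata_generator(10000))  # placeholder_source: 10000 samples for 2023
```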

Deploy and use the generator transform

Now that we have both files ready, all we have to do is upload them to Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, that bucket might not exist yet; AWS Glue creates it automatically when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.

Once you have uploaded both files, the next time we open (or refresh) the page on AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue console, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo.
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node: select it, and on the Data source properties tab, choose S3 location as the source type.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the selected role can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; the format doesn’t matter here because the file doesn’t exist.
    You might get a warning that the schema can’t be inferred because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and search it on the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This wouldn’t be needed if we had a real source whose schema matches the generator’s.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema. That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more about bucket creation in the Amazon S3 documentation.
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to, but not more than, 1,460 (depending on the year used, and assuming you are using 2 G.1X workers and AWS Glue version 3.0).

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data was saved as a table with years of history, generating small files has a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows representing every day of the year, so when the sink splits the dates into output directories to group the files, each memory partition creates one file per day present: 4 * 365 = 1,460 files (in a non-leap year).

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.
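The arithmetic behind that file count is worth making explicit; a quick back-of-the-envelope check (the partition count of 4 is this example's core count, not a fixed value):

```python
# Each memory partition writes one file per day it contains; with random dates,
# every partition is likely to contain every day of the year.
num_memory_partitions = 4  # Spark default here: number of cores in the cluster
days_in_year = 365         # non-leap year
max_output_files = num_memory_partitions * days_in_year
print(max_output_files)  # 1460
```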

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that on your team, you have the policy of generating S3 date partition separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, let’s call it repartition_date.json, where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned with the file
    # partitioning on S3, so each partition holds the data for one combination
    # of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date
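The effect of the repartition call can be simulated in plain Python: hash-partitioning by the partition columns groups all rows for a given day into the same memory partition, so each output directory receives files from (ideally) one partition instead of all of them. This is a simplified model of Spark's hash partitioning, not the real implementation:

```python
import random

random.seed(0)
days = [random.randrange(365) for _ in range(10_000)]  # synthetic sale days

# Before: rows spread round-robin over 4 memory partitions (like range() on
# 4 cores). Each (memory partition, day) pair becomes one output file.
files_before = {(i % 4, d) for i, d in enumerate(days)}

# After repartition(365, "day"): rows hash-partitioned by the partition
# column, so all rows for a given day share one memory partition.
files_after = {(hash(d) % 365, d) for d in days}

print(len(files_before))  # close to 4 * 365 = 1460
print(len(files_after))   # close to 365, roughly one file per day directory
```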

Upload the two new files onto the S3 transforms folder like you did for the previous transform.

Deploy and use the repartition transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department” as “Partition columns” in the transform and then add it to the sink partition column list. Or you can enter a “Number of partitions expected” and see how it affects the runtime (the job no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving the job using custom visual transforms, AWS Glue Studio generates the job script and updates the Python library path (also referred to as the --extra-py-files job parameter) with the comma-separated list of S3 paths of the transforms’ Python files.
  3. Before running your script, AWS Glue adds all file paths stored in the --extra-py-files job parameter to the Python path, allowing your script to use all the custom visual transform functions you defined.
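A rough sketch of what that last step amounts to, using local files as stand-ins for the downloaded S3 objects (the file names and module contents are illustrative):

```python
import os
import sys
import tempfile

# Simulate AWS Glue adding the --extra-py-files entries to the Python path:
# each file is placed on disk and its directory appended to sys.path, so the
# generated job script can import the transform modules by name.
extra_py_files = "salesdata_generator.py,repartition_date.py"

workdir = tempfile.mkdtemp()
for filename in extra_py_files.split(","):
    with open(os.path.join(workdir, filename), "w") as f:
        f.write("VALUE = 42\n")  # stand-in for the real transform code
sys.path.append(workdir)

import salesdata_generator  # now importable, as in a real Glue job
print(salesdata_generator.VALUE)  # 42
```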

Cleanup

To avoid ongoing costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job you created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Unlock the power of EC2 Graviton with GitLab CI/CD and EKS Runners

Post Syndicated from Michael Fischer original https://aws.amazon.com/blogs/devops/unlock-the-power-of-ec2-graviton-with-gitlab-ci-cd-and-eks-runners/

Many AWS customers are using GitLab for their DevOps needs, including source control, and continuous integration and continuous delivery (CI/CD). Many of our customers are using GitLab SaaS (the hosted edition), while others are using GitLab Self-managed to meet their security and compliance requirements.

Customers can easily add runners to their GitLab instance to perform various CI/CD jobs. These jobs include compiling source code, building software packages or container images, performing unit and integration testing, etc.—even all the way to production deployment. For the SaaS edition, GitLab offers hosted runners, and customers can provide their own runners as well. Customers who run GitLab Self-managed must provide their own runners.

In this post, we’ll discuss how customers can maximize their CI/CD capabilities by managing their GitLab runner and executor fleet with Amazon Elastic Kubernetes Service (Amazon EKS). We’ll leverage both x86 and Graviton runners, allowing customers for the first time to build and test their applications both on x86 and on AWS Graviton, our most powerful, cost-effective, and sustainable instance family. In keeping with AWS’s philosophy of “pay only for what you use,” we’ll keep our Amazon Elastic Compute Cloud (Amazon EC2) instances as small as possible, and launch ephemeral runners on Spot instances. We’ll demonstrate building and testing a simple demo application on both architectures. Finally, we’ll build and deliver a multi-architecture container image that can run on Amazon EC2 instances or AWS Fargate, both on x86 and Graviton.

Figure 1. Managed GitLab runner architecture overview


Let’s go through the components:

Runners

A runner is an application to which GitLab sends jobs that are defined in a CI/CD pipeline. The runner receives jobs from GitLab and executes them, either by itself or by passing them to an executor (we’ll visit the executor in the next section).

In our design, we’ll be using a pair of self-hosted runners. One runner will accept jobs for the x86 CPU architecture, and the other will accept jobs for the arm64 (Graviton) CPU architecture. To help us route our jobs to the proper runner, we’ll apply some tags to each runner indicating the architecture for which it will be responsible. We’ll tag the x86 runner with x86, x86-64, and amd64, thereby reflecting the most common nicknames for the architecture, and we’ll tag the arm64 runner with arm64.

Currently, these runners must always be running so that they can receive jobs as they are created. Our runners only require a small amount of memory and CPU, so that we can run them on small EC2 instances to minimize cost. These include t4g.micro for Graviton builds, or t3.micro or t3a.micro for x86 builds.

To save money on these runners, consider purchasing a Savings Plan or Reserved Instances for them. Savings Plans and Reserved Instances can save you up to 72% over on-demand pricing, and there’s no minimum spend required to use them.

Kubernetes executors

In GitLab CI/CD, the executor’s job is to perform the actual build. The runner can create hundreds or thousands of executors as needed to meet current demand, subject to the concurrency limits that you specify. Executors are created only when needed, and they are ephemeral: once a job has finished running on an executor, the runner will terminate it.

In our design, we’ll use the Kubernetes executor that’s built into the GitLab runner. The Kubernetes executor simply schedules a new pod to run each job. Once the job completes, the pod terminates, thereby freeing the node to run other jobs.

The Kubernetes executor is highly customizable. We’ll configure each runner with a nodeSelector that makes sure that the jobs are scheduled only onto nodes that are running the specified CPU architecture. Other possible customizations include CPU and memory reservations, node and pod tolerations, service accounts, volume mounts, and much more.
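The nodeSelector mechanics can be sketched the same way: a pod is schedulable onto a node only if every selector label matches one of the node's labels. This is a simplified model of the Kubernetes scheduler, with label values taken from this post's design:

```python
# Simplified model of Kubernetes nodeSelector matching.
node_labels = {
    "kubernetes.io/arch": "arm64",
    "karpenter.sh/capacity-type": "spot",
}

def pod_fits_node(node_selector, labels):
    # All selector entries must match the node's labels exactly
    return all(labels.get(k) == v for k, v in node_selector.items())

print(pod_fits_node({"kubernetes.io/arch": "arm64"}, node_labels))  # True
print(pod_fits_node({"kubernetes.io/arch": "amd64"}, node_labels))  # False
```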

Scaling worker nodes

For most customers, CI/CD jobs aren’t likely to be running all of the time. To save cost, we only want to run worker nodes when there’s a job to run.

To make this happen, we’ll turn to Karpenter. Karpenter provisions EC2 instances as soon as needed to fit newly-scheduled pods. If a new executor pod is scheduled, and there isn’t a qualified instance with enough capacity remaining on it, then Karpenter will quickly and automatically launch a new instance to fit the pod. Karpenter will also periodically scan the cluster and terminate idle nodes, thereby saving on costs. Karpenter can terminate a vacant node in as little as 30 seconds.

Karpenter can launch either Amazon EC2 on-demand or Spot instances depending on your needs. With Spot instances, you can save up to 90% over on-demand instance prices. Since CI/CD jobs often aren’t time-sensitive, Spot instances can be an excellent choice for GitLab execution pods. Karpenter will even automatically find the best Spot instance type to speed up the time it takes to launch an instance and minimize the likelihood of job interruption.

Deploying our solution

To deploy our solution, we’ll write a small application using the AWS Cloud Development Kit (AWS CDK) and the EKS Blueprints library. AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. EKS Blueprints is a library designed to make it simple to deploy complex Kubernetes resources to an Amazon EKS cluster with minimum coding.

The high-level infrastructure code – which can be found in our GitLab repo – is very simple. I’ve included comments to explain how it works.

// All CDK applications start with a new cdk.App object.
const app = new cdk.App();

// Create a new EKS cluster at v1.23. Run all non-DaemonSet pods in the 
// `kube-system` (coredns, etc.) and `karpenter` namespaces in Fargate
// so that we don't have to maintain EC2 instances for them.
const clusterProvider = new blueprints.GenericClusterProvider({
  version: KubernetesVersion.V1_23,
  fargateProfiles: {
    main: {
      selectors: [
        { namespace: 'kube-system' },
        { namespace: 'karpenter' },
      ]
    }
  },
  clusterLogging: [
    ClusterLoggingTypes.API,
    ClusterLoggingTypes.AUDIT,
    ClusterLoggingTypes.AUTHENTICATOR,
    ClusterLoggingTypes.CONTROLLER_MANAGER,
    ClusterLoggingTypes.SCHEDULER
  ]
});

// EKS Blueprints uses a Builder pattern.
blueprints.EksBlueprint.builder()
  .clusterProvider(clusterProvider) // start with the Cluster Provider
  .addOns(
    // Use the EKS add-ons that manage coredns and the VPC CNI plugin
    new blueprints.addons.CoreDnsAddOn('v1.8.7-eksbuild.3'),
    new blueprints.addons.VpcCniAddOn('v1.12.0-eksbuild.1'),
    // Install Karpenter
    new blueprints.addons.KarpenterAddOn({
      provisionerSpecs: {
        // Karpenter examines scheduled pods for the following labels
        // in their `nodeSelector` or `nodeAffinity` rules and routes
        // the pods to the node with the best fit, provisioning a new
        // node if necessary to meet the requirements.
        //
        // Allow either amd64 or arm64 nodes to be provisioned 
        'kubernetes.io/arch': ['amd64', 'arm64'],
        // Allow either Spot or On-Demand nodes to be provisioned
        'karpenter.sh/capacity-type': ['spot', 'on-demand']
      },
      // Launch instances in the VPC private subnets
      subnetTags: {
        Name: 'gitlab-runner-eks-demo/gitlab-runner-eks-demo-vpc/PrivateSubnet*'
      },
      // Apply security groups that match the following tags to the launched instances
      securityGroupTags: {
        'kubernetes.io/cluster/gitlab-runner-eks-demo': 'owned'      
      }
    }),
    // Create a pair of a new GitLab runner deployments, one running on
    // arm64 (Graviton) instance, the other on an x86_64 instance.
    // We'll show the definition of the GitLabRunner class below.
    new GitLabRunner({
      arch: CpuArch.ARM_64,
      // If you're using an on-premise GitLab installation, you'll want
      // to change the URL below.
      gitlabUrl: 'https://gitlab.com',
      // Kubernetes Secret containing the runner registration token
      // (discussed later)
      secretName: 'gitlab-runner-secret'
    }),
    new GitLabRunner({
      arch: CpuArch.X86_64,
      gitlabUrl: 'https://gitlab.com',
      secretName: 'gitlab-runner-secret'
    }),
  )
  .build(app, 
         // Stack name
         'gitlab-runner-eks-demo');

The GitLabRunner class is a HelmAddOn subclass that takes a few parameters from the top-level application:

// The location and name of the GitLab Runner Helm chart
const CHART_REPO = 'https://charts.gitlab.io';
const HELM_CHART = 'gitlab-runner';

// The default namespace for the runner
const DEFAULT_NAMESPACE = 'gitlab';

// The default Helm chart version
const DEFAULT_VERSION = '0.40.1';

export enum CpuArch {
    ARM_64 = 'arm64',
    X86_64 = 'amd64'
}

// Configuration parameters
interface GitLabRunnerProps {
    // The CPU architecture of the node on which the runner pod will reside
    arch: CpuArch
    // The GitLab API URL 
    gitlabUrl: string
    // Kubernetes Secret containing the runner registration token (discussed later)
    secretName: string
    // Optional tags for the runner. These will be added to the default list 
    // corresponding to the runner's CPU architecture.
    tags?: string[]
    // Optional Kubernetes namespace in which the runner will be installed
    namespace?: string
    // Optional Helm chart version
    chartVersion?: string
}

export class GitLabRunner extends HelmAddOn {
    private arch: CpuArch;
    private gitlabUrl: string;
    private secretName: string;
    private tags: string[] = [];

    constructor(props: GitLabRunnerProps) {
        // Invoke the superclass (HelmAddOn) constructor
        super({
            name: `gitlab-runner-${props.arch}`,
            chart: HELM_CHART,
            repository: CHART_REPO,
            namespace: props.namespace || DEFAULT_NAMESPACE,
            version: props.chartVersion || DEFAULT_VERSION,
            release: `gitlab-runner-${props.arch}`,
        });

        this.arch = props.arch;
        this.gitlabUrl = props.gitlabUrl;
        this.secretName = props.secretName;

        // Set default runner tags
        switch (this.arch) {
            case CpuArch.X86_64:
                this.tags.push('amd64', 'x86', 'x86-64', 'x86_64');
                break;
            case CpuArch.ARM_64:
                this.tags.push('arm64');
                break;
        }
        this.tags.push(...props.tags || []); // Add any custom tags
    };

    // `deploy` method required by the abstract class definition. Our implementation
    // simply installs a Helm chart to the cluster with the proper values.
    deploy(clusterInfo: ClusterInfo): void | Promise<Construct> {
        const chart = this.addHelmChart(clusterInfo, this.getValues(), true);
        return Promise.resolve(chart);
    }

    // Returns the values for the GitLab Runner Helm chart
    private getValues(): Values {
        return {
            gitlabUrl: this.gitlabUrl,
            runners: {
                config: this.runnerConfig(), // runner config.toml file, from below
                name: `demo-runner-${this.arch}`, // name as seen in GitLab UI
                tags: uniq(this.tags).join(','),
                secret: this.secretName, // see below
            },
            // Labels to constrain the nodes where this runner can be placed
            nodeSelector: {
                'kubernetes.io/arch': this.arch,
                'karpenter.sh/capacity-type': 'on-demand'
            },
            // Default pod label
            podLabels: {
                'gitlab-role': 'manager'
            },
            // Create all the necessary RBAC resources including the ServiceAccount
            rbac: {
                create: true
            },
            // Required resources (memory/CPU) for the runner pod. The runner
            // is fairly lightweight as it's a self-contained Golang app.
            resources: {
                requests: {
                    memory: '128Mi',
                    cpu: '256m'
                }
            }
        };
    }

    // This string contains the runner's `config.toml` file including the
    // Kubernetes executor's configuration. Note the nodeSelector constraints 
    // (including the use of Spot capacity and the CPU architecture).
    private runnerConfig(): string {
        return `
  [[runners]]
    [runners.kubernetes]
      namespace = "{{.Release.Namespace}}"
      image = "ubuntu:16.04"
    [runners.kubernetes.node_selector]
      "kubernetes.io/arch" = "${this.arch}"
      "kubernetes.io/os" = "linux"
      "karpenter.sh/capacity-type" = "spot"
    [runners.kubernetes.pod_labels]
      gitlab-role = "runner"
      `.trim();
    }
}

For security reasons, we store the GitLab registration token in a Kubernetes Secret – never in our source code. For additional security, we recommend encrypting Secrets using an AWS Key Management Service (AWS KMS) key that you supply by specifying the encryption configuration when you create your Amazon EKS cluster. It’s a good practice to restrict access to this Secret via Kubernetes RBAC rules.

To create the Secret, run the following command:

# These two values must match the parameters supplied to the GitLabRunner constructor
NAMESPACE=gitlab
SECRET_NAME=gitlab-runner-secret
# The value of the registration token.
TOKEN=GRxxxxxxxxxxxxxxxxxxxxxx

kubectl -n $NAMESPACE create secret generic $SECRET_NAME \
        --from-literal="runner-registration-token=$TOKEN" \
        --from-literal="runner-token="

Building a multi-architecture container image

Now that we’ve launched our GitLab runners and configured the executors, we can build and test a simple multi-architecture container image. If the tests pass, we can then upload it to our project’s GitLab container registry. Our application will be pretty simple: we’ll create a web server in Go that simply prints out “Hello World” and prints out the current architecture.

Find the source code of our sample app in our GitLab repo.
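The real app is a Go web server, but the core behavior can be sketched in a few lines of Python: report a greeting plus the CPU architecture the process is running on (the exact output depends on the host).

```python
import platform

# Report the CPU architecture the process runs on (e.g., x86_64 or aarch64),
# mirroring what the demo web server prints.
msg = f"Hello World from {platform.machine()}"
print(msg)
```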

In GitLab, the CI/CD configuration lives in the .gitlab-ci.yml file at the root of the source repository. In this file, we declare a list of ordered build stages, and then we declare the specific jobs associated with each stage.

Our stages are:

  1. The build stage, in which we compile our code, produce our architecture-specific images, and upload these images to the GitLab container registry. These uploaded images are tagged with a suffix indicating the architecture on which they were built. This job uses a matrix variable to run it in parallel against two different runners – one for each supported architecture. Furthermore, rather than using docker build to produce our images, we use Kaniko to build them. This lets us build our images in an unprivileged container environment and improve the security posture considerably.
  2. The test stage, in which we test the code. As with the build stage, we use a matrix variable to run the tests in parallel in separate pods on each supported architecture.

  3. The assembly stage, in which we create a multi-architecture image manifest from the two architecture-specific images. Then, we push the manifest into the image registry so that we can refer to it in future deployments.

Figure 2. Example CI/CD pipeline for multi-architecture images


Here’s what our top-level configuration looks like:

variables:
  # These are used by the runner to configure the Kubernetes executor, and define
  # the values of spec.containers[].resources.limits.{memory,cpu} for the Pod(s).
  KUBERNETES_MEMORY_REQUEST: 1Gi
  KUBERNETES_CPU_REQUEST: 1

# List of stages for jobs, and their order of execution  
stages:    
  - build
  - test
  - create-multiarch-manifest
Here’s what our build stage job looks like. Note the matrix of variables which are set in BUILD_ARCH as the two jobs are run in parallel:
build-job:
  stage: build
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  image:
    name: gcr.io/kaniko-project/executor:debug
    entrypoint: [""]
  script:
    - mkdir -p /kaniko/.docker
    # Configure authentication data for Kaniko so it can push to the
    # GitLab container registry
    - echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
    # Build the image and push to the registry. In this stage, we append the build
    # architecture as a tag suffix.
    - >-
      /kaniko/executor
      --context "${CI_PROJECT_DIR}"
      --dockerfile "${CI_PROJECT_DIR}/Dockerfile"
      --destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}-${BUILD_ARCH}"

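As an aside, the auth value that the echo line above writes into /kaniko/.docker/config.json is nothing more than the registry credentials base64-encoded as user:password. A minimal Python sketch of the same construction (the registry name and credentials here are placeholder values, not the real CI variables):

```python
import base64
import json

def kaniko_docker_config(registry: str, user: str, password: str) -> str:
    """Build the config.json content Kaniko reads for registry authentication.

    Equivalent to the shell pipeline:
      printf "%s:%s" "$USER" "$PASSWORD" | base64 | tr -d '\n'
    wrapped in the auths JSON structure.
    """
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return json.dumps({"auths": {registry: {"auth": token}}})

# Placeholder values standing in for CI_REGISTRY, CI_REGISTRY_USER, CI_REGISTRY_PASSWORD
config = kaniko_docker_config("registry.gitlab.com", "ci-user", "s3cret")
print(config)
```

Decoding the auth field recovers the original user:password pair, which is exactly what the container registry verifies at push time.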
Here’s what our test stage job looks like. This time we use the image that we just produced. Our source code is copied into the application container. Then, we can run make test-container to execute the server test suite.

test-job:
  stage: test
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  image:
    # Use the image we just built
    name: "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}-${BUILD_ARCH}"
  script:
    - make test-container

Finally, here’s what our assembly stage looks like. We use Podman to build the multi-architecture manifest and push it into the image registry. Traditionally we might have used docker buildx to do this, but using Podman lets us do this work in an unprivileged container for additional security.

create-manifest-job:
  stage: create-multiarch-manifest
  tags: [arm64] 
  image: public.ecr.aws/docker/library/fedora:36
  script:
    - yum -y install podman
    - echo "${CI_REGISTRY_PASSWORD}" | podman login -u "${CI_REGISTRY_USER}" --password-stdin "${CI_REGISTRY}"
    - COMPOSITE_IMAGE=${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}
    - podman manifest create ${COMPOSITE_IMAGE}
    - >-
      for arch in arm64 amd64; do
        podman manifest add ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}-${arch};
      done
    - podman manifest inspect ${COMPOSITE_IMAGE}
    # The composite image manifest omits the architecture from the tag suffix.
    - podman manifest push ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}

Trying it out

I’ve created a public test GitLab project containing the sample source code, and attached the runners to the project. We can see them at Settings > CI/CD > Runners:

Figure 3. GitLab runner configurations.

Here we can also see some pipeline executions, where some have succeeded, and others have failed.

Figure 4. GitLab sample pipeline executions.

We can also see the specific jobs associated with a pipeline execution:

Figure 5. GitLab sample job executions.

Finally, here are our container images:

Figure 6. GitLab sample container registry.

Conclusion

In this post, we’ve illustrated how you can quickly and easily construct multi-architecture container images with GitLab, Amazon EKS, Karpenter, and Amazon EC2, using both x86 and Graviton instance families. We indexed on using as many managed services as possible, maximizing security, and minimizing complexity and TCO. We dove deep on multiple facets of the process, and discussed how to save up to 90% of the solution’s cost by using Spot instances for CI/CD executions.

Find the sample code, including everything shown here today, in our GitLab repository.

Building multi-architecture images will unlock the value and performance of running your applications on AWS Graviton and give you increased flexibility over compute choice. We encourage you to get started today.

About the author:

Michael Fischer

Michael Fischer is a Principal Specialist Solutions Architect at Amazon Web Services. He focuses on helping customers build more cost-effectively and sustainably with AWS Graviton. Michael has an extensive background in systems programming, monitoring, and observability. His hobbies include world travel, diving, and playing the drums.

Multi-branch pipeline management and infrastructure deployment using AWS CDK Pipelines

Post Syndicated from Iris Kraja original https://aws.amazon.com/blogs/devops/multi-branch-pipeline-management-and-infrastructure-deployment-using-aws-cdk-pipelines/

This post describes how to use the AWS CDK Pipelines module to follow a Gitflow development model using AWS Cloud Development Kit (AWS CDK). Software development teams often follow a strict branching strategy during a solutions development lifecycle. Newly-created branches commonly need their own isolated copy of infrastructure resources to develop new features.

CDK Pipelines is a construct library module for continuous delivery of AWS CDK applications. CDK Pipelines are self-updating: if you add application stages or stacks, then the pipeline automatically reconfigures itself to deploy those new stages and/or stacks.

The following solution creates a new AWS CDK Pipeline within a development account for every new branch created in the source repository (AWS CodeCommit). When a branch is deleted, the pipeline and all related resources are also destroyed from the account. This GitFlow model for infrastructure provisioning allows developers to work independently from each other, concurrently, even in the same stack of the application.

Solution overview

The following diagram provides an overview of the solution. There is one default pipeline responsible for deploying resources to the different application environments (e.g., Development, Pre-Prod, and Prod). The code is stored in CodeCommit. When new changes are pushed to the default CodeCommit repository branch, AWS CodePipeline runs the default pipeline. When the default pipeline is deployed, it creates two AWS Lambda functions.

These two Lambda functions are invoked by CodeCommit CloudWatch events when a new branch in the repository is created or deleted. The Create Lambda function uses the boto3 CodeBuild module to create an AWS CodeBuild project that builds the pipeline for the feature branch. This feature pipeline consists of a build stage and an optional update pipeline stage for itself. The Destroy Lambda function creates another CodeBuild project which cleans all of the feature branch’s resources and the feature pipeline.

Figure 1. Architecture diagram.

Prerequisites

Before beginning this walkthrough, you should have the following prerequisites:

  • An AWS account
  • AWS CDK installed
  • Python3 installed
  • Jq (JSON processor) installed
  • Basic understanding of continuous integration/continuous development (CI/CD) Pipelines

Initial setup

Download the repository from GitHub:

# Command to clone the repository
git clone https://github.com/aws-samples/multi-branch-cdk-pipelines.git
cd multi-branch-cdk-pipelines

Create a new CodeCommit repository in the AWS Account and region where you want to deploy the pipeline and upload the source code from above to this repository. In the config.ini file, change the repository_name and region variables accordingly.

Make sure that you set up a fresh Python environment. Install the dependencies:

pip install -r requirements.txt

Run the initial-deploy.sh script to bootstrap the development and production environments and to deploy the default pipeline. You’ll be asked to provide the following parameters: (1) Development account ID, (2) Development account AWS profile name, (3) Production account ID, and (4) Production account AWS profile name.

sh ./initial-deploy.sh --dev_account_id <YOUR DEV ACCOUNT ID> \
  --dev_profile_name <YOUR DEV PROFILE NAME> \
  --prod_account_id <YOUR PRODUCTION ACCOUNT ID> \
  --prod_profile_name <YOUR PRODUCTION PROFILE NAME>

Default pipeline

In the CI/CD pipeline, we set up an if condition to deploy the default branch resources only if the current branch is the default one. The default branch is retrieved programmatically from the CodeCommit repository. We deploy an Amazon Simple Storage Service (Amazon S3) Bucket and two Lambda functions. The bucket is responsible for storing the feature branches’ CodeBuild artifacts. The first Lambda function is triggered when a new branch is created in CodeCommit. The second one is triggered when a branch is deleted.

if branch == default_branch:
    ...

    # Artifact bucket for feature AWS CodeBuild projects
    artifact_bucket = Bucket(
        self,
        'BranchArtifacts',
        encryption=BucketEncryption.KMS_MANAGED,
        removal_policy=RemovalPolicy.DESTROY,
        auto_delete_objects=True
    )
    ...
    # AWS Lambda function triggered upon branch creation
    create_branch_func = aws_lambda.Function(
        self,
        'LambdaTriggerCreateBranch',
        runtime=aws_lambda.Runtime.PYTHON_3_8,
        function_name='LambdaTriggerCreateBranch',
        handler='create_branch.handler',
        code=aws_lambda.Code.from_asset(path.join(this_dir, 'code')),
        environment={
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix
        },
        role=iam_stack.create_branch_role)


    # AWS Lambda function triggered upon branch deletion
    destroy_branch_func = aws_lambda.Function(
        self,
        'LambdaTriggerDestroyBranch',
        runtime=aws_lambda.Runtime.PYTHON_3_8,
        function_name='LambdaTriggerDestroyBranch',
        handler='destroy_branch.handler',
        role=iam_stack.delete_branch_role,
        environment={
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix,
            "DEV_STAGE_NAME": f'{dev_stage_name}-{dev_stage.main_stack_name}'
        },
        code=aws_lambda.Code.from_asset(path.join(this_dir,
                                                  'code')))

Then, the CodeCommit repository is configured to trigger these Lambda functions based on two events:

(1) Reference created

# Configure AWS CodeCommit to trigger the Lambda function when a new branch is created
repo.on_reference_created(
    'BranchCreateTrigger',
    description="AWS CodeCommit reference created event.",
    target=aws_events_targets.LambdaFunction(create_branch_func))

(2) Reference deleted

# Configure AWS CodeCommit to trigger the Lambda function when a branch is deleted
repo.on_reference_deleted(
    'BranchDeleteTrigger',
    description="AWS CodeCommit reference deleted event.",
    target=aws_events_targets.LambdaFunction(destroy_branch_func))

Lambda functions

The two Lambda functions build and destroy application environments mapped to each feature branch. An Amazon CloudWatch event triggers the LambdaTriggerCreateBranch function whenever a new branch is created. The CodeBuild client from boto3 creates the build phase and deploys the feature pipeline.

Create function

The create function deploys a feature pipeline which consists of a build stage and an optional update pipeline stage for itself. The pipeline downloads the feature branch code from the CodeCommit repository, initiates the Build and Test action using CodeBuild, and securely saves the built artifact on the S3 bucket.

The Lambda function handler code is as follows:

def handler(event, context):
    """Lambda function handler"""
    logger.info(event)

    reference_type = event['detail']['referenceType']

    try:
        if reference_type == 'branch':
            branch = event['detail']['referenceName']
            repo_name = event['detail']['repositoryName']

            client.create_project(
                name=f'{codebuild_name_prefix}-{branch}-create',
                description="Build project to deploy branch pipeline",
                source={
                    'type': 'CODECOMMIT',
                    'location': f'https://git-codecommit.{region}.amazonaws.com/v1/repos/{repo_name}',
                    'buildspec': generate_build_spec(branch)
                },
                sourceVersion=f'refs/heads/{branch}',
                artifacts={
                    'type': 'S3',
                    'location': artifact_bucket_name,
                    'path': f'{branch}',
                    'packaging': 'NONE',
                    'artifactIdentifier': 'BranchBuildArtifact'
                },
                environment={
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'
                },
                serviceRole=role_arn
            )

            client.start_build(
                projectName=f'{codebuild_name_prefix}-{branch}-create'
            )
    except Exception as e:
        logger.error(e)

Create branch CodeBuild project’s buildspec.yaml content:

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk synth
      - cdk deploy --require-approval=never
artifacts:
  files:
    - '**/*'

Destroy function

The second Lambda function is responsible for the destruction of a feature branch’s resources. Upon the deletion of a feature branch, an Amazon CloudWatch event triggers this Lambda function. The function creates a CodeBuild Project which destroys the feature pipeline and all of the associated resources created by that pipeline. The source property of the CodeBuild Project is the feature branch’s source code saved as an artifact in Amazon S3.

The Lambda function handler code is as follows:

def handler(event, context):
    logger.info(event)
    reference_type = event['detail']['referenceType']

    try:
        if reference_type == 'branch':
            branch = event['detail']['referenceName']
            client.create_project(
                name=f'{codebuild_name_prefix}-{branch}-destroy',
                description="Build project to destroy branch resources",
                source={
                    'type': 'S3',
                    'location': f'{artifact_bucket_name}/{branch}/CodeBuild-{branch}-create/',
                    'buildspec': generate_build_spec(branch)
                },
                artifacts={
                    'type': 'NO_ARTIFACTS'
                },
                environment={
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'
                },
                serviceRole=role_arn
            )

            client.start_build(
                projectName=f'{codebuild_name_prefix}-{branch}-destroy'
            )

            client.delete_project(
                name=f'{codebuild_name_prefix}-{branch}-destroy'
            )

            client.delete_project(
                name=f'{codebuild_name_prefix}-{branch}-create'
            )
    except Exception as e:
        logger.error(e)

Destroy the branch CodeBuild project’s buildspec.yaml content:

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk destroy cdk-pipelines-multi-branch-{branch} --force
      - aws cloudformation delete-stack --stack-name {dev_stage_name}-{branch}
      - aws s3 rm s3://{artifact_bucket_name}/{branch} --recursive

Create a feature branch

On your machine’s local copy of the repository, create a new feature branch using the following git commands. Replace user-feature-123 with a unique name for your feature branch. Note that this feature branch name must comply with the CodePipeline naming restrictions, as it will be used to name a unique pipeline later in this walkthrough.

# Create the feature branch
git checkout -b user-feature-123
git push origin user-feature-123

The first Lambda function will deploy the CodeBuild project, which then deploys the feature pipeline. This can take a few minutes. You can log in to the AWS Console and see the CodeBuild project running under CodeBuild.

Figure 2. AWS Console - CodeBuild projects.

After the build is successfully finished, you can see the deployed feature pipeline under CodePipelines.

Figure 3. AWS Console - CodePipeline pipelines.

The Lambda S3 trigger project from AWS CDK Samples is used as the infrastructure resources to demonstrate this solution. The content is placed inside the src directory and is deployed by the pipeline. When visiting the Lambda console page, you can see two functions: one by the default pipeline and one by our feature pipeline.

Figure 4. AWS Console - Lambda functions.

Destroy a feature branch

There are two common ways for removing feature branches. The first one is related to a pull request, also known as a “PR”. This occurs when merging a feature branch back into the default branch. Once it’s merged, the feature branch will be automatically closed. The second way is to delete the feature branch explicitly by running the following git commands:

# delete branch local
git branch -d user-feature-123

# delete branch remote
git push origin --delete user-feature-123

The CodeBuild project responsible for destroying the feature resources is now triggered. You can see the project’s logs while the resources are being destroyed in CodeBuild, under Build history.

Figure 5. AWS Console - CodeBuild projects.

Cleaning up

To avoid incurring future charges, log in to the AWS console of each account you used, open the AWS CloudFormation console in the Region(s) where you deployed, then select the main and branch stacks and choose Delete.

Conclusion

This post showed how you can work with an event-driven strategy and AWS CDK to implement a multi-branch pipeline flow using AWS CDK Pipelines. The described solution leverages Lambda and CodeBuild to provide dynamic orchestration of resources for multiple branches and pipelines.

For more information on CDK Pipelines and all the ways it can be used, see the CDK Pipelines reference documentation.

About the authors:

Iris Kraja

Iris is a Cloud Application Architect at AWS Professional Services based in New York City. She is passionate about helping customers design and build modern AWS cloud native solutions, with a keen interest in serverless technology, event-driven architectures and DevOps.  Outside of work, she enjoys hiking and spending as much time as possible in nature.

Jan Bauer

Jan is a Cloud Application Architect at AWS Professional Services. His interests are serverless computing, machine learning, and everything that involves cloud computing.

Rolando Santamaria Maso

Rolando is a senior cloud application development consultant at AWS Professional Services, based in Germany. He helps customers migrate and modernize workloads in the AWS Cloud, with a special focus on modern application architectures and development best practices, but he also creates IaC using AWS CDK. Outside work, he maintains open-source projects and enjoys spending time with family and friends.

Caroline Gluck

Caroline is an AWS Cloud application architect based in New York City, where she helps customers design and build cloud native data science applications. Caroline is a builder at heart, with a passion for serverless architecture and machine learning. In her spare time, she enjoys traveling, cooking, and spending time with family and friends.

Migrate a large data warehouse from Greenplum to Amazon Redshift using AWS SCT – Part 3

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/part-3-migrate-a-large-data-warehouse-from-greenplum-to-amazon-redshift-using-aws-sct/

In this third post of a multi-part series, we explore some of the edge cases in migrating a large data warehouse from Greenplum to Amazon Redshift using AWS Schema Conversion Tool (AWS SCT) and how to handle these challenges. Challenges include how best to use virtual partitioning, edge cases for numeric and character fields, and arrays.

You can check out the first post of this series for guidance on planning, running, and validating the migration. You can also check out the second post for best practices for choosing the optimal Amazon Redshift cluster, data architecture, converting stored procedures, compatible functions and queries widely used for SQL conversions, and recommendations for optimizing the length of data types for table columns.

Unbounded character data type

Greenplum supports creating columns as text and varchar without specifying the length of the field. This works without an issue in Greenplum but doesn’t work well in migrating to Amazon Redshift. Amazon Redshift stores data in columnar format and gets better compression when using shorter column lengths. Therefore, the Amazon Redshift best practice is to use the smallest character length possible.

AWS SCT converts these unbounded fields to large objects (LOBs) instead of treating the columns as character fields with a specified length. LOBs are implemented differently in each database product on the market, but in general, a LOB is not stored with the rest of the table data. Instead, there is a pointer to the location of the data. When the LOB is queried, the database reconstitutes the data automatically for you, but this typically requires more resources.

Amazon Redshift doesn’t support LOBs, so AWS SCT resolves this by loading the data into Amazon Simple Storage Service (Amazon S3) and in the column, it stores the S3 URL. When you need to retrieve this data, you have to query the table, get the S3 URL, and then fetch the data from Amazon S3. This isn’t ideal because most of the time, the actual maximum length of the field doesn’t require it to be treated as a LOB, and storing the data remotely means it will take much longer to fetch the data for queries.

The current resolution is to calculate the maximum length of these columns and update the Greenplum tables before converting to Amazon Redshift with AWS SCT.

Note that in a future release of AWS SCT, the collection of statistics will include calculating the maximum length for each column, and the conversion of unbounded varchar and text will set the length in Amazon Redshift automatically.

The following code is an example of an unbounded character data type:

CREATE TABLE public.mytable 
(description1 text NOT NULL PRIMARY KEY, description2 varchar);
CREATE INDEX ON public.mytable (description2);

This table uses a primary key column on an unbounded text column. This needs to be converted to varchar(n), where n is the maximum length found in this column.

  1. Drop unique constraints on affected columns:
    ALTER TABLE public.mytable DROP CONSTRAINT mytable_pkey;

  2. Drop indexes on affected columns:
    DROP INDEX public.mytable_description2_idx;

  3. Calculate maximum length of affected columns:
    select coalesce(max(length(description1)), 10),
    coalesce(max(length(description2)), 10) 
    from public.mytable;

Note that the coalesce default of 10 is applied only when a column contains nothing but NULL values or the table has no data; otherwise, the query returns each column's actual maximum length.

  4. Alter the length of the affected columns:
    ALTER TABLE public.mytable ALTER COLUMN description1 TYPE varchar(10);
    ALTER TABLE public.mytable ALTER COLUMN description2 TYPE varchar(10);

You can now proceed with using AWS SCT to convert the Greenplum schema to Amazon Redshift and avoiding using LOBs to store the column values.

GitHub help

If you have many tables to update and want an automated solution, you can use the add_varchar_lengths.sh script found in the GitHub repo to fix all of the unbounded varchar and text columns in a given schema in Greenplum. The script calculates the appropriate maximum length and then alters the Greenplum tables so the varchar data type is bounded by a length.

Please note that the script also will drop any constraints or indexes on the affected columns.
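If you'd rather generate the statements yourself, the core of such a script is straightforward: take the calculated maximum length per column and emit one ALTER TABLE each, falling back to a small default when a column holds no data. A hedged sketch (the table and column names are illustrative, and executing the generated SQL against Greenplum is left out):

```python
def varchar_alter_statements(table: str, max_lengths: dict) -> list:
    """Generate ALTER TABLE statements that bound unbounded text/varchar
    columns, falling back to 10 when no data was found (matching the
    coalesce default in the query above)."""
    return [
        f"ALTER TABLE {table} ALTER COLUMN {col} TYPE varchar({length or 10});"
        for col, length in max_lengths.items()
    ]

# Illustrative column stats: description2 had only NULLs, so no length was found.
stmts = varchar_alter_statements(
    "public.mytable", {"description1": 42, "description2": None}
)
for s in stmts:
    print(s)
```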

Empty character data

Greenplum and Amazon Redshift support an empty string value in a field that is distinct from NULL, and the behavior is the same between the two databases. However, AWS SCT defaults to converting empty strings to NULL. This option simply needs to be disabled to avoid problems.

  1. In AWS SCT, open your project, choose Settings, Project settings, and Data migration.
  2. Scroll to the bottom and find Use empty as null value.
  3. Deselect this so that AWS SCT doesn’t convert empty strings to NULL.
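To see why this matters, the setting's effect can be illustrated with a toy transform; the function below is an illustration of the behavior, not the actual AWS SCT implementation:

```python
def normalize(value, empty_as_null=False):
    """Mimic the effect of the 'Use empty as null value' setting on one field."""
    if value == "" and empty_as_null:
        return None
    return value

rows = ["ACME", "", None]

# With the setting deselected (recommended), empty strings survive the migration.
migrated = [normalize(v) for v in rows]

# With it selected, '' is silently collapsed into NULL, losing the distinction.
collapsed = [normalize(v, empty_as_null=True) for v in rows]

print(migrated, collapsed)
```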

NaN and Infinity numeric data type

Greenplum supports NaN and Infinity in a numeric field to represent an undefined calculation result and infinity. NaN is very uncommon because when using aggregate functions on a column with a NaN row, the result will also be NaN. Infinity is also uncommon and not useful when aggregating data. However, you may encounter these values in a Greenplum database.

Amazon Redshift doesn’t support NaN and Infinity, and AWS SCT doesn’t check for this in your data. If you do encounter this when using AWS SCT, the task will fail with a numeric conversion error.

To resolve this, it’s suggested to use NULL instead of NaN and Infinity. This allows you to aggregate data and get results other than NaN and, importantly, allow you to convert the Greenplum data to Amazon Redshift.

The following code is an example NaN numeric value:

CREATE TABLE public.mytable2 (id int NOT NULL, amount numeric NOT NULL);
INSERT INTO public.mytable2 VALUES (1, 10), (2, 'NaN'), (3, 20);

  1. Drop the NOT NULL constraint:
    ALTER TABLE public.mytable2 ALTER COLUMN amount DROP NOT NULL;

  2. Update the table:
    UPDATE public.mytable2 SET amount = NULL where amount = 'NaN';

You can now proceed with using AWS SCT to migrate the Greenplum data to Amazon Redshift.

Note that in a future release of AWS SCT, there will be an option to convert NaN and Infinity to NULL so that you won’t have to update your Greenplum data to migrate to Amazon Redshift.
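The reason NULL is the better substitute can be seen in miniature: NaN propagates through a sum exactly as it does through a Greenplum aggregate, whereas NULL-like values can simply be skipped. A short Python illustration:

```python
import math

amounts = [10.0, float("nan"), 20.0]

# A single NaN row makes the whole aggregate NaN, as in Greenplum.
total_with_nan = sum(amounts)

# Replacing NaN with None (SQL NULL) and skipping it, as SQL aggregates
# do with NULLs, yields a usable result.
as_nulls = [None if math.isnan(a) else a for a in amounts]
total_with_nulls = sum(a for a in as_nulls if a is not None)
```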

Virtual partitioning on GP_SEGMENT_ID

For large tables, it’s recommended to use virtual partitioning to extract data from Greenplum. Without virtual partitioning, AWS SCT will run a single query to unload data from Greenplum. For example:

SELECT * FROM store_sales;

If this table is very large, it will take a long time to extract the data because this is a single process querying the data. With virtual partitioning, multiple queries are run in parallel so that the extraction of data is completed faster. It also makes it easier to recover if there is an issue with the task.

Virtual partitioning is very flexible, but a simple way to implement it for an Amazon Redshift migration is to utilize the hidden Greenplum column gp_segment_id. This column identifies which segment in Greenplum holds the data, and each segment should have a roughly equal number of rows. Therefore, creating a partition for each gp_segment_id value is an easy way to implement virtual partitioning.

If you’re not familiar with the term segment, it’s similar to an Amazon Redshift slice.

For example:

SELECT * FROM store_sales WHERE gp_segment_id = 0;
SELECT * FROM store_sales WHERE gp_segment_id = 1;
SELECT * FROM store_sales WHERE gp_segment_id = 2;
...

  1. First, determine the number of segments in Greenplum:
    SELECT count(*) 
    FROM gp_segment_configuration 
    WHERE content >= 0 AND preferred_role = 'p';

Now you can configure AWS SCT.

  1. In AWS SCT, go to Data Migration view (other) and choose (right-click) a large table.
  2. Scroll down to Add virtual partitioning.
  3. For the partition type, choose Auto Split and change the column name to GP_SEGMENT_ID.
  4. Use 0 for Start value, the number of segments found in Step 1 as End value, and Interval of 1.

When you create a local task to load this table, the task will have a sub-task for each gp_segment_id value.

Note that in a future release of AWS SCT, there will be an option to automatically virtually partition tables based on GP_SEGMENT_ID. This option will also retrieve the number of segments automatically.
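The sub-tasks boil down to one extraction query per segment. A small sketch of that expansion (the table name and segment count are illustrative):

```python
def segment_partition_queries(table: str, segment_count: int) -> list:
    """Generate one extraction query per Greenplum segment, mirroring the
    per-gp_segment_id sub-tasks AWS SCT creates for a virtually
    partitioned table."""
    return [
        f"SELECT * FROM {table} WHERE gp_segment_id = {seg};"
        for seg in range(segment_count)
    ]

for query in segment_partition_queries("store_sales", 3):
    print(query)
```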

Arrays

Greenplum supports arrays such as bigint[] that are unbounded. Typically, arrays are kept relatively small in Greenplum because arrays consume more memory in Greenplum than using an alternative strategy. However, it’s possible to have a very large array in Greenplum that isn’t supported by Amazon Redshift.

AWS SCT converts a Greenplum array to varchar(65535), but if the converted array is longer than 65,535 characters, then the load will fail.

The following code is an example of a large array:

CREATE TABLE public.sales 
(sales_id int NOT NULL,
customer_id int NOT NULL,
sales_item_ids bigint[]) 
DISTRIBUTED BY (sales_id);

INSERT INTO public.sales values (1, 100, '{1,2,3}'), (2, 100, '{1,2,3}');

In this example, the sales items are stored in an array for each sales_id. If AWS SCT fails to load this data into Amazon Redshift because the converted array exceeds the 65,535-character limit, the following steps are the solution. It’s also a more efficient storage pattern in both Greenplum and Amazon Redshift.

  1. Create a new sales table that has all columns from the existing sales table, but exclude the array column:
    CREATE TABLE public.sales_new 
    (sales_id int NOT NULL,
    customer_id int NOT NULL) 
    DISTRIBUTED BY (sales_id);

  2. Populate the new sales table with the existing data except for the array column:
    INSERT INTO public.sales_new (sales_id, customer_id) 
    SELECT sales_id, customer_id FROM public.sales;

We create a new table that is a cross-reference of sales IDs with the sales items. Instead of having a single row for this association, now there will be a row for each relationship.

  3. Create a new sales item table:
    CREATE TABLE public.sales_items 
    (sales_id int NOT NULL, 
    sales_item_id bigint NOT NULL) 
    DISTRIBUTED BY (sales_id);

  4. To unnest the array, create a row for each array element:
    INSERT INTO public.sales_items 
    (sales_id, sales_item_id) 
    SELECT sales_id, unnest(sales_item_ids) 
    FROM public.sales;

  5. Rename the sales tables:
    ALTER TABLE public.sales RENAME TO sales_old;
    ALTER TABLE public.sales_new RENAME TO sales;

In AWS SCT, refresh the tables and migrate the revised sales and the new sales_items table.

The following are some example queries before and after.

Before:

SELECT s.sales_id, unnest(s.sales_item_ids) 
FROM public.sales s 
WHERE s.sales_id = 1;

After:

SELECT s.sales_id, i.sales_item_id 
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.sales_id = 1;

Before:

SELECT s.sales_id 
FROM public.sales s 
WHERE s.customer_id = 100
AND 10 = ANY(s.sales_item_ids);

After:

SELECT s.sales_id
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.customer_id = 100
AND i.sales_item_id = 10;

VACUUM ANALYZE

Greenplum, like Amazon Redshift, supports the VACUUM command, which reclaims storage space after UPDATE and DELETE commands are run on a table. Greenplum also allows you to add the ANALYZE option to run both statements with a single command.

The following code is the Greenplum command:

VACUUM ANALYZE table;

This usage is not very common, but you’ll see it from time to time. If you’re only inserting data into a table, there is no need to run VACUUM, but for convenience, developers sometimes run VACUUM ANALYZE anyway.

The following are the Amazon Redshift commands:

VACUUM table;
ANALYZE table;

Amazon Redshift doesn’t support adding ANALYZE to the VACUUM command, so instead, this needs to be two different statements. Also note that Amazon Redshift performs VACUUM and ANALYZE automatically for you, so in most cases you can remove these commands from your scripts entirely.
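If you are porting existing SQL scripts, a small helper can mechanically split the combined Greenplum statement into the two Amazon Redshift statements. This is an illustrative sketch, not a feature of AWS SCT:

```python
import re

def split_vacuum_analyze(sql: str) -> str:
    """Rewrite 'VACUUM ANALYZE <table>;' into separate VACUUM and ANALYZE statements."""
    pattern = re.compile(r"VACUUM\s+ANALYZE\s+(\S+?)\s*;", re.IGNORECASE)
    return pattern.sub(lambda m: f"VACUUM {m.group(1)};\nANALYZE {m.group(1)};", sql)

print(split_vacuum_analyze("VACUUM ANALYZE public.sales;"))
# VACUUM public.sales;
# ANALYZE public.sales;
```

Since Amazon Redshift runs VACUUM and ANALYZE automatically, you could also use a similar pass to remove these statements entirely.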

DISTINCT ON query

Greenplum supports an unusual shortcut for eliminating duplicates in a table. This feature keeps the first row for each set of rows based on the order of the data being fetched. It’s easiest to understand by looking at an example:

CREATE TABLE customer 
(customer_name varchar(100) not null, 
customer_address varchar(1000) not null, 
lastupdate timestamp not null);

INSERT INTO customer VALUES
('ACME', '123 Main St', '2022-01-01'), 
('ACME', '456 Market St', '2022-05-01'), 
('ACME', '789 Broadway', '2022-08-01');

SELECT DISTINCT ON (customer_name) customer_name, customer_address 
FROM customer 
ORDER BY customer_name, lastupdate DESC;

We get the following results:

customer_name | customer_address 
---------------+------------------
 ACME          | 789 Broadway

The solution for running this in Amazon Redshift is to use the ANSI standard row_number() analytical function, as shown in the following code:

SELECT sub.customer_name, sub.customer_address 
FROM (SELECT customer_name, customer_address, 
             row_number() OVER (PARTITION BY customer_name 
                                ORDER BY lastupdate DESC) AS row_number 
      FROM customer) AS sub 
WHERE sub.row_number = 1;
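You can sanity-check this rewrite locally with any engine that supports ANSI window functions. Here is a sketch using Python's built-in SQLite (version 3.25 or later for window functions), reproducing the customer example above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""CREATE TABLE customer
               (customer_name TEXT NOT NULL,
                customer_address TEXT NOT NULL,
                lastupdate TEXT NOT NULL)""")
cur.executemany("INSERT INTO customer VALUES (?, ?, ?)", [
    ("ACME", "123 Main St", "2022-01-01"),
    ("ACME", "456 Market St", "2022-05-01"),
    ("ACME", "789 Broadway", "2022-08-01"),
])

# row_number() rewrite of Greenplum's DISTINCT ON: keep the most recent row per customer
rows = cur.execute("""
    SELECT sub.customer_name, sub.customer_address
    FROM (SELECT customer_name, customer_address,
                 row_number() OVER (PARTITION BY customer_name
                                    ORDER BY lastupdate DESC) AS rn
          FROM customer) AS sub
    WHERE sub.rn = 1
""").fetchall()

print(rows)  # [('ACME', '789 Broadway')]
```

The row numbered 1 in each partition is the row with the latest lastupdate, matching the DISTINCT ON result.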

Clean up

The examples in this post create tables in Greenplum. To remove these example tables, run the following commands:

DROP TABLE IF EXISTS public.mytable;
DROP TABLE IF EXISTS public.mytable2;
DROP TABLE IF EXISTS public.sales;
DROP TABLE IF EXISTS public.sales_new;
DROP TABLE IF EXISTS public.sales_items;
DROP TABLE IF EXISTS public.customer;

Conclusion

In this post, we covered some of the edge cases when migrating Greenplum to Amazon Redshift and how to handle these challenges, including easy virtual partitioning, edge cases for numeric and character fields, and arrays. This is not an exhaustive list of the issues you may encounter when migrating Greenplum to Amazon Redshift, but this series should help you navigate modernizing your data platform by moving to Amazon Redshift.

For additional details, see the Amazon Redshift Getting Started Guide and the AWS SCT User Guide.


About the Authors

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Nelly Susanto is a Senior Database Migration Specialist of AWS Database Migration Accelerator. She has over 10 years of technical experience focusing on migrating and replicating databases along with data warehouse workloads. She is passionate about helping customers in their cloud journey.

Suresh Patnam is a Principal BDM – GTM AI/ML Leader at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible by leveraging Data & AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Configuration driven dynamic multi-account CI/CD solution on AWS

Post Syndicated from Anshul Saxena original https://aws.amazon.com/blogs/devops/configuration-driven-dynamic-multi-account-ci-cd-solution-on-aws/

Many organizations require durable automated code delivery for their applications. They leverage multi-account continuous integration/continuous deployment (CI/CD) pipelines to deploy code and run automated tests in multiple environments before deploying to Production. In cases where the testing strategy is release specific, you must update the pipeline before every release. Traditional pipeline stages are predefined and static in nature, and once the pipeline stages are defined it’s hard to update them. In this post, we present a configuration driven dynamic CI/CD solution per repository. The pipeline state is maintained and governed by configurations stored in Amazon DynamoDB. This gives you the advantage of automatically customizing the pipeline for every release based on the testing requirements.

By following this post, you will set up a dynamic multi-account CI/CD solution. Your pipeline will deploy and test a sample pet store API application. Refer to Automating your API testing with AWS CodeBuild, AWS CodePipeline, and Postman for more details on this application. New code deployments will be delivered with custom pipeline stages based on the pipeline configuration that you create. This solution uses services such as AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, Amazon DynamoDB, AWS Lambda, and AWS Step Functions.

Solution overview

The following diagram illustrates the solution architecture:


Figure 1: Architecture Diagram

  1. Users insert/update/delete entry in the DynamoDB table.
  2. The Step Function Trigger Lambda is invoked on all modifications.
  3. The Step Function Trigger Lambda evaluates the incoming event and does the following:
    1. On insert and update, triggers the Step Function.
    2. On delete, finds the appropriate CloudFormation stack and deletes it.
  4. Steps in the Step Function are as follows:
    1. Collect Information (Pass State) – Filters the relevant information from the event, such as repositoryName and referenceName.
    2. Get Mapping Information (Backed by CodeCommit event filter Lambda) – Retrieves the mapping information from the Pipeline config stored in the DynamoDB.
    3. Deployment Configuration Exist? (Choice State) – If the status code is 200, the DynamoDB entry was found and the Initiate CloudFormation Stack step is invoked; otherwise, the Step Function exits successfully.
    4. Initiate CloudFormation Stack (Backed by stack create Lambda) – Constructs the CloudFormation parameters and creates/updates the dynamic pipeline based on the configuration stored in the DynamoDB via CloudFormation.
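Steps 2 and 3 above can be sketched as follows. This is an illustrative sketch, not the solution's actual Lambda code; the stack-naming convention and the fake clients are assumptions made so the routing logic can be exercised locally:

```python
import json

def handle_stream_record(record, sfn_client, cfn_client, state_machine_arn):
    """Route one DynamoDB stream record: start the Step Function on
    INSERT/MODIFY, delete the pipeline's CloudFormation stack on REMOVE.
    Clients are passed in so the logic is testable without AWS."""
    event_name = record["eventName"]
    if event_name in ("INSERT", "MODIFY"):
        sfn_client.start_execution(stateMachineArn=state_machine_arn,
                                   input=json.dumps(record))
        return "started"
    if event_name == "REMOVE":
        keys = record["dynamodb"]["Keys"]
        # Assumed naming convention for the per-repository pipeline stack
        stack_name = f"pipeline-{keys['RepoName']['S']}-{keys['RepoTag']['S']}"
        cfn_client.delete_stack(StackName=stack_name)
        return "deleted"
    return "ignored"

# Minimal fake clients for a local dry run
class _Recorder:
    def __init__(self):
        self.calls = []
    def start_execution(self, **kwargs):
        self.calls.append(("start_execution", kwargs))
    def delete_stack(self, **kwargs):
        self.calls.append(("delete_stack", kwargs))

sfn, cfn = _Recorder(), _Recorder()
record = {"eventName": "REMOVE",
          "dynamodb": {"Keys": {"RepoName": {"S": "sample-application"},
                                "RepoTag": {"S": "main"}}}}
print(handle_stream_record(record, sfn, cfn, "arn:aws:states:region:acct:sm"))  # deleted
```

With real AWS clients, `sfn_client` and `cfn_client` would be boto3 Step Functions and CloudFormation clients, whose start_execution and delete_stack calls take the same keyword arguments.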

Code deliverables

The code deliverables include the following:

  1. AWS CDK app – The AWS CDK app contains the code for all the Lambdas, Step Functions, and CloudFormation templates.
  2. sample-application-repo – This directory contains the sample application repository used for deployment.
  3. automated-tests-repo – This directory contains the sample automated tests repository used for testing the sample application.

Deploying the CI/CD solution

  1. Clone this repository to your local machine.
  2. Follow the README to deploy the solution to your main CI/CD account. Upon successful deployment, the following resources should be created in the CI/CD account:
    1. A DynamoDB table
    2. Step Function
    3. Lambda Functions
  3. Navigate to the Amazon Simple Storage Service (Amazon S3) console in your main CI/CD account and search for a bucket with the name: cloudformation-template-bucket-<AWS_ACCOUNT_ID>. You should see two CloudFormation templates (templates/codepipeline.yaml and templates/childaccount.yaml) uploaded to this bucket.
  4. Deploy the childaccount.yaml template in every target CI/CD account (Alpha, Beta, Gamma, and Prod) by using the CloudFormation console. Provide the main CI/CD account number as the “CentralAwsAccountId” parameter, and run the stack creation.
  5. Upon successful creation of the stack, two roles are created in each child account:
    1. ChildAccountFormationRole
    2. ChildAccountDeployerRole

Pipeline configuration

Make an entry into devops-pipeline-table-info for the Repository name and branch combination. A sample entry can be found in sample-entry.json.

The pipeline is highly configurable, and everything can be configured through the DynamoDB entry.

The following are the top-level keys:

RepoName: Name of the repository for which AWS CodePipeline is configured.
RepoTag: Name of the branch used in CodePipeline.
BuildImage: Build image used for application AWS CodeBuild project.
BuildSpecFile: Buildspec file used in the application CodeBuild project.
DeploymentConfigurations: This key holds the deployment configurations for the pipeline. Under this key are the environment-specific configurations. In our case, we’ve named our environments Alpha, Beta, Gamma, and Prod. You can use any names you like, but make sure that the entries in the JSON match those in the codepipeline.yaml CloudFormation template, because there is a 1:1 mapping between them. Sub-level keys under DeploymentConfigurations are as follows:

  • EnvironmentName: The top-level key for environment-specific configuration. In our case, it’s Alpha, Beta, Gamma, and Prod. Sub-level keys under this are:
    • <Env>AwsAccountId: AWS account ID of the target environment.
    • Deploy<Env>: A key specifying whether or not the artifact should be deployed to this environment. Based on its value, CodePipeline will include a deployment stage for this environment.
    • ManualApproval<Env>: A key specifying whether manual approval is required before deployment. Enter your email address, or set the value to false.
    • Tests: A key with its own sub-level keys that holds the test-related information to be run in specific environments. Each test that is configured to run adds an additional stage to the CodePipeline. The test information is also configurable, with the ability to specify the test repository, branch name, buildspec file, and build image for the testing CodeBuild project.
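As a sketch, an entry with the keys described above might look like the following Python dict (all values are hypothetical placeholders, not the contents of sample-entry.json), together with a minimal check for the required top-level keys:

```python
# Hypothetical pipeline configuration entry shaped like the keys described above
entry = {
    "RepoName": "sample-application",
    "RepoTag": "main",
    "BuildImage": "aws/codebuild/standard:5.0",
    "BuildSpecFile": "buildspec.yaml",
    "DeploymentConfigurations": {
        "Alpha": {"AlphaAwsAccountId": "111111111111",
                  "DeployAlpha": "true",
                  "ManualApprovalAlpha": "false"},
        "Prod":  {"ProdAwsAccountId": "222222222222",
                  "DeployProd": "true",
                  "ManualApprovalProd": "approver@example.com"},
    },
}

REQUIRED_TOP_LEVEL = {"RepoName", "RepoTag", "BuildImage",
                      "BuildSpecFile", "DeploymentConfigurations"}

def missing_keys(cfg: dict) -> set:
    """Return any required top-level keys absent from a config entry."""
    return REQUIRED_TOP_LEVEL - cfg.keys()

print(missing_keys(entry))  # set()
```

A check like this could run before writing the entry to DynamoDB, catching malformed configurations before they ever trigger a pipeline update.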

Execute

  1. Make an entry into the devops-pipeline-table-info DynamoDB table in the main CI/CD account. A sample entry can be found in sample-entry.json. Make sure to replace the configuration values with appropriate values for your environment. An explanation of the values can be found in the Pipeline Configuration section above.
  2. After the entry is made in the DynamoDB table, you should see a CloudFormation stack being created. This CloudFormation stack will deploy the CodePipeline in the main CI/CD account by reading and using the entry in the DynamoDB table.

Customize the solution for different combinations such as deploying to an environment while skipping for others by updating the pipeline configurations stored in the devops-pipeline-table-info DynamoDB table. The following is the pipeline configured for the sample-application repository’s main branch.

The image represents the dynamic CI/CD pipeline deployed in your account.

Figure 2: Dynamic Multi-Account CI/CD Pipeline

Clean up your dynamic multi-account CI/CD solution and related resources

To avoid ongoing charges for the resources that you created following this post, you should delete the following:

  1. The pipeline configuration stored in the DynamoDB
  2. The CloudFormation stacks deployed in the target CI/CD accounts
  3. The AWS CDK app deployed in the main CI/CD account
  4. Empty and delete the retained S3 buckets.

Conclusion

This configuration-driven CI/CD solution provides the ability to dynamically create and configure your pipelines in DynamoDB. IDEMIA, a global leader in identity technologies, adopted this approach for deploying their microservices based application across environments. This solution created by AWS Professional Services allowed them to dynamically create and configure their pipelines per repository per release. As Kunal Bajaj, Tech Lead of IDEMIA, states, “We worked with AWS pro-serve team to create a dynamic CI/CD solution using lambdas, step functions, SQS, and other native AWS services to conduct cross-account deployments to our different environments while providing us the flexibility to add tests and approvals as needed by the business.”

About the authors:

Anshul Saxena

Anshul is a Cloud Application Architect at AWS Professional Services and works with customers helping them in their cloud adoption journey. His expertise lies in DevOps, serverless architectures, and architecting and implementing cloud native solutions aligning with best practices.

Libin Roy

Libin is a Cloud Infrastructure Architect at AWS Professional Services. He enjoys working with customers to design and build cloud native solutions to accelerate their cloud journey. Outside of work, he enjoys traveling, cooking, playing sports and weight training.

How to secure your SaaS tenant data in DynamoDB with ABAC and client-side encryption

Post Syndicated from Jani Muuriaisniemi original https://aws.amazon.com/blogs/security/how-to-secure-your-saas-tenant-data-in-dynamodb-with-abac-and-client-side-encryption/

If you’re a SaaS vendor, you may need to store and process personal and sensitive data for large numbers of customers across different geographies. When processing sensitive data at scale, you have an increased responsibility to secure this data end-to-end. Client-side encryption of data, such as your customers’ contact information, provides an additional mechanism that can help you protect your customers and earn their trust.

In this blog post, we show how to implement client-side encryption of your SaaS application’s tenant data in Amazon DynamoDB with the Amazon DynamoDB Encryption Client. This is accomplished by leveraging AWS Identity and Access Management (IAM) together with AWS Key Management Service (AWS KMS) for a more secure and cost-effective isolation of the client-side encrypted data in DynamoDB, both at run-time and at rest.

Encrypting data in Amazon DynamoDB

Amazon DynamoDB supports data encryption at rest using encryption keys stored in AWS KMS. This functionality helps reduce operational burden and complexity involved in protecting sensitive data. In this post, you’ll learn about the benefits of adding client-side encryption to achieve end-to-end encryption in transit and at rest for your data, from its source to storage in DynamoDB. Client-side encryption helps ensure that your plaintext data isn’t available to any third party, including AWS.

You can use the Amazon DynamoDB Encryption Client to implement client-side encryption with DynamoDB. In the solution in this post, client-side encryption refers to the cryptographic operations that are performed on the application side in the application’s Lambda function, before the data is sent to or retrieved from DynamoDB. The solution in this post uses the DynamoDB Encryption Client with the Direct KMS Materials Provider so that your data is encrypted by using AWS KMS. However, the underlying concept of the solution is not limited to the DynamoDB Encryption Client; you can apply it to any client-side use of AWS KMS, for example using the AWS Encryption SDK.

For detailed information about using the DynamoDB Encryption Client, see the blog post How to encrypt and sign DynamoDB data in your application. This is a great place to start if you are not yet familiar with DynamoDB Encryption Client. If you are unsure about whether you should use client-side encryption, see Client-side and server-side encryption in the Amazon DynamoDB Encryption Client Developer Guide to help you with the decision.

AWS KMS encryption context

AWS KMS gives you the ability to add an additional layer of authentication for your AWS KMS API decrypt operations by using encryption context. The encryption context is one or more key-value pairs of additional data that you want associated with AWS KMS protected information.

Encryption context helps you defend against the risks of ciphertexts being tampered with, modified, or replaced, whether intentionally or unintentionally. It helps defend both against an unauthorized user replacing one ciphertext with another and against operational mistakes. To use encryption context, you specify associated key-value pairs on encrypt. You must provide the exact same key-value pairs in the encryption context on decrypt, or the operation will fail. Encryption context is not secret and is not an access-control mechanism; it is a means of authenticating the data, not the caller.

The Direct KMS Materials Provider used in this blog post transparently generates a unique data key by using AWS KMS for each item stored in the DynamoDB table. It automatically sets the item’s partition key and sort key (if any) as AWS KMS encryption context key-value pairs.

The solution in this blog post relies on the partition key of each table item being defined in the encryption context. If you encrypt data with your own implementation, make sure to add your tenant ID to the encryption context in all your AWS KMS API calls.
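The exact-match requirement can be illustrated with a standard-library analogue. The sketch below is an illustration of the concept only, not a substitute for AWS KMS, and it authenticates rather than encrypts: the context is folded into an HMAC over the data, so supplying a different context on retrieval fails, just as a KMS Decrypt call with a mismatched encryption context does:

```python
import hashlib
import hmac
import json

KEY = b"demo-data-key"  # stands in for a KMS-protected data key

def seal(plaintext: bytes, context: dict) -> dict:
    """Bind key-value context to the data, as KMS does on Encrypt.
    (This sketch only authenticates; real KMS also encrypts.)"""
    aad = json.dumps(context, sort_keys=True).encode()
    tag = hmac.new(KEY, aad + plaintext, hashlib.sha256).hexdigest()
    return {"data": plaintext, "tag": tag}

def open_sealed(sealed: dict, context: dict) -> bytes:
    """Succeed only when the exact same context is supplied, as KMS requires on Decrypt."""
    aad = json.dumps(context, sort_keys=True).encode()
    expected = hmac.new(KEY, aad + sealed["data"], hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, sealed["tag"]):
        raise ValueError("encryption context mismatch")
    return sealed["data"]

item = seal(b"alice@example.com", {"tenant_id": "tenant-a"})
print(open_sealed(item, {"tenant_id": "tenant-a"}))  # b'alice@example.com'
# open_sealed(item, {"tenant_id": "tenant-b"})  # would raise ValueError
```

This is the property the solution relies on: a caller who can only present tenant-b's context can never successfully open data sealed under tenant-a's context.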

For more information about the concept of AWS KMS encryption context, see the blog post How to Protect the Integrity of Your Encrypted Data by Using AWS Key Management Service and EncryptionContext. You can also see another example in Exercise 3 of the Busy Engineer’s Document Bucket Workshop.

Attribute-based access control for AWS

Attribute-based access control (ABAC) is an authorization strategy that defines permissions based on attributes. In AWS, these attributes are called tags. In the solution in this post, ABAC helps you create tenant-isolated access policies for your application, without the need to provision tenant specific AWS IAM roles.

If you are new to ABAC, or need a refresher on the concepts and the different isolation methods, see the blog post How to implement SaaS tenant isolation with ABAC and AWS IAM.

Solution overview

If you are a SaaS vendor expecting large numbers of tenants, it is important that your underlying architecture can cost effectively scale with minimal complexity to support the required number of tenants, without compromising on security. One way to meet these criteria is to store your tenant data in a single pooled DynamoDB table, and to encrypt the data using a single AWS KMS key.

Using a single shared KMS key to read and write encrypted data in DynamoDB for multiple tenants reduces your per-tenant costs. This may be especially relevant for managing your costs if you offer a free tier, where there is no direct revenue to offset your costs.

When you use shared resources such as a single pooled DynamoDB table encrypted by using a single KMS key, you need a mechanism to help prevent cross-tenant access to the sensitive data. This is where you can use ABAC for AWS. By using ABAC, you can build an application with strong tenant isolation capabilities, while still using shared and pooled underlying resources for storing your sensitive tenant data.

You can find the solution described in this blog post in the aws-dynamodb-encrypt-with-abac GitHub repository. This solution uses ABAC combined with KMS encryption context to provide isolation of tenant data, both at rest and at run time. By using a single KMS key, the application encrypts tenant data on the client-side, and stores it in a pooled DynamoDB table, which is partitioned by a tenant ID.

Solution Architecture

Figure 1: Components of solution architecture


The presented solution implements an API with a single AWS Lambda function behind an Amazon API Gateway, and implements processing for two types of requests:

  1. GET request: fetch any key-value pairs stored in the tenant data store for the given tenant ID.
  2. POST request: store the provided key-value pairs in the tenant data store for the given tenant ID, overwriting any existing data for the same tenant ID.

The application is written in Python, it uses AWS Lambda Powertools for Python, and you deploy it by using the AWS CDK.

It also uses the DynamoDB Encryption Client for Python, which includes several helper classes that mirror the AWS SDK for Python (Boto3) classes for DynamoDB. This solution uses the EncryptedResource helper class which provides Boto3 compatible get_item and put_item methods. The helper class is used together with the KMS Materials Provider to handle encryption and decryption with AWS KMS transparently for the application.
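As a rough illustration of the request handling (not the solution's actual code, which uses AWS Lambda Powertools and the EncryptedResource class), the routing for the two request types might look like the following, with `table` standing in for any object that exposes Boto3-compatible get_item and put_item methods:

```python
import json

def handler(event, table):
    """Sketch of the tenant API routing. `table` is any object with
    Boto3-compatible get_item/put_item methods (in the real solution,
    a table from the DynamoDB Encryption Client's EncryptedResource)."""
    tenant_id = event["pathParameters"]["tenant"]
    if event["httpMethod"] == "GET":
        item = table.get_item(Key={"tenant_id": tenant_id}).get("Item", {})
        return {"statusCode": 200, "body": json.dumps(item)}
    if event["httpMethod"] == "POST":
        data = json.loads(event["body"])
        table.put_item(Item={"tenant_id": tenant_id, **data})
        return {"statusCode": 200, "body": json.dumps({"stored": tenant_id})}
    return {"statusCode": 405, "body": "method not allowed"}

# In-memory stand-in for the encrypted DynamoDB table
class FakeTable:
    def __init__(self):
        self.items = {}
    def put_item(self, Item):
        self.items[Item["tenant_id"]] = Item
    def get_item(self, Key):
        item = self.items.get(Key["tenant_id"])
        return {"Item": item} if item is not None else {}

table = FakeTable()
handler({"httpMethod": "POST", "pathParameters": {"tenant": "tenant-a"},
         "body": '{"email": "user@example.com"}'}, table)
resp = handler({"httpMethod": "GET", "pathParameters": {"tenant": "tenant-a"}}, table)
print(resp["body"])
```

Note how a POST for an existing tenant ID overwrites the stored item wholesale, matching the overwrite behavior described above.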

Note: This example solution provides no authentication of the caller identity. See chapter “Considerations for authentication and authorization” for further guidance.

How it works

Figure 2: Detailed architecture for storing new or updated tenant data


As requests are made into the application’s API, they are routed by API Gateway to the application’s Lambda function (1). The Lambda function begins to run with the IAM permissions that its IAM execution role (DefaultExecutionRole) has been granted. These permissions do not grant any access to the DynamoDB table or the KMS key. In order to access these resources, the Lambda function first needs to assume the ResourceAccessRole, which does have the necessary permissions. To implement ABAC more securely in this use case, it is important that the application maintains clear separation of IAM permissions between the assumed ResourceAccessRole and the DefaultExecutionRole.

As the application assumes the ResourceAccessRole using the AssumeRole API call (2), it also sets a TenantID session tag. Session tags are key-value pairs that can be passed when you assume an IAM role in AWS Security Token Service (AWS STS), and are a core building block of ABAC on AWS. When the session credentials (3) are used to make a subsequent request, the request context includes the aws:PrincipalTag context key, which can be used to access the session’s tags. The chapter “The ResourceAccessRole policy” describes how the aws:PrincipalTag context key is used in IAM policy condition statements to implement ABAC for this solution. Note that for demonstration purposes, this solution receives the value for the TenantID tag directly from the request URL, and it is not authenticated.
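The AssumeRole call with a session tag can be sketched as follows. With real AWS credentials, `sts_client` would be `boto3.client("sts")`; the fake client here only records the request so the call shape can be inspected locally, and the role ARN and session name are illustrative:

```python
def assume_tenant_role(sts_client, role_arn: str, tenant_id: str):
    """Assume the ResourceAccessRole with a TenantID session tag.
    The returned temporary credentials carry the tag, which IAM exposes
    as aws:PrincipalTag/TenantID on subsequent requests."""
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=f"tenant-session-{tenant_id}",
        Tags=[{"Key": "TenantID", "Value": tenant_id}],
    )
    return response["Credentials"]

# Fake STS client that records the AssumeRole request
class FakeSts:
    def assume_role(self, **kwargs):
        self.last_kwargs = kwargs
        return {"Credentials": {"AccessKeyId": "ASIA...",
                                "SecretAccessKey": "...",
                                "SessionToken": "..."}}

sts = FakeSts()
creds = assume_tenant_role(sts, "arn:aws:iam::123456789012:role/ResourceAccessRole", "tenant-a")
print(sts.last_kwargs["Tags"])  # [{'Key': 'TenantID', 'Value': 'tenant-a'}]
```

In the real solution, these temporary credentials are then used to initialize the boto3 sessions for the DynamoDB and KMS clients, scoping them to the tenant.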

The trust policy of the ResourceAccessRole defines the principals that are allowed to assume the role, and to tag the assumed role session. Make sure to limit the principals to the least needed for your application to function. In this solution, the application Lambda function is the only trusted principal defined in the trust policy.

Next, the Lambda function prepares to encrypt or decrypt the data (4). To do so, it uses the DynamoDB Encryption Client. The KMS Materials Provider and the EncryptedResource helper class are both initialized with sessions by using the temporary credentials from the AssumeRole API call. This allows the Lambda function to access the KMS key and DynamoDB table resources, with access restricted to operations on data belonging only to the specific tenant ID.

Finally, using the EncryptedResource helper class provided by the DynamoDB Encryption Library, the data is written to and read from the DynamoDB table (5).

Considerations for authentication and authorization

The solution in this blog post intentionally does not implement authentication or authorization of the client requests. Instead, the requested tenant ID from the request URL is passed as the tenant identity. Your own applications should always authenticate and authorize tenant requests. There are multiple ways you can achieve this.

Modern web applications commonly use OpenID Connect (OIDC) for authentication, and OAuth for authorization. JSON Web Tokens (JWTs) can be used to pass the resulting authorization data from client to the application. You can validate a JWT when using AWS API Gateway with one of the following methods:

  1. When using a REST or an HTTP API, you can use a Lambda authorizer
  2. When using an HTTP API, you can use a JWT authorizer
  3. You can validate the token directly in your application code

If you write your own authorizer code, you can pick a popular open source library or you can choose the AWS provided open source library. To learn more about using a JWT authorizer, see the blog post How to secure API Gateway HTTP endpoints with JWT authorizer.

Regardless of the chosen method, you must be able to map a suitable claim from the user’s JWT, such as the subject, to the tenant ID, so that it can be used as the session tag in this solution.
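Extracting such a claim is straightforward once the token has been validated. The sketch below decodes a JWT payload and reads the subject; note the loud caveat in the code, and that the demo token is a hypothetical unsigned example:

```python
import base64
import json

def tenant_id_from_jwt(token: str, claim: str = "sub") -> str:
    """Extract a claim from a JWT payload to use as the tenant ID.
    WARNING: this sketch does NOT verify the signature; in production,
    validate the token first (for example, with a JWT authorizer)."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload[claim]

# Build an unsigned demo token to show the claim mapping
header = base64.urlsafe_b64encode(b'{"alg":"none"}').decode().rstrip("=")
body = base64.urlsafe_b64encode(b'{"sub":"tenant-a"}').decode().rstrip("=")
token = f"{header}.{body}."
print(tenant_id_from_jwt(token))  # tenant-a
```

The extracted value is what would be passed as the TenantID session tag when assuming the ResourceAccessRole.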

The ResourceAccessRole policy

A critical part of the correct operation of ABAC in this solution is with the definition of the IAM access policy for the ResourceAccessRole. In the following policy, be sure to replace <region>, <account-id>, <table-name>, and <key-id> with your own values.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:<region>:<account-id>:table/<table-name>",
           ],
            "Condition": {
                "ForAllValues:StringEquals": {
                    "dynamodb:LeadingKeys": [
                        "${aws:PrincipalTag/TenantID}"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey",
            ],
            "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>",
            "Condition": {
                "StringEquals": {
                    "kms:EncryptionContext:tenant_id": "${aws:PrincipalTag/TenantID}"
                }
            }
        }
    ]
}

The policy defines two access statements, both of which apply separate ABAC conditions:

  1. The first statement grants access to the DynamoDB table with the condition that the partition key of the item matches the TenantID session tag in the caller’s session.
  2. The second statement grants access to the KMS key with the condition that one of the key-value pairs in the encryption context of the API call has a key called tenant_id with a value that matches the TenantID session tag in the caller’s session.

Warning: Do not use a ForAnyValue or ForAllValues set operator with the kms:EncryptionContext single-valued condition key. These set operators can create a policy condition that does not require values you intend to require, and allows values you intend to forbid.
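The combined effect of the two statements can be modeled as a simple predicate. This is an illustrative model only, not how IAM actually evaluates policies: a request succeeds only when both the item's partition key and the tenant_id in the KMS encryption context equal the caller's TenantID session tag:

```python
def is_allowed(principal_tags: dict, partition_key: str, encryption_context: dict) -> bool:
    """Model of the ResourceAccessRole conditions: the DynamoDB leading key
    and the tenant_id in the KMS encryption context must both equal the
    caller's TenantID session tag."""
    tenant = principal_tags.get("TenantID")
    return (tenant is not None
            and partition_key == tenant
            and encryption_context.get("tenant_id") == tenant)

# Same tenant everywhere: allowed
print(is_allowed({"TenantID": "tenant-a"}, "tenant-a", {"tenant_id": "tenant-a"}))  # True
# Cross-tenant access attempt: denied
print(is_allowed({"TenantID": "tenant-a"}, "tenant-b", {"tenant_id": "tenant-b"}))  # False
```

Because both conditions key off the same session tag, a session tagged for one tenant can neither read another tenant's items nor decrypt another tenant's data keys.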

Deploying and testing the solution

Prerequisites

To deploy and test the solution, you need the following:

Deploying the solution

After you have the prerequisites installed, run the following steps in a command line environment to deploy the solution. Make sure that your AWS CLI is configured with your AWS account credentials. Note that standard AWS service charges apply to this solution. For more information about pricing, see the AWS Pricing page.

To deploy the solution into your AWS account

  1. Use the following command to download the source code:
    git clone https://github.com/aws-samples/aws-dynamodb-encrypt-with-abac
    cd aws-dynamodb-encrypt-with-abac

  2. (Optional) You will need an AWS CDK version compatible with the application (2.37.0) to deploy. The simplest way is to install a local copy with npm, but you can also use a globally installed version if you already have one. To install locally, use the following command to use npm to install the AWS CDK:
    npm install aws-cdk@2.37.0

  3. Use the following commands to initialize a Python virtual environment:
    python3 -m venv demoenv
    source demoenv/bin/activate
    python3 -m pip install -r requirements.txt

  4. (Optional) If you have not used AWS CDK with this account and Region before, you first need to bootstrap the environment:
    npx cdk bootstrap

  5. Use the following command to deploy the application with the AWS CDK:
    npx cdk deploy

  6. Make note of the API endpoint URL https://<api url>/prod/ in the Outputs section of the CDK command. You will need this URL for the next steps.
    Outputs:
    DemoappStack.ApiEndpoint4F160690 = https://<api url>/prod/

Testing the solution with example API calls

With the application deployed, you can test the solution by making API calls against the API URL that you captured from the deployment output. You can start with a simple HTTP POST request to insert data for a tenant. The API expects a JSON string as the data to store, so make sure to post properly formatted JSON in the body of the request.

An example request using the curl command looks like this:

curl https://<api url>/prod/tenant/<tenant-name> -X POST --data '{"email":"<email-address>"}'

You can then read the same data back with an HTTP GET request:

curl https://<api url>/prod/tenant/<tenant-name>

You can store and retrieve data for any number of tenants, and can store as many attributes as you like. Each time you store data for a tenant, any previously stored data is overwritten.

Additional considerations

A tenant ID is used as the DynamoDB table’s partition key in the example application in this solution. You can replace the tenant ID with another unique partition key, such as a product ID, as long as the ID is consistently used in the IAM access policy, the IAM session tag, and the KMS encryption context. In addition, while this solution does not use a sort key in the table, you can modify the application to support a sort key with only a few changes. For more information, see Working with tables and data in DynamoDB.

Clean up

To clean up the application resources that you deployed while testing the solution, in the solution’s home directory, run the command cdk destroy.

Then, if you no longer plan to deploy to this account and Region using AWS CDK, you can also use the AWS CloudFormation console to delete the bootstrap stack (CDKToolKit).

Conclusion

In this post, you learned a method for simple and cost-efficient client-side encryption for your tenant data. By using the DynamoDB Encryption Client, you were able to implement the encryption with less effort, all while using a standard Boto3 DynamoDB Table resource compatible interface.

Adding to the client-side encryption, you also learned how to apply attribute-based access control (ABAC) to your IAM access policies. You used ABAC for tenant isolation by applying conditions for both the DynamoDB table access, as well as access to the KMS key that is used for encryption of the tenant data in the DynamoDB table. By combining client-side encryption with ABAC, you have increased your data protection with multiple layers of security.

You can start experimenting today on your own by using the provided solution. If you have feedback about this post, submit comments in the Comments section below. If you have questions on the content, consider submitting them to AWS re:Post.

Want more AWS Security news? Follow us on Twitter.

Jani Muuriaisniemi

Jani is a Principal Solutions Architect at Amazon Web Services based out of Helsinki, Finland. With more than 20 years of industry experience, he works as a trusted advisor with a broad range of customers across different industries and segments, helping the customers on their cloud journey.