Tag Archives: Expert (400)

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service for deploying and running Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we use a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the source database and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema is registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating it. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to a dead-letter queue.
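
For sink connectors, for example, dead-letter queue routing is enabled with Kafka Connect’s standard error-handling properties. The following is a minimal sketch; the topic name is a placeholder, and these lines are not part of the connector configurations used later in this post:

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.sampledatabase.movies
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true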

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{
    "Database Name": "sampledatabase",
    "Table Name": "movies",
    "Fields": [
        {
            "name": "movie_id",
            "type": "INTEGER"
        },
        {
            "name": "title",
            "type": "STRING"
        },
        {
            "name": "release_year",
            "type": "INTEGER"
        }
    ]
}
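
For reference, a MySQL table matching this schema could be created with DDL similar to the following sketch; the actual create_table.sql file provided with the demo may differ in details such as column sizes and constraints:

CREATE TABLE movies (
    movie_id INT NOT NULL PRIMARY KEY,
    title VARCHAR(255),
    release_year INT
);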

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK installs the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use the open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community licensed Amazon S3 sink connector JARs. Both plugins are also packaged with the Avro serializer and deserializer libraries of the AWS Glue Schema Registry. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.
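
For reference, if you ever need to register such a plugin outside of CloudFormation, you can upload the archive of JAR files to Amazon S3 and create the plugin with the AWS CLI. This is a sketch with placeholder names, not a step you need to run for this demo:

aws kafkaconnect create-custom-plugin \
    --name my-custom-plugin \
    --content-type ZIP \
    --location 's3Location={bucketArn=arn:aws:s3:::<plugin-bucket>,fileKey=<plugin-archive>.zip}'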

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then choose the custom plugin whose name starts with msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  8. Next, configure Connector capacity. You can choose Provisioned and leave the other properties at their defaults.
  9. For Worker configuration, choose the custom worker configuration whose name starts with msk-gsr-blog, created as part of the CloudFormation template.
  10. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector’s status changes to Running.
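
You can also verify the state from the command line with the MSK Connect API; the connector ARN below is a placeholder taken from the console or from the list-connectors output:

aws kafkaconnect list-connectors
aws kafkaconnect describe-connector --connector-arn <connector-arn>

The connectorState field in the response should show RUNNING once deployment finishes.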

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin whose name starts with msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNECTOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false

    8. Next, configure Connector capacity. You can choose Provisioned and leave the other properties at their defaults.
    9. For Worker configuration, choose the custom worker configuration whose name starts with msk-gsr-blog, created as part of the CloudFormation template.
    10. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
    11. Choose Next.
    12. For Security, choose the defaults.
    13. Choose Next.
    14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template msk-connector-logs.
    15. Choose Next.
    16. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and it has the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  8. When prompted for a password, enter the password from the CloudFormation template parameters.
  9. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  10. When prompted for a password, enter the password from the CloudFormation template parameters.
  11. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  12. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

  13. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.
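
You can also list the objects from the command line. The sketch below assumes the S3 sink connector’s default topics prefix and uses the bucket name from the CloudFormation outputs:

aws s3 ls s3://<BUCKET-NAME>/topics/db1.sampledatabase.movies/ --recursive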

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about the different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  2. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  3. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  4. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the COUNTRY column that you added:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "COUNTRY",
      "type": "string"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
  5. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Three ways to boost your email security and brand reputation with AWS

Post Syndicated from Michael Davie original https://aws.amazon.com/blogs/security/three-ways-to-boost-your-email-security-and-brand-reputation-with-aws/

If you own a domain that you use for email, you want to maintain the reputation and goodwill of your domain’s brand. Several industry-standard mechanisms can help prevent your domain from being used as part of a phishing attack. In this post, we’ll show you how to deploy three of these mechanisms, which visually authenticate emails sent from your domain to users and verify that emails are encrypted in transit. It can take as little as 15 minutes to deploy these mechanisms on Amazon Web Services (AWS), and the result can help to provide immediate and long-term improvements to your organization’s email security.

Phishing through email remains one of the most common ways that bad actors try to compromise computer systems. Incidents of phishing and related crimes far outnumber the incidents of other categories of internet crime, according to the most recent FBI Internet Crime Report. Phishing has consistently led to large annual financial losses in the US and globally.

Overview of BIMI, MTA-STS, and TLS reporting

An earlier post has covered how you can use Amazon Simple Email Service (Amazon SES) to send emails that align with best practices, including the IETF internet standards: Sender Policy Framework (SPF), DomainKeys Identified Mail (DKIM), and Domain-based Message Authentication, Reporting, and Conformance (DMARC). This post will show you how to build on this foundation and configure your domains to align with additional email security standards, including the following:

  • Brand Indicators for Message Identification (BIMI) – This standard allows you to associate a logo with your email domain, which some email clients will display to users in their inbox. Visit the BIMI Group’s Where is my BIMI Logo Displayed? webpage to see how logos are displayed in the user interfaces of BIMI-supporting mailbox providers; Figure 1 shows a mock-up of a typical layout that contains a logo.
  • Mail Transfer Agent Strict Transport Security (MTA-STS) – This standard helps ensure that email servers always use TLS encryption and certificate-based authentication when they send messages to your domain, to protect the confidentiality and integrity of email in transit.
  • SMTP TLS reporting – This reporting allows you to receive reports and monitor your domain’s TLS security posture, identify problems, and learn about attacks that might be occurring.

Figure 1: A mock-up of how BIMI enables branded logos to be displayed in email user interfaces

These three standards require your Domain Name System (DNS) to publish specific records, for example by using Amazon Route 53, that point to web pages that have additional information. You can host this information without having to maintain a web server by storing it in Amazon Simple Storage Service (Amazon S3) and delivering it through Amazon CloudFront, secured with a certificate provisioned from AWS Certificate Manager (ACM).

Note: This AWS solution works for BIMI, MTA-STS, and TLS reporting regardless of what you use to serve the actual email for your domains, which services you use to send email, and where you host DNS. For purposes of clarity, this post assumes that you are using Route 53 for DNS. If you use a different DNS hosting provider, you will manually configure DNS records in your existing hosting provider.

Solution architecture

The architecture for this solution is depicted in Figure 2.

Figure 2: The architecture diagram showing how the solution components interact

The interaction points are as follows:

  1. The web content is stored in an S3 bucket, and CloudFront has access to this bucket through an origin access identity, a mechanism of AWS Identity and Access Management (IAM).
  2. As described in more detail in the BIMI section of this blog post, the Verified Mark Certificate is obtained from a BIMI-qualified certificate authority and stored in the S3 bucket.
  3. When an external email system receives a message claiming to be from your domain, it looks up BIMI records for your domain in DNS. As depicted in the diagram, a DNS request is sent to Route 53.
  4. To retrieve the BIMI logo image and Verified Mark Certificate, the external email system will make HTTPS requests to a URL published in the BIMI DNS record. In this solution, the URL points to the CloudFront distribution, which has a TLS certificate provisioned with ACM.

A few important warnings

Email is a complex system of interoperating technologies. It is also brittle: a typo or a missing DNS record can make the difference between whether an email is delivered or not. Pay close attention to your email server and the users of your email systems when implementing the solution in this blog post. The main indicator that something is wrong is the absence of email. Instead of seeing an error in your email server’s log, users will tell you that they’re expecting to receive an email from somewhere and it’s not arriving. Or they will tell you that they sent an email, and their recipient can’t find it.

The DNS uses a lot of caching and time-out values to improve its efficiency. That makes DNS records slow and a little unpredictable as they propagate across the internet. So keep in mind that as you monitor your systems, it can be hours or even more than a day before the DNS record changes have an effect that you can detect.

This solution uses AWS Cloud Development Kit (CDK) custom resources, which are supported by AWS Lambda functions that will be created as part of the deployment. These functions are configured to use CDK-selected runtimes, which will eventually pass out of support and require you to update them.

Prerequisites

You will need permission in an AWS account to create and configure the following resources:

  • An Amazon S3 bucket to store the files and access logs
  • A CloudFront distribution to publicly deliver the files from the S3 bucket
  • A TLS certificate in ACM
  • An origin access identity in IAM that CloudFront will use to access files in Amazon S3
  • Lambda functions, IAM roles, and IAM policies created by CDK custom resources

You might also want to enable these optional services:

  • Amazon Route 53 for setting the necessary DNS records. If your domain is hosted by another DNS provider, you will set these DNS records manually.
  • Amazon SES or an Amazon WorkMail organization with a single mailbox. You can configure either service with a subdomain (for example, [email protected]) such that the existing domain is not disrupted, or you can create new email addresses by using your existing email mailbox provider.

BIMI has some additional requirements:

  • BIMI requires an email domain to have implemented a strong DMARC policy so that recipients can be confident in the authenticity of the branded logos. Your email domain must have a DMARC policy of p=quarantine or p=reject. Additionally, the domain’s policy cannot have sp=none or pct<100.

    Note: Do not adjust the DMARC policy of your domain without careful testing, because this can disrupt mail delivery.

  • You must have your brand’s logo in Scaled Vector Graphics (SVG) format that conforms to the BIMI standard. For more information, see Creating BIMI SVG Logo Files on the BIMI Group website.
  • Purchase a Verified Mark Certificate (VMC) issued by a third-party certificate authority. This certificate attests that the logo, organization, and domain are associated with each other, based on a legal trademark registration. Many email hosting providers require this additional certificate before they will show your branded logo to their users. Others do not currently support BIMI, and others might have alternative mechanisms to determine whether to show your logo. For more information about purchasing a Verified Mark Certificate, see the BIMI Group website.

    Note: If you are not ready to purchase a VMC, you can deploy this solution and validate that BIMI is correctly configured for your domain, but your branded logo will not display to recipients at major email providers.

What gets deployed in this solution?

This solution deploys the DNS records and supporting files that are required to implement BIMI, MTA-STS, and SMTP TLS reporting for an email domain. We’ll look at the deployment in more detail in the following sections.

BIMI

BIMI is described by the Internet Engineering Task Force (IETF) as follows:

Brand Indicators for Message Identification (BIMI) permits Domain Owners to coordinate with Mail User Agents (MUAs) to display brand-specific Indicators next to properly authenticated messages. There are two aspects of BIMI coordination: a scalable mechanism for Domain Owners to publish their desired Indicators, and a mechanism for Mail Transfer Agents (MTAs) to verify the authenticity of the Indicator. This document specifies how Domain Owners communicate their desired Indicators through the BIMI Assertion Record in DNS and how that record is to be interpreted by MTAs and MUAs. MUAs and mail-receiving organizations are free to define their own policies for making use of BIMI data and for Indicator display as they see fit.

If your organization has a trademark-protected logo, you can set up BIMI to have that logo displayed to recipients in their email inboxes. This can have a positive impact on your brand and indicates to end users that your email is more trustworthy. The BIMI Group shows examples of how brand logos are displayed in user inboxes, as well as a list of known email service providers that support the display of BIMI logos.

As a domain owner, you can implement BIMI by publishing the relevant DNS records and hosting the relevant files. To have your logo displayed by most email hosting providers, you will need to purchase a Verified Mark Certificate from a BIMI-qualified certificate authority.

This solution will deploy a valid BIMI record in Route 53 (or tell you what to publish in the DNS if you’re not using Route 53) and will store your provided SVG logo and Verified Mark Certificate files in Amazon S3, to be delivered through CloudFront with a valid TLS certificate from ACM.

To support BIMI, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host:
    default._bimi.<your-domain>. The value of this record is: v=BIMI1; l=<url-of-your-logo>; a=<url-of-verified-mark-certificate>. The value of <your-domain> refers to the domain that is used in the From header of messages that your organization sends.
  2. The logo and optional Verified Mark Certificate are hosted publicly at the HTTPS locations defined by <url-of-your-logo> and <url-of-verified-mark-certificate>, respectively.
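
For illustration, a published BIMI assertion record might look like the following; example.com and the asset URLs are placeholders, and the solution outputs the exact values for your domain:

default._bimi.example.com. IN TXT "v=BIMI1; l=https://bimi-assets.example.com/logo.svg; a=https://bimi-assets.example.com/certificate.pem"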

MTA-STS

MTA-STS is described by the IETF in RFC 8461 as follows:

SMTP (Simple Mail Transport Protocol) MTA Strict Transport Security (MTA-STS) is a mechanism enabling mail service providers to declare their ability to receive Transport Layer Security (TLS) secure SMTP connections and to specify whether sending SMTP servers should refuse to deliver to MX hosts that do not offer TLS with a trusted server certificate.

Put simply, MTA-STS helps ensure that email servers always use encryption and certificate-based authentication when sending email to your domains, so that message integrity and confidentiality are preserved while in transit across the internet. MTA-STS also helps to ensure that messages are only sent to authorized servers.

This solution will deploy a valid MTA-STS policy record in Route 53 (or tell you what value to publish in the DNS if you’re not using Route 53) and will create an MTA-STS policy document to be hosted on S3 and delivered through CloudFront with a valid TLS certificate from ACM.

To support MTA-STS, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _mta-sts.<your-domain>. The value of this record is: v=STSv1; id=<unique value used for cache invalidation>.
  2. The MTA-STS policy document is hosted at and obtained from the following location: https://mta-sts.<your-domain>/.well-known/mta-sts.txt.
  3. The value of <your-domain> in both cases is the domain that is used for routing inbound mail to your organization and is typically the same domain that is used in the From header of messages that your organization sends externally. Depending on the complexity of your organization, you might receive inbound mail for multiple domains, and you might choose to publish MTA-STS policies for each domain.
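
For illustration, a minimal MTA-STS policy document has the following shape; the MX host names are placeholders, mode can be testing, enforce, or none, and an example file ships with the solution at assets/.well-known/mta-sts.txt.example:

version: STSv1
mode: testing
mx: mx1.example.com
mx: mx2.example.com
max_age: 86400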

Is it ever bad to encrypt everything?

In the example MTA-STS policy file provided in the GitHub repository and explained later in this post, the MTA-STS policy mode is set to testing. This means that your email server is advertising its willingness to negotiate encrypted email connections, but it does not require TLS. Servers that want to send mail to you are allowed to connect and deliver mail even if there are problems in the TLS connection, as long as you’re in testing mode. You should expect reports when servers try to connect through TLS to your mail server and fail to do so.

Be fully prepared before you change the MTA-STS policy to enforce. After this policy is set to enforce, servers that follow the MTA-STS policy and that experience an enforceable TLS-related error when they try to connect to your mail server will not deliver mail to your mail server. This is a difficult situation to detect. You will simply stop receiving email from servers that comply with the policy. You might receive reports from them indicating what errors they encountered, but it is not guaranteed. Be sure that the email address you provide in SMTP TLS reporting (in the following section) is functional and monitored by people who can take action to fix issues. If you miss TLS failure reports, you probably won’t receive email. If the TLS certificate that you use on your email server expires, and your MTA-STS policy is set to enforce, this will become an urgent issue and will disrupt the flow of email until it is fixed.

SMTP TLS reporting

SMTP TLS reporting is described by the IETF in RFC 8460 as follows:

A number of protocols exist for establishing encrypted channels between SMTP Mail Transfer Agents (MTAs), including STARTTLS, DNS-Based Authentication of Named Entities (DANE) TLSA, and MTA Strict Transport Security (MTA-STS). These protocols can fail due to misconfiguration or active attack, leading to undelivered messages or delivery over unencrypted or unauthenticated channels. This document describes a reporting mechanism and format by which sending systems can share statistics and specific information about potential failures with recipient domains. Recipient domains can then use this information to both detect potential attacks and diagnose unintentional misconfigurations.

As you gain the security benefits of MTA-STS, SMTP TLS reporting will allow you to receive reports from other internet email providers. These reports contain information that is valuable when monitoring your TLS security posture, identifying problems, and learning about attacks that might be occurring.

This solution will deploy a valid SMTP TLS reporting record on Route 53 (or provide you with the value to publish in the DNS if you are not using Route 53).

To support SMTP TLS reporting, the solution makes the following changes to your resources:

  1. A DNS record of type TXT is published at the following host: _smtp._tls.<your-domain>. The value of this record is: v=TLSRPTv1; rua=mailto:<report-receiver-email-address>
  2. The value of <report-receiver-email-address> might be an address in your domain or in a third-party provider. Automated systems that process these reports must be capable of processing GZIP compressed files and parsing JSON.
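
For illustration, a published SMTP TLS reporting record might look like the following; the domain and report address are placeholders:

_smtp._tls.example.com. IN TXT "v=TLSRPTv1; rua=mailto:tls-reports@example.com"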

Deploy the solution with the AWS CDK

In this section, you’ll learn how to deploy the solution to create the previously described AWS resources in your account.

  1. Clone the following GitHub repository:

    git clone https://github.com/aws-samples/serverless-mail
    cd serverless-mail/email-security-records

  2. Edit CONFIG.py to reflect your desired settings, as follows:
    1. If no Verified Mark Certificate is provided, set VMC_FILENAME = None.
    2. If your DNS zone is not hosted on Route 53, or if you do not want this app to manage Route 53 DNS records, set ROUTE_53_HOSTED = False. In this case, you will need to set TLS_CERTIFICATE_ARN to the Amazon Resource Name (ARN) of a certificate hosted on ACM in us-east-1. This certificate is used by CloudFront and must support two subdomains: mta-sts and your configured BIMI_ASSET_SUBDOMAIN.
  3. Finalize the preparation, as follows:
    1. Place your BIMI logo and Verified Mark Certificate files in the assets folder.
    2. Create an MTA-STS policy file at assets/.well-known/mta-sts.txt to reflect your mail exchange (MX) servers and policy requirements. An example file is provided at assets/.well-known/mta-sts.txt.example
  4. Deploy the solution, as follows:
    1. Open a terminal in the email-security-records folder.
    2. (Recommended) Create and activate a virtual environment by running the following commands.
      python3 -m venv .venv
      source .venv/bin/activate
    3. Install the Python requirements in your environment with the following command.
      pip install -r requirements.txt
    4. Assume a role in the target account that has the permissions outlined in the Prerequisites section of this post.

      Using AWS CDK version 2.17.0 or later, deploy the bootstrap in the target account by running the following command. To learn more, see Bootstrapping in the AWS CDK Developer Guide.
      cdk bootstrap

    5. Run the following command to synthesize the CloudFormation template. Review the output of this command to verify what will be deployed.
      cdk synth
    6. Run the following command to deploy the CloudFormation template. You will be prompted to accept the IAM changes that will be applied to your account.
      cdk deploy

      Note: If you use Route 53, these records are created and activated in your DNS zones as soon as the CDK finishes deploying. As the records propagate through the DNS, they will gradually start affecting the email in the affected domains.

    7. If you’re not using Route 53 and instead are using a third-party DNS provider, create the CNAME and TXT records as indicated. In this case, your email is not affected by this solution until you create the records in DNS.

Testing and troubleshooting

After you have deployed the CDK solution, you can test it to confirm that the DNS records and web resources are published correctly.

BIMI

  1. Query the BIMI DNS TXT record for your domain by using the dig or nslookup command in your terminal.

    dig +short TXT default._bimi.<your-domain.example>

    Verify the response. For example:

    "v=BIMI1; l=https://bimi-assets.<your-domain.example>/logo.svg"

  2. In your web browser, open the URL from that response (for example, https://bimi-assets.<your-domain.example>/logo.svg) to verify that the logo is available and that the HTTPS certificate is valid.
  3. The BIMI group provides a tool to validate your BIMI configuration. This tool will also validate your VMC if you have purchased one.

MTA-STS

  1. Query the MTA-STS DNS TXT record for your domain.

    dig +short TXT _mta-sts.<your-domain.example>

    The value of this record is as follows:

    v=STSv1; id=<unique value used for cache invalidation>

  2. You can load the MTA-STS policy document using your web browser. For example, https://mta-sts.<your-domain.example>/.well-known/mta-sts.txt
  3. You can also use third party tools to examine your MTA-STS configuration, such as MX Toolbox.

TLS reporting

  1. Query the TLS reporting DNS TXT record for your domain.

    dig +short TXT _smtp._tls.<your-domain.example>

    Verify the response. For example:

    "v=TLSRPTv1; rua=mailto:<your email address>"

  2. You can also use third party tools to examine your TLS reporting configuration, such as Easy DMARC.

Depending on which domains you communicate with on the internet, you will begin to see TLS reports arriving at the email address that you have defined in the TLS reporting DNS record. We recommend that you closely examine the TLS reports, and use automated analytical techniques over an extended period of time before changing the default testing value of your domain’s MTA-STS policy. Not every email provider will send TLS reports, but examining the reports in aggregate will give you a good perspective for making changes to your MTA-STS policy.

Cleanup

To remove the resources created by this solution:

  1. Open a terminal in the email-security-records folder.
  2. Assume a role in the target account with permission to delete resources.
  3. Run cdk destroy.

Note: The asset and log buckets are automatically emptied and deleted by the cdk destroy command.

Conclusion

When external systems send email to or receive email from your domains, they will now query your new DNS records and look up your domain’s BIMI, MTA-STS, and TLS reporting information from your new CloudFront distribution. By adopting the email domain security mechanisms outlined in this post, you can improve the overall security posture of your email environment, as well as the perception of your brand.

 

Michael Davie

Michael is a Senior Industry Specialist with AWS Security Assurance. He works with our customers, their regulators, and AWS teams to help raise the bar on secure cloud adoption and usage. Michael has over 20 years of experience working in the defence, intelligence, and technology sectors in Canada and is a licensed professional engineer.

Jesse Thompson

Jesse is an Email Deliverability Manager with the Amazon Simple Email Service team. His background is in enterprise IT development and operations, with a focus on email abuse mitigation and encouragement of authenticity practices with open standard protocols. Jesse’s favorite activity outside of technology is recreational curling.

Control access to Amazon OpenSearch Service Dashboards with attribute-based role mappings

Post Syndicated from Stefan Appel original https://aws.amazon.com/blogs/big-data/control-access-to-amazon-opensearch-service-dashboards-with-attribute-based-role-mappings/

Federated users of Amazon OpenSearch Service often need access to OpenSearch Dashboards with roles based on their user profiles. OpenSearch Service fine-grained access control maps authenticated users to OpenSearch roles and then evaluates permissions to determine how to handle the user’s actions. However, when an enterprise-wide identity provider (IdP) manages the users, the mapping of users to OpenSearch Service roles often needs to happen dynamically based on IdP user attributes. One option to map users is to use OpenSearch Service SAML integration and pass user group information to OpenSearch Service. Another option is Amazon Cognito role-based access control, which supports rule-based or token-based mappings. But neither approach supports arbitrary role mapping logic, for example, when you need to interpret multivalued user attributes to identify a target role.

This post shows how you can implement custom role mappings with an Amazon Cognito pre-token generation AWS Lambda trigger. For our example, we use a multivalued attribute provided over OpenID Connect (OIDC) to Amazon Cognito. We show how you stay in full control of the mapping logic and of how such a multivalued attribute is processed for AWS Identity and Access Management (IAM) role lookups. Our approach is generic for OIDC-compatible IdPs. To make this post self-contained, we use the Okta IdP as an example to walk through the setup.

Overview of solution

The provided solution intercepts the OIDC-based login process to OpenSearch Dashboards with a pre-token generation Lambda function. The login to OpenSearch Dashboards with a third-party IdP and Amazon Cognito as an intermediary consists of several steps:

  1. First, the initial user request to OpenSearch Dashboards is redirected to Amazon Cognito.
  2. Amazon Cognito redirects the request to the IdP for authentication.
  3. After the user authenticates, the IdP sends the identity token (ID token) back to Amazon Cognito.
  4. Amazon Cognito invokes a Lambda function that modifies the obtained token. We use an Amazon DynamoDB table to perform role mapping lookups. The modified token now contains the IAM role mapping information.
  5. Amazon Cognito uses this role mapping information to map the user to the specified IAM role and provides the role credentials.
  6. OpenSearch Service maps the IAM role credentials to OpenSearch roles and applies fine-grained permission checks.

The following architecture outlines the login flow from a user’s perspective.

Scope of solution

On the backend, OpenSearch Dashboards integrates with an Amazon Cognito user pool and an Amazon Cognito identity pool during the authentication flow. The steps are as follows:

  1. Authenticate and get tokens.
  2. Look up the token attribute and IAM role mapping and overwrite the Amazon Cognito attribute.
  3. Exchange tokens for AWS credentials used by OpenSearch Dashboards.

The following architecture shows this backend perspective to the authentication process.

Backend authentication flow

In the remainder of this post, we walk through the configurations necessary for an authentication flow in which a Lambda function implements custom role mapping logic. We provide sample Lambda code for the mapping of multivalued OIDC attributes to IAM roles based on a DynamoDB lookup table with the following structure.

OIDC Attribute Value               IAM Role
["attribute_a","attribute_b"]      arn:aws:iam::<aws-account-id>:role/<role-name-01>
["attribute_a","attribute_x"]      arn:aws:iam::<aws-account-id>:role/<role-name-02>

The high-level steps of the solution presented in this post are as follows:

  1. Configure Amazon Cognito authentication for OpenSearch Dashboards.
  2. Add IAM roles for mappings to OpenSearch Service roles.
  3. Configure the Okta IdP.
  4. Add a third-party OIDC IdP to the Amazon Cognito user pool.
  5. Map IAM roles to OpenSearch Service roles.
  6. Create the DynamoDB attribute-role mapping table.
  7. Deploy and configure the pre-token generation Lambda function.
  8. Configure the pre-token generation Lambda trigger.
  9. Test the login to OpenSearch Dashboards.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account with an OpenSearch Service domain.
  • A third-party IdP that supports OpenID Connect and adds a multivalued attribute to the authorization token. For this post, we use attributes_array as this attribute’s name and Okta as the IdP. You can create an Okta Developer Edition free account to test the setup.

Configure Amazon Cognito authentication for OpenSearch Dashboards

The modification of authentication tokens requires you to configure the OpenSearch Service domain to use Amazon Cognito for authentication. For instructions, refer to Configuring Amazon Cognito authentication for OpenSearch Dashboards.

The Lambda function implements custom role mappings by setting the cognito:preferred_role claim (for more information, refer to Role-based access control). For the correct interpretation of this claim, set the Amazon Cognito identity pool to Choose role from token. The Amazon Cognito identity pool then uses the value of the cognito:preferred_role claim to select the correct IAM role. The following screenshot shows the required settings in the Amazon Cognito identity pool that is created during the configuration of Amazon Cognito authentication for OpenSearch Service.

Cognito role mapping configuration

Add IAM roles for mappings to OpenSearch roles

IAM roles used for mappings to OpenSearch roles require a trust policy so that authenticated users can assume them. The trust policy needs to reference the Amazon Cognito identity pool created during the configuration of Amazon Cognito authentication for OpenSearch Service. Create at least one IAM role with a custom trust policy. For instructions, refer to Creating a role using custom trust policies. The IAM role doesn’t require the attachment of a permission policy. For a sample trust policy, refer to Role-based access control.
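
The following is a minimal sketch of such a trust policy for authenticated identities from the Amazon Cognito identity pool; the identity pool ID is a placeholder, and the Role-based access control documentation remains the authoritative reference:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "cognito-identity.amazonaws.com"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringEquals": {
                    "cognito-identity.amazonaws.com:aud": "<identity-pool-id>"
                },
                "ForAnyValue:StringLike": {
                    "cognito-identity.amazonaws.com:amr": "authenticated"
                }
            }
        }
    ]
}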

Configure the Okta IdP

In this section, we describe the configuration steps to include a multivalued attributes_array attribute in the token provided by Okta. For more information, refer to Customize tokens returned from Okta with custom claims. We use the Okta UI to perform the configurations. Okta also provides an API that you can use to script and automate the setup.

The first step is adding the attributes_array attribute to the Okta user profile.

  1. Use Okta’s Profile Editor under Directory, Profile Editor.
  2. Select User (default) and then choose Add Attribute.
  3. Add an attribute with a display name and variable name attributes_array of type string array.

The following screenshot shows the Okta default user profile after the custom attribute has been added.

Okta user profile editor

  1. Next, add attributes_array attribute values to users using Okta’s user management interface under Directory, People.
  2. Select a user and choose Profile.
  3. Choose Edit and enter attribute values.

The following screenshot shows an example of attributes_array attribute values within a user profile.

Okta user attributes array

The next step is adding the attributes_array attribute to the ID token that is generated during the authentication process.

  1. On the Okta console, choose Security, API and select the default authorization server.
  2. Choose Claims and choose Add Claim to add the attributes_array attribute as part of the ID token.
  3. As the scope, enter openid and as the attribute value, enter user.attributes_array.

This references the previously created attribute in a user’s profile.

Add claim to ID token

  1. Next, create an application for the federation with Amazon Cognito. For instructions, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

The last step assigns the Okta application to Okta users.

  1. Navigate to Directory, People, select a user, and choose Assign Applications.
  2. Select the application you created in the previous step.

Add a third-party OIDC IdP to the Amazon Cognito user pool

We are implementing the role mapping based on the information provided in a multivalued OIDC attribute. The authentication token needs to include this attribute. If you followed the previously described Okta configuration, the attribute is automatically added to the ID token of a user. If you used another IdP, you might have to request the attribute explicitly. For this, add the attribute name to the Authorized scopes list of the IdP in Amazon Cognito.

For instructions on how to set up the federation between a third-party IdP and an Amazon Cognito user pool and how to request additional attributes, refer to Adding OIDC identity providers to a user pool. For a detailed walkthrough for Okta, refer to How do I set up Okta as an OpenID Connect identity provider in an Amazon Cognito user pool.

After requesting the token via OIDC, you need to map the attribute to an Amazon Cognito user pool attribute. For instructions, refer to Specifying identity provider attribute mappings for your user pool. The following screenshot shows the resulting configuration on the Amazon Cognito console.

Amazon Cognito user pool attribute mapping

Map IAM roles to OpenSearch Service roles

Upon login, OpenSearch Service maps users to an OpenSearch Service role based on the IAM role ARN set in the cognito:preferred_role claim by the pre-token generation Lambda trigger. This requires a role mapping in OpenSearch Service. To add such role mappings to IAM backend roles, refer to Mapping roles to users. The following screenshot shows a role mapping on the OpenSearch Dashboards console.

Amazon OpenSearch Service role mappings

Create the attribute-role mapping table

For this solution, we use DynamoDB to store mappings of users to IAM roles. For instructions, refer to Create a table and define a partition key named Key of type String. You need the table name in the subsequent step to configure the Lambda function.
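
As a sketch, you can also create the table with the AWS CLI; the table name is a placeholder and must match the TABLE_NAME environment variable configured later for the Lambda function:

aws dynamodb create-table \
    --table-name <table-name> \
    --attribute-definitions AttributeName=Key,AttributeType=S \
    --key-schema AttributeName=Key,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST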

The next step is writing the mapping information into the table. A mapping entry consists of the following attributes:

  • Key – A string that contains attribute values in comma-separated alphabetical order
  • RoleArn – A string with the IAM role ARN to which the attribute value combination should be mapped

For details on how to add data to a DynamoDB table, refer to Write data to a table using the console or AWS CLI.

For example, if the previously configured OIDC attribute attributes_array contains three values, attribute_a, attribute_b, and attribute_c, the entry in the mapping table looks like table line 1 in the following screenshot.
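
For example, you can write such an entry with the AWS CLI; the table name, account ID, and role name are placeholders:

aws dynamodb put-item \
    --table-name <table-name> \
    --item '{"Key": {"S": "attribute_a,attribute_b,attribute_c"}, "RoleArn": {"S": "arn:aws:iam::<aws-account-id>:role/<role-name-01>"}}'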

Amazon DynamoDB table with attribute-role mappings

Deploy and configure the pre-token generation Lambda function

A Lambda function implements the custom role mapping logic. The Lambda function receives an Amazon Cognito event as input and extracts attribute information out of it. It uses the attribute information for a lookup in a DynamoDB table and retrieves the value for cognito:preferred_role. Follow the steps in Getting started with Lambda to create a Node.js Lambda function and insert the following source code:

const AWS = require("aws-sdk");
const tableName = process.env.TABLE_NAME;
const unauthorizedRoleArn = process.env.UNAUTHORIZED_ROLE;
const userAttributeArrayName = process.env.USER_POOL_ATTRIBUTE;
const dynamodbClient = new AWS.DynamoDB({apiVersion: "2012-08-10"});
exports.lambdaHandler = handlePreTokenGenerationEvent

// Entry point for the Amazon Cognito pre-token generation trigger.
// Looks up the IAM role for the user's attribute combination and
// writes it into the token as the preferred role.
async function handlePreTokenGenerationEvent (event, context) {
    var sortedAttributeList = getSortedAttributeList(event);
    var lookupKey = sortedAttributeList.join(',');
    var roleArn = await lookupIAMRoleArn(lookupKey);
    appendResponseWithPreferredRole(event, roleArn);
    return event;
}

// Parses the multivalued user pool attribute (a JSON-encoded string array)
// and sorts it so the lookup key is independent of the attribute order.
function getSortedAttributeList(event) {
    return JSON.parse(event['request']['userAttributes'][userAttributeArrayName]).sort();
}

// Retrieves the IAM role ARN for the given key from the DynamoDB mapping table.
// Falls back to the unauthorized role if no mapping entry is found.
async function lookupIAMRoleArn(key) {
    var params = {
        TableName: tableName,
        Key: {
          'Key': {S: key}
        },
        ProjectionExpression: 'RoleArn'
      };
    try {
        let item = await dynamodbClient.getItem(params).promise();
        return item['Item']['RoleArn']['S'];
    } catch (e){
        console.log(e);
        return unauthorizedRoleArn; 
    }
}

// Overrides the preferred role claim in the token that Amazon Cognito issues.
function appendResponseWithPreferredRole(event, roleArn){
    event.response = {
        'claimsOverrideDetails': {
            'groupOverrideDetails': {
                'preferredRole': roleArn
            }
        }
    };
}

The Lambda function expects three environment variables. Refer to Using AWS Lambda environment variables for instructions to add the following entries:

  • TABLE_NAME – The name of the previously created DynamoDB table. This table is used for the lookups.
  • UNAUTHORIZED_ROLE – The ARN of the IAM role that is used when no mapping is found in the lookup table.
  • USER_POOL_ATTRIBUTE – The Amazon Cognito user pool attribute used for the IAM role lookup. In our example, this attribute is named custom:attributes_array.
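
As a sketch, you can also set these variables with the AWS CLI; the function name, table name, and role ARN are placeholders:

aws lambda update-function-configuration \
    --function-name <function-name> \
    --environment 'Variables={TABLE_NAME=<table-name>,UNAUTHORIZED_ROLE=arn:aws:iam::<aws-account-id>:role/<unauthorized-role>,USER_POOL_ATTRIBUTE=custom:attributes_array}'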

The following screenshot shows the final configuration.

AWS Lambda function configuration

The Lambda function needs permission to access the DynamoDB lookup table. Attach the following policy to the Lambda execution role (for instructions, refer to Lambda execution role), and provide the Region, AWS account ID, and DynamoDB table name:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:Scan",
                "dynamodb:Query",
                "dynamodb:BatchGetItem",
                "dynamodb:DescribeTable"
            ],
            "Resource": [
                "arn:aws:dynamodb:<region>:<accountid>:table/<table>",
                "arn:aws:dynamodb:<region>:<accountid>:table/<table>/index/*"
            ],
            "Effect": "Allow"
        }
    ]
}

The configuration of the Lambda function is now complete.

Configure the pre-token generation Lambda trigger

As final step, add a pre-token generation trigger to the Amazon Cognito user pool and reference the newly created Lambda function. For details, refer to Customizing user pool workflows with Lambda triggers. The following screenshot shows the configuration.

Amazon Cognito pre-token generation trigger configuration

This step completes the setup; Amazon Cognito now maps users to OpenSearch Service roles based on the values provided in an OIDC attribute.

Test the login to OpenSearch Dashboards

The following diagram shows an exemplary login flow and the corresponding screenshots for an Okta user user1 whose user profile attribute attributes_array has the value ["attribute_a", "attribute_b", "attribute_c"].

Testing of solution

Clean up

To avoid incurring future charges, delete the OpenSearch Service domain, Amazon Cognito user pool and identity pool, Lambda function, and DynamoDB table created as part of this post.

Conclusion

In this post, we demonstrated how to set up a custom mapping to OpenSearch Service roles using values provided via an OIDC attribute. We dynamically set the cognito:preferred_role claim using an Amazon Cognito pre-token generation Lambda trigger and a DynamoDB table for lookup. The solution is capable of handling dynamic multivalued user attributes, but you can extend it with further application logic that goes beyond a simple lookup. The steps in this post are a proof of concept. If you plan to develop this into a productive solution, we recommend implementing Okta and AWS security best practices.

The post highlights just one use case of how you can use Amazon Cognito support for Lambda triggers to implement custom authentication needs. If you’re interested in further details, refer to How to Use Cognito Pre-Token Generation trigger to Customize Claims In ID Tokens.


About the Authors

Stefan Appel is a Senior Solutions Architect at AWS. For more than 10 years, he has supported enterprise customers in adopting cloud technologies. Before joining AWS, Stefan held positions in software architecture, product management, and IT operations departments. He began his career in research on event-based systems. In his spare time, he enjoys hiking and has walked the length of New Zealand following Te Araroa.

Modood Alvi is a Senior Solutions Architect at Amazon Web Services (AWS). Modood is passionate about digital transformation and is committed to helping large enterprise customers across the globe accelerate their adoption of and migration to the cloud. Modood brings more than a decade of experience in software development, having held various technical roles within companies like SAP and Porsche Digital. Modood earned his Diploma in Computer Science from the University of Stuttgart.

Build highly available streams with Amazon Kinesis Data Streams

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/build-highly-available-streams-with-amazon-kinesis-data-streams/

Many use cases are moving towards a real-time data strategy due to demand for real-time insights, low-latency response times, and the ability to adapt to the changing needs of end-users. For this type of workload, you can use Amazon Kinesis Data Streams to seamlessly provision, store, write, and read data in a streaming fashion. With Kinesis Data Streams, there are no servers to manage, and you can scale your stream to handle any additional throughput as it comes in.

Kinesis Data Streams offers 99.9% availability in a single AWS Region. For even higher availability, there are several strategies to explore within the streaming layer. This post compares and contrasts different strategies for creating a highly available Kinesis data stream in case of service interruptions, delays, or outages in the primary Region of operation.

Considerations for high availability

Before we dive into our example use case, there are several considerations to keep in mind when designing a highly available Kinesis Data Streams workload that relate to the business need for a particular pipeline:

  • Recovery Time Objective (RTO) is defined by the organization. RTO is the maximum acceptable delay between the interruption of service and restoration of service. This determines what is considered an acceptable time window when service is unavailable.
  • Recovery Point Objective (RPO) is defined by the organization. RPO is the maximum acceptable amount of time since the last data recovery point. This determines what is considered an acceptable loss of data between the last recovery point and the interruption of service.

In a general sense, the lower your values for RPO and RTO, the more expensive the overall solution becomes. This is because the solution needs to account for minimizing both data loss and service unavailability by having multiple instances of the service up and running in multiple Regions. This is why a big piece of high availability is the replication of data flowing through a workload. In our case, the data is replicated across Regions between Kinesis data streams. Conversely, the higher the RPO and RTO values are, the more complexity you introduce into your failover mechanism. This is because the cost savings you realize by not standing up multiple instances across multiple Regions are offset by the orchestration needed to spin up these instances in the event of an outage.

In this post, we are only covering failover of a Kinesis data stream. In use cases where higher availability is required across the entire data pipeline, having failover architectures for every component (Amazon API Gateway, AWS Lambda, Amazon DynamoDB) is strongly encouraged.

The simplest approach to high availability is to start a new instance of producers, consumers, and data streams in a new Region upon service unavailability detection. The benefit here is primarily cost, but your RPO and RTO values will be higher as a result.

We cover the following strategies for highly available Kinesis Data Streams:

  • Warm standby – An architecture in which there is active replication of data from the Kinesis data stream in Region A to Region B. Consumers of the data stream are running in both Regions at all times. Recommended for use cases that can’t withstand extended downtime past their replication lag.
  • Cold standby – Active replication of data from the data stream in Region A to Region B, but consumers of the data stream in Region B are spun up only when an outage in Region A is detected. Recommended for use cases that can afford some downtime while infrastructure is spun up in the secondary Region. In this scenario, RPO will be similar to the warm standby strategy; however, RTO will increase.

For high availability purposes, these use cases need to replicate the data across Regions in a way that allows consumers and producers of the data stream to fail over quickly upon detection of a service unavailability and utilize the secondary Region’s stream. Let’s take an example architecture to further explain these DR strategies. We use API Gateway and Lambda to publish stock ticker information to a Kinesis data stream. The data is then retrieved by another Lambda consumer to save durably into DynamoDB for querying, alerting, and reporting. The following diagram illustrates this architecture.

the primary architecture for the post--showcasing data coming from a mobile phone to API Gateway, then AWS Lambda, then Kinesis Data Streams, Lambda again and finally publishing to a DynamoDB Table

We use this architecture with an example use case requiring the streaming workload to be highly available in the event of a Region outage. The customer can withstand an RTO of 15 minutes during an outage, because they refresh end-users’ dashboards on a 15-minute interval. The customer is sensitive to downtime and data loss, because their data will be used for historical auditing purposes, operational metrics, and dashboards for end-users. Downtime for this customer means that data isn’t able to be persisted in their database from their streaming layer, and therefore unavailable to any consuming application. For this use case, data can be retried up to 5 minutes from our Lambda function before failing over to the new Region. Consumers are considered unavailable when the stream is unavailable, and can scale up in the secondary Region to account for any backlog of events.

How might we approach making a Kinesis data stream highly available for this use case?

Warm standby pattern

The following architecture diagram illustrates the warm standby high availability pattern for Kinesis Data Streams.

warm standby pattern showcasing data being replicated between a kinesis data stream in one region to another

image showcasing the warm standby failover--where data from first lambda begins replicating to secondary region KDA

The warm standby architectural pattern involves running a Kinesis data stream both in the primary and secondary Region, along with consumers and downstream destinations of the primary Region’s streaming layer being replicated as well. Sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. We dive into details of how to achieve this in the client failover section of this post. Data is replicated across Regions from the data stream in the primary Region to the secondary Region. This is done instead of having the sources publish to both Regions to avoid any consistency issues between the streams in the two Regions.

Although this architectural pattern gives very high availability, it’s also the most expensive option because we’re duplicating virtually the entire streaming layer across two Regions. For business use cases that can’t withstand extended data loss or downtime, this may be the best option. From an RTO perspective, this architectural pattern ensures there will be no downtime. There is some nuance in the RPO metric in that it depends heavily on the replication lag. In the event of the primary stream becoming unavailable, whatever data hasn’t yet been replicated may be unavailable in the secondary Region. This data won’t be considered lost, but may be unavailable for consumption until the primary stream becomes available again. This method also can result in events being out of order.

For business needs that can’t tolerate this level of record unavailability, consider retaining data on the producer for the purposes of publishing to an available stream when available, or rewinding against the source for the producer if possible so that data stuck in the primary Region can be resent to the secondary stream upon failover. We cover this consideration in the client failover section of this post.

Cold standby pattern

The following architecture diagram illustrates the cold standby high availability pattern for Kinesis Data Streams.

active passive pattern for kinesis data streams

The cold standby architectural pattern involves running a data stream both in the primary and secondary Region, and spinning up the downstream resources like a stream consumer and destination for streams when a service interruption is detected—passive mode. Just like the warm standby pattern, sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. Likewise, data is replicated across Regions from the data stream in the primary Region to the secondary Region.

The primary benefit this architectural pattern provides is cost efficiency. By not running consumers at all times, this effectively reduces your costs significantly compared to the warm standby pattern. However, this pattern may introduce some data unavailability for downstream systems while the secondary Region infrastructure is provisioned. Additionally, depending on replication lag, some records may be unavailable, as discussed in the warm standby pattern. It should be noted that depending on how long it takes to spin up resources, it may take some time for consumers to reprocess the data in the secondary Region, and latency can be introduced when failing over. Our implementation assumes a minimal replication lag and that downstream systems have the ability to reprocess a configurable amount of data to catch up to the tip of the stream. We discuss approaches to spinning these resources up in the client failover section, but one possible approach to this would be using an AWS CloudFormation template that spins these resources up on service unavailability detection.

For business needs that can tolerate some level of data unavailability and can accept interruptions while the new infrastructure in the secondary Region is spun up, this is an option to consider both from a cost perspective and an RPO/RTO perspective. The complexity of spinning up resources upon detecting service unavailability is offset by the lower cost of the overall solution.

Which pattern makes sense for our use case?

Let’s revisit the use case described earlier to identify which of the strategies best meets our needs. We can extract the pieces of information from the customer’s problem statement to identify that they need a high availability architecture that:

  • Can’t withstand extended amounts of data loss
  • Must resume operations within 15 minutes of service interruption identification

These criteria tell us that their RPO is close to zero, and their RTO is 15 minutes. From here, we can determine that the cold standby architecture with data replication provides us with limited data loss, and the maximum downtime will be determined by the time it takes to provision consumers and downstream destinations in the secondary Region.

Let’s dive deeper into the implementation details of each of the core phases of high availability, including an implementation guide for our use case.

Launch AWS CloudFormation resources

If you want to follow along with our code samples, you can launch the following CloudFormation stack and follow the instructions in order to simulate the cold standby architecture referenced in this post.

Launch Stack

For purposes of the Kinesis Data Streams high availability setup demo, we use us-west-2 as the primary Region and us-east-2 as the failover Region. While deploying this solution in your own account, you can choose your own primary and failover Regions.

  1. Deploy the supplied CloudFormation template in failover Region us-east-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

  2. Deploy the supplied CloudFormation template in primary Region us-west-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

In steps 1 and 2, we deployed the following resources in the primary and failover Regions:

  1. KDS-HA-Stream – AWS::Kinesis::Stream (primary and failover Region)
  2. KDS-HA-ProducerLambda – AWS::Lambda::Function (primary Region)
  3. KDS-HA-ConsumerLambda – AWS::Lambda::Function (primary and failover Region)
  4. KDS-HA-ReplicationAgentLambda – AWS::Lambda::Function (primary Region)
  5. KDS-HA-FailoverLambda – AWS::Lambda::Function (primary Region)
  6. ticker-prices – AWS::DynamoDB::GlobalTable (primary and failover Region)

The KDS-HA-Stream Kinesis data stream is deployed in both Regions. The KDS-HA-ReplicationAgentLambda function in the primary Region, an enhanced fan-out consumer of the KDS-HA-Stream stream, is responsible for replicating messages to the data stream in the failover Region.

KDS-HA-ConsumerLambda is a Lambda function consuming messages out of the KDS-HA-Stream stream and persisting data into a DynamoDB table after preprocessing.

You can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-west-2 as its value because it’s the primary Region.

Replication

When deciding how to replicate data from a data stream in Region A to a data stream in Region B, there are several strategies that involve a consumer reading data off of the primary stream and sending that data cross-Region to the secondary data stream. This would act as a replicator service, responsible for copying the data between the two streams, maintaining a relatively low latency to replicate and ensuring data isn’t lost during this replication.

Because replication off of a shared throughput data stream could impact the flow of data in a production workload, we recommend using the enhanced fan-out feature of Kinesis Data Streams consumers to ensure replication doesn’t have an impact on consumption latency.
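
The CloudFormation template provided with this post takes care of this registration. If you’re wiring the setup up yourself, registering an enhanced fan-out consumer is a single API call; the following minimal sketch uses the consumer name referenced later in the IAM policy, and the account ID is a placeholder.

import boto3

kinesis = boto3.client("kinesis", region_name="us-west-2")

# Enhanced fan-out gives the replicator dedicated read throughput per shard,
# so replication doesn't compete with the other consumers of the primary stream
kinesis.register_stream_consumer(
    StreamARN="arn:aws:kinesis:us-west-2:<accountid>:stream/KDS-HA-Stream",
    ConsumerName="KDS-HA-Stream-EFO-Consumer",
)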

The replication strategy implemented in this post features asynchronous replication, meaning that the replication process doesn’t block any standard data flow in the primary stream. Synchronous replication would be a safer approach to guarantee replication and avoid data loss; however, this isn’t possible without a service-side implementation.

The following image shows a timeline of data flow for the cold standby architecture, with data being replicated as soon as it’s published.

Lambda replication

Lambda can treat a Kinesis data stream as an event source, which will funnel events from your data stream into a Lambda function. This Lambda function then receives and forwards these events across Regions to your data stream in a secondary Region. Lambda functions allow you to utilize best streaming practices such as retries of records that encounter errors, bisect on error functionality, and using the Lambda parallelization factor; using more instances of your Lambda function than you have available shards can help process records faster.

This Lambda function is at the crux of the architecture for high availability; it’s responsible solely for sending data across Regions, and it also has the best capability to monitor the replication progress. Important metrics to monitor for Lambda replication include IteratorAge, which indicates how old the last record in the batch was when it finished processing. A high IteratorAge value indicates that the Lambda function is falling behind and therefore is not keeping up with data ingestion for replication purposes. A high IteratorAge can lead to a higher RPO and a higher likelihood of data unavailability when a passive failover happens.
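
As a minimal monitoring sketch, the following boto3 call creates a CloudWatch alarm on the replication function’s IteratorAge metric; the 60-second threshold and 5-minute evaluation window are illustrative values that you should tune to your RPO.

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")

# Alarm when the replication Lambda function falls more than 60 seconds behind the stream
cloudwatch.put_metric_alarm(
    AlarmName="KDS-HA-ReplicationLag",
    Namespace="AWS/Lambda",
    MetricName="IteratorAge",
    Dimensions=[{"Name": "FunctionName", "Value": "KDS-HA-ReplicationAgentLambda"}],
    Statistic="Maximum",
    Period=60,                 # one-minute data points
    EvaluationPeriods=5,       # sustained lag over 5 minutes
    Threshold=60000,           # IteratorAge is reported in milliseconds
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
)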

We use the following sample Lambda function in our CloudFormation template to replicate data across Regions:

import json
import boto3
import os
import base64


def lambda_handler(event, context):
    # The failover Region is provided through a Lambda environment variable
    client = boto3.client("kinesis", region_name=os.environ["FAILOVER_REGION"])
    records = []

    # Rebuild the batch, keeping the original partition keys so related records
    # stay together in the destination stream
    for record in event["Records"]:
        records.append(
            {
                "PartitionKey": record["kinesis"]["partitionKey"],
                "Data": base64.b64decode(record["kinesis"]["data"]).decode("utf-8"),
            }
        )
    response = client.put_records(Records=records, StreamName="KDS-HA-Stream")
    if response["FailedRecordCount"] > 0:
        # Raising an exception makes Lambda retry the batch so records aren't silently dropped
        print("Failed replicating data: " + json.dumps(response))
        raise Exception("Failed replicating data!")

The Lambda replicator in the CloudFormation template is configured to read from the data stream in the primary Region.

The following code contains the necessary AWS Identity and Access Management (IAM) role definition for Lambda, allowing the Lambda function to assume the role. The policy grants the Kinesis actions the replicator needs on the KDS-HA-Stream data stream and its enhanced fan-out consumer, and attaches the CloudWatchLogsFullAccess managed policy for logging. Following the principle of least privilege, it’s recommended to restrict this to the necessary streams in a production environment.

      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:ListShards'
                  - 'kinesis:ListStreams'
                  - 'kinesis:SubscribeToShard'
                  - 'kinesis:PutRecords'
                Resource:
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream'
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream/consumer/KDS-HA-Stream-EFO-Consumer:*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'
 

Health check

A generalized strategy for determining when to consider our data stream unavailable involves the use of Amazon CloudWatch metrics. We use the metrics coming off of our Lambda producer and consumer in order to assess the availability of our data stream. While producing to a data stream, an error might appear as one of the following responses back from the data stream: PutRecord or PutRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503 error. When consuming from a data stream, an error might appear as one of the following responses back from the data stream: SubscribeToShard or GetRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503 error.

We can calculate our effective error rate based on the PutRecord.Success (or PutRecords.Success) and GetRecords.Success CloudWatch metrics. An average error rate of 1% or higher over a time window of 5 minutes, for example, could indicate that there is an issue with the data stream, and we may want to fail over. In our CloudFormation template, this error rate threshold as well as the time window are configurable, but by default we check for an error rate of 1% in the last 5 minutes to trigger a failover of our clients.
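
The following is a minimal sketch of such a health check in Python with boto3; it averages the stream’s success metrics over the last 5 minutes and flags a failover when the error rate exceeds 1%. A production check should also decide how to treat missing data points.

import boto3
from datetime import datetime, timedelta, timezone

cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")

def error_rate(metric_name, stream_name="KDS-HA-Stream", minutes=5):
    end = datetime.now(timezone.utc)
    stats = cloudwatch.get_metric_statistics(
        Namespace="AWS/Kinesis",
        MetricName=metric_name,  # for example, "PutRecords.Success" or "GetRecords.Success"
        Dimensions=[{"Name": "StreamName", "Value": stream_name}],
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        Period=60,
        Statistics=["Average"],
    )
    points = stats["Datapoints"]
    if not points:
        return 0.0  # no data points; treated as healthy here for simplicity
    avg_success = sum(p["Average"] for p in points) / len(points)
    return 1.0 - avg_success

should_fail_over = (
    error_rate("PutRecords.Success") > 0.01 or error_rate("GetRecords.Success") > 0.01
)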

Client failover

When a data stream is deemed to be unreachable, we must now take action to keep our system available and reachable for clients on both ends of the interaction. This means for producers following the cold standby high availability architecture, we change the destination stream where the producer was writing. If high availability and failover of data producers and consumers isn’t a requirement of a given use case, a different architecture would be a better fit.

Prior to failover, the producer may have been delivering data to a stream in Region A, but we now automatically update the destination to be the stream in Region B. For different clients, the methodology of updating the producer will be different, but for ours, we store the active destination for producers in the Lambda environment variables from AWS CloudFormation and update our Lambda functions dynamically on health check failure scenarios.
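
The following minimal sketch shows what that update could look like with boto3; it uses the same INPUT_STREAM and PRODUCING_TO_REGION environment variables and function name that appear in the failback commands later in this post.

import boto3

lambda_client = boto3.client("lambda", region_name="us-west-2")

# Point the producer at the stream in the failover Region
lambda_client.update_function_configuration(
    FunctionName="KDS-HA-ProducerLambda",
    Environment={
        "Variables": {
            "INPUT_STREAM": "KDS-HA-Stream",
            "PRODUCING_TO_REGION": "us-east-2",
        }
    },
)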

For our use case, we use the maximum consumer lag time (iteratorAge) plus some buffer to influence the starting position of the failover consumer. This allows us to ensure that the consumer in the secondary Region doesn’t skip records that haven’t been processed in the originating Region, but some data overlap may occur. Note that this may introduce duplicates in the downstream system, so the sink must be idempotent or have some other method of handling duplicates.
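
One way to express this when creating the consumer’s event source mapping in the secondary Region is an AT_TIMESTAMP starting position. The following sketch assumes a 5-minute observed lag plus a 1-minute buffer; the account ID is a placeholder.

import boto3
from datetime import datetime, timedelta, timezone

lambda_client = boto3.client("lambda", region_name="us-east-2")

# Start far enough back to cover the observed iterator age plus a safety buffer
start_time = datetime.now(timezone.utc) - timedelta(minutes=5 + 1)

lambda_client.create_event_source_mapping(
    FunctionName="KDS-HA-ConsumerLambda",
    EventSourceArn="arn:aws:kinesis:us-east-2:<accountid>:stream/KDS-HA-Stream",
    BatchSize=100,
    StartingPosition="AT_TIMESTAMP",
    StartingPositionTimestamp=start_time,
)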

In the case where data is successfully written to a data stream but is unable to be consumed from the stream, the data will not be replicated and therefore be unavailable in the second Region. The data will be durably stored in the primary data stream until it comes back online and can be read from. Note that if the stream is unavailable for a longer period of time than your total data retention period on the data stream, this data will be lost. Data retention for Kinesis Data Streams can be retrospectively increased up to 1 year.
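
If your recovery plans call for surviving longer outages, you can raise the retention period ahead of time; the following one-call sketch extends it to 7 days (extended retention beyond the default 24 hours incurs additional charges).

import boto3

kinesis = boto3.client("kinesis", region_name="us-west-2")

# Retention can be set anywhere from 24 hours up to 8,760 hours (1 year)
kinesis.increase_stream_retention_period(
    StreamName="KDS-HA-Stream",
    RetentionPeriodHours=168,  # 7 days
)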

For consumers in a cold standby architecture, upon failure detection, the consumer will be disabled or shut down, and the same consumer instance will be spun up in the secondary Region to consume from the secondary data stream. On the consumer side, we assume that the consumer application is stateless in our provided solution. If your application requires state, you can migrate or preload the application state via Amazon Simple Storage Service (Amazon S3) or a database. For a stateless application, the most important aspect of failover is the starting position.

In the following timeline, we can see that at some point, the stream in Region A was deemed unreachable.

The consumer application in Region A was reading data at time t10, and when it fails over to the secondary Region (B), it reads starting at t5 (5 minutes before the current iteratorAgeMilliseconds). This ensures that data isn’t skipped by the consumer application. Keep in mind that there may be some overlap in records in the downstream destinations.

In the provided cold standby AWS CloudFormation example, we can manually trigger a failover with the AWS Command Line Interface (AWS CLI). In the following code, we manually fail over to us-east-2:

aws lambda invoke --function-name KDS-HA-FailoverLambda --cli-binary-format raw-in-base64-out --payload '{}' response.json --region us-west-2

After a few minutes, you can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-east-2 as its value because it’s failed over to the us-east-2 Region.

Failback

After an outage or service unavailability is deemed to be resolved, the next logical step is to reorient your clients back to their original operating Regions. Although it may be tempting to automate this procedure, a manual failback during off-business hours, when production disruption is minimal, usually makes more sense.

In the following images, we can visualize the timeline with which consumer applications are failed back to the original Region.

The producer switches back to the original Region, and we wait for the consumer in Region B to reach 0 lag. At this point, the consumer application in Region B is disabled, and replication to Region B is resumed. We have now returned to our normal state of processing messages as shown in the replication section of this post.

In our AWS CloudFormation setup, we perform a failback with the following steps:

  1. Re-enable the event source mapping and start consuming messages from the primary Region at the latest position:
aws lambda create-event-source-mapping --function-name KDS-HA-ConsumerLambda --batch-size 100 --event-source-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --starting-position LATEST --region us-west-2
  2. Switch the producer back to the primary Region:
aws lambda update-function-configuration --function-name KDS-HA-ProducerLambda --environment "Variables={INPUT_STREAM=KDS-HA-Stream,PRODUCING_TO_REGION=us-west-2}" --region us-west-2
  3. In the failover Region (us-east-2), wait for your data stream’s GetRecords max iterator age (in milliseconds) CloudWatch metric to report 0 as a value. We’re waiting for the consumer Lambda function to catch up with all produced messages.
  4. Stop consuming messages from the failover Region by deleting its event source mapping. Run the following AWS CLI commands and grab the UUID from the response of the first one; make sure you’re picking the event source mapping for the Lambda function KDS-HA-ConsumerLambda.
aws lambda list-event-source-mappings --region us-east-2
aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-east-2
  5. Restart the replication agent in the primary Region by running the following AWS CLI commands, capturing ConsumerARN from the response of the first one:
aws kinesis list-stream-consumers --stream-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --region us-west-2
aws lambda create-event-source-mapping --function-name KDS-HA-ReplicationAgentLambda --batch-size 100 --event-source-arn {{ConsumerARN}} --starting-position LATEST --region us-west-2

When this is complete, you can observe the same data stream metrics—the number of records in and out per second, consumer lag metrics, and number of errors as described in the health check section of this post—to ensure that each of the components has resumed processing data in the original Region. We can also take note of the data landing in DynamoDB, which displays which Region data is being updated from in order to determine the success of our failback procedure.

For any streaming workload that can’t withstand extended data loss or downtime, we recommend implementing some form of cross-Region high availability to prepare for the unlikely event of service unavailability. The considerations in this post can help you determine which pattern is right for your use case.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack from primary Region us-west-2.
  2. Delete the CloudFormation stack from failover Region us-east-2.
  3. List all event source mappings in primary Region us-west-2 using the aws lambda list-event-source-mappings --region us-west-2 command and note the UUIDs of the event source mappings tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions.
  4. Delete event source mappings in primary Region us-west-2 tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions using the aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-west-2 command and UUIDs noted in the previous step.

Conclusion

Building highly available Kinesis data streams across multiple Regions is multi-faceted, and all aspects of your RPO, RTO, and operational costs need to be carefully considered. The code and architecture discussed in this post is one of many different architectural patterns you can choose for your workloads, so make sure to choose the appropriate architecture based on the criteria for your specific requirements.

To learn more about Kinesis Data Streams, we have a getting started guide as well as a workshop to walk through all the integrations with Kinesis Data Streams. You can also contact your AWS Solutions Architects, who can be of assistance alongside your high availability journey.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 7 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day, and process complex machine learning algorithms in real time. At AWS, he is a Senior Streaming Specialist Solutions Architect supporting both Amazon MSK and Amazon Kinesis.

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

Run a popular benchmark on Amazon Redshift Serverless easily with AWS Data Exchange

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/run-a-popular-benchmark-on-amazon-redshift-serverless-easily-with-aws-data-exchange/

Amazon Redshift is a fast, easy, secure, and economical cloud data warehousing service designed for analytics. AWS announced Amazon Redshift Serverless general availability in July 2022, providing an easier experience to operate Amazon Redshift. Amazon Redshift Serverless makes it simple to run and scale analytics without having to manage your data warehouse infrastructure. Amazon Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use.

Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access external data in open file formats like CSV and Parquet stored in Amazon S3. For more information on RPU pricing, refer to Amazon Redshift pricing.
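
As a back-of-the-envelope illustration of this metering model, the following calculation shows the charge for a short workload on a 128 RPU workgroup; the per-RPU-hour price is an assumed placeholder, so check the pricing page for the current rate in your Region.

price_per_rpu_hour = 0.36                    # placeholder price, assumed for illustration only

rpus = 128                                   # workgroup capacity used by the workload
runtime_seconds = 90                         # compute time actually consumed
billed_seconds = max(runtime_seconds, 60)    # per-second billing with a 60-second minimum

rpu_hours = rpus * billed_seconds / 3600
print(f"{rpu_hours:.2f} RPU-hours ~= ${rpu_hours * price_per_rpu_hour:.2f}")
# 3.20 RPU-hours ~= $1.15 at the assumed rate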

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. With AWS Data Exchange for Amazon Redshift, customers can start querying, evaluating, analyzing, and joining third-party data with their own first-party data without requiring any extracting, transforming, and loading (ETL). Data providers can list and offer products containing Amazon Redshift datasets in the AWS Data Exchange catalog, granting subscribers direct, read-only access to the data stored in Amazon Redshift. This feature empowers customers to quickly query, analyze, and build applications with these third-party datasets.

TPC-DS is a commonly used benchmark for measuring the query performance of data warehouse solutions such as Amazon Redshift. The benchmark is useful for proving a system’s ability to execute simple to complex queries in a timely manner. It is also used to compare the performance of different database configurations and different concurrent workloads, and to compare against other database products.

This blog post walks you through the steps you’ll need to set up Amazon Redshift Serverless and run the SQL queries derived from the TPC-DS benchmark against data from the AWS Data Exchange.

Solution overview

We will get started by creating an Amazon Redshift Serverless workgroup and namespace. A namespace is a collection of database objects and users, while a workgroup is a collection of compute resources. To simplify executing the benchmark queries, a Linux EC2 instance will also be deployed.

Next, a GitHub repo containing the TPC-DS derived queries will be used. The TPC-DS benchmark is frequently used for evaluating the performance and functionality of cloud data warehouses. The TPC-DS benchmark includes additional steps and requirements to be considered official, but for this blog post, we are focused on only executing the SQL SELECT queries from this benchmark.

The last component of the solution is data. The TPC-DS benchmark includes binaries for generating data, but this is time-consuming to run. We have avoided this problem by generating the data in advance and making it freely available in AWS Data Exchange.

Automated setup: CloudFormation

Launch Stack

Click the Launch Stack link above to launch the CloudFormation stack, which will automate the deployment of resources needed for the demo. The template deploys the following resources in your default VPC:

  • Amazon Elastic Compute Cloud (Amazon EC2) instance with the latest version of Amazon Linux
  • Amazon Redshift Serverless workgroup and namespace
  • IAM role with redshift-serverless:GetWorkgroup action granted; this is attached to the EC2 instance so that a command line interface (CLI) command can run to complete the instance configuration
  • Security group with inbound port 22 (ssh) and connectivity between the EC2 instance and the Amazon Redshift Serverless workgroup
  • The GitHub repo is downloaded in the EC2 instance

Template parameters

  • Stack: CloudFormation term used to define all of the resources created by the template.
  • KeyName: This is the name of an existing key pair. If you don’t have one already, create a key pair that is used to connect by SSH to the EC2 instance. More information on key pairs.
  • SSHLocation: The CIDR mask for incoming connections to the EC2 instance. The default is 0.0.0.0/0, which means any IP address is allowed to connect by SSH to the EC2 instance, provided the client has the key pair private key. The best practice for security is to limit this to a smaller range of IP addresses. For example, you can use sites like www.whatismyip.com to get your IP address.
  • RedshiftCapacity: This is the number of RPUs for the Amazon Redshift Serverless workgroup. The default is 128 and is recommended for analyzing the larger TPC-DS datasets. You can update the RPU capacity after deployment if you like or redeploy it with a different capacity value.

Manual setup

If you choose not to use the CloudFormation template, deploy the EC2 instance and Amazon Redshift Serverless with the following instructions. These steps are only needed if you are provisioning the resources manually rather than using the provided CloudFormation template.

Amazon Redshift setup

Here are the high-level steps to create an Amazon Redshift Serverless workgroup. You can get more detailed information from this News Blog post.

To create your workgroup, complete the following steps:

  1. On the Amazon Redshift console, navigate to the Amazon Redshift Serverless dashboard.
  2. Choose Create workgroup.
  3. For Workgroup name, enter a name.
  4. Choose Next.
  5. For Namespace, enter a unique name.
  6. Choose Next.
  7. Choose Create.

These steps create an Amazon Redshift Serverless workgroup with 128 RPUs. This is the default; you can easily adjust this up or down based on your workload and budget constraints.
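
If you prefer to script the setup rather than use the console, the following minimal boto3 sketch creates an equivalent namespace and workgroup; the names are placeholders.

import boto3

redshift_serverless = boto3.client("redshift-serverless", region_name="us-west-2")

# The namespace holds database objects and users; the workgroup holds the compute
redshift_serverless.create_namespace(namespaceName="tpcds-namespace")

redshift_serverless.create_workgroup(
    workgroupName="tpcds-workgroup",
    namespaceName="tpcds-namespace",
    baseCapacity=128,  # RPUs; adjust to your workload and budget
)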

Linux EC2 instance setup

  • Deploy a virtual machine in the same AWS Region as your Amazon Redshift database using the Amazon Linux 2 AMI.
  • The Amazon Linux 2 AMI (64-bit x86) with the t2.micro instance type is an inexpensive and tested configuration.
  • Add the security group configured for your Amazon Redshift database to your EC2 instance.
  • Install psql with sudo yum install postgresql.x86_64 -y
  • Download this GitHub repo.
    git clone --depth 1 https://github.com/aws-samples/redshift-benchmarks /home/ec2-user/redshift-benchmarks

  • Set the following environment variables for Amazon Redshift:
    • PGHOST: This is the endpoint for the Amazon Redshift database.
    • PGPORT: This is the port the database listens on. The Amazon Redshift default is 5439.
    • PGUSER: This is your Amazon Redshift database user.
    • PGDATABASE: This is the database name where your external schemas are created. This is NOT the database created for the data share. We suggest using the default “dev” database.

Example:

export PGUSER="awsuser" 
export PGHOST="default.01.us-east-1.redshift-serverless.amazonaws.com" 
export PGPORT="5439" 
export PGDATABASE="dev"

Configure the .pgpass file to store your database credentials. The format for the .pgpass file is: hostname:port:database:user:password

Example:

default.01.us-east-1.redshift-serverless.amazonaws.com:5439:*:user1:user1P@ss

AWS Data Exchange setup

AWS Data Exchange provides third-party data in multiple data formats, including Amazon Redshift. You can subscribe to catalog listings in multiple storage locations like Amazon S3 and Amazon Redshift data shares. We encourage you to explore the AWS Data Exchange catalog on your own because there are many datasets available that can be used to enhance your data in Amazon Redshift.

First, subscribe to the AWS Marketplace listing for TPC-DS Benchmark Data. Select the Continue to subscribe button from the AWS Data Exchange catalog listing. After you review the offer and Terms and Conditions of the data product, choose Subscribe. Note that you will need the appropriate IAM permissions to subscribe to AWS Data Exchange on Marketplace. More information can be found at AWS managed policies for AWS Data Exchange.

TPC-DS uses 24 tables in a dimensional model that simulates a decision support system. It has store, catalog, and web sales as well as store, catalog, and web returns fact tables. It also has the dimension tables to support these fact tables.

TPC-DS includes a utility to generate data for the benchmark at a given scale factor. The smallest scale factor is 1 GB (uncompressed). Most benchmark tests for cloud warehouses are run with 3–10 TB of data because the dataset is large enough to stress the system but also small enough to complete the entire test in a reasonable amount of time.

There are six database schemas provided in the TPC-DS Benchmark Data subscription with 1; 10; 100; 1,000; 3,000; and 10,000 scale factors. The scale factor refers to the uncompressed data size measured in GB. Each schema refers to a dataset with the corresponding scale factor.

Scale factor (GB) ADX schema Amazon Redshift Serverless external schema
1 tpcds1 ext_tpcds1
10 tpcds10 ext_tpcds10
100 tpcds100 ext_tpcds100
1,000 tpcds1000 ext_tpcds1000
3,000 tpcds3000 ext_tpcds3000
10,000 tpcds10000 ext_tpcds10000

The following steps create external schemas in your Amazon Redshift Serverless database that map to the schemas provided through the AWS Data Exchange subscription.

  • Log in to the EC2 instance and create a database connection.
    psql

  • Run the following query:
    select share_name, producer_account, producer_namespace from svv_datashares;

  • Use the output of this query to run the next command:
    create database tpcds_db from datashare <share_name> of account '<producer_account>' namespace '<producer_namespace>';

  • Last, you create the external schemas in Amazon Redshift:
    create external schema ext_tpcds1 from redshift database tpcds_db schema tpcds1;
    create external schema ext_tpcds10 from redshift database tpcds_db schema tpcds10;
    create external schema ext_tpcds100 from redshift database tpcds_db schema tpcds100;
    create external schema ext_tpcds1000 from redshift database tpcds_db schema tpcds1000;
    create external schema ext_tpcds3000 from redshift database tpcds_db schema tpcds3000;
    create external schema ext_tpcds10000 from redshift database tpcds_db schema tpcds10000;

  • You can now exit psql with this command:
    \q

TPC-DS derived benchmark

The TPC-DS derived benchmark consists of 99 queries in four broad categories:

  • Reporting queries
  • Ad hoc queries
  • Iterative OLAP queries
  • Data mining queries

In addition to running the 99 queries, the benchmark tests concurrency. During the concurrency portion of the test, there are n sessions (default of 5) that run the queries. Each session runs the 99 queries with different parameters and in slightly different order. This concurrency test stresses the resources of the database and generally takes longer to complete than just a single session running the 99 queries.

Some data warehouse products are configured to optimize single-user performance, whereas others may not have the ability to manage the workload effectively. This is a great way to demonstrate the workload management and stability of Amazon Redshift.

Since the data for each scale factor is located in a different schema, running the benchmark against each scale factor requires changing the schema you are referencing. The search_path defines which schemas to search for tables when a query contains objects without a schema included. For example:

ALTER USER <username> SET search_path=ext_tpcds3000,public;

The benchmark scripts set the search_path automatically.

Note: The scripts create a schema called tpcds_reports which will store the detailed output of each step of the benchmark. Each time the scripts are run, this schema will be recreated, and the latest results will be stored. If you happen to already have a schema named tpcds_reports, these scripts will drop the schema.

Running the TPC-DS derived queries

  • Connect by SSH to the EC2 instance with your key pair.
  • Change directory:
    cd ~/redshift-benchmarks/adx-tpc-ds/

  • Optionally configure the variables for the scripts in tpcds_variables.sh

Here are the default values for the variables you can set:

  • EXT_SCHEMA="ext_tpcds3000": This is the name of the external schema created that has the TPC-DS dataset. The “3000” value means the scale factor is 3000 or 3 TB (uncompressed).
  • EXPLAIN="false": If set to false, queries will run. If set to true, queries will generate explain plans rather than actually running. Each query will be logged in the log directory. Default is false.
  • MULTI_USER_COUNT="5": 0 to 20 concurrent users will run the queries. The order of the queries was set with dsqgen. Setting to 0 will skip the multi-user test. Default is 5.
  • Run the benchmark:
    ./rollout.sh > rollout.log 2>&1 &

TPC-DS derived benchmark results

We performed a test with the 3 TB ADX TPC-DS dataset on an Amazon Redshift Serverless workgroup with 128 RPUs. Additionally, we disabled query caching so that query results aren’t cached. This allows us to measure the performance of the database as opposed to its ability to serve results from cache.

The test comprises two sections. The first runs the 99 queries serially using one user, while the second starts multiple sessions based on the configuration file you set earlier. Each session runs concurrently, and each runs all 99 queries but in a different order.

The total runtime for the single-user queries was 15 minutes and 11 seconds. As shown in the following graph, the longest-running query was query 67, with an elapsed time of only 101 seconds. The average runtime was only 9.2 seconds.

1 Session TPC-DS 3TB 128 RPUs

With five concurrent users, the runtime was 28 minutes and 35 seconds, which demonstrates how Amazon Redshift Serverless performs well for single-user and concurrent-user workloads.

5 Concurrent Sessions TPC-DS 3TB 128 RPUs

As you can see, it was pretty easy to deploy Amazon Redshift Serverless, subscribe to an AWS Data Exchange product listing, and run a fairly complex benchmark in a short amount of time.

Next steps

You can run the benchmark scripts again but with different dataset sizes or a different number of concurrent users by editing the tpcds_variables.sh file. You can also try resizing your Amazon Redshift Serverless workgroup to see the performance difference with more or fewer RPUs. You can also run individual queries to see the results firsthand.

Another thing to try is to subscribe to other AWS Data Exchange products and query this data from Amazon Redshift Serverless. Be curious and explore using Amazon Redshift Serverless and the AWS Data Exchange!

Clean up

If you deployed the resources with the automated solution, you just need to delete the stack created in CloudFormation. All resources created by the stack will be deleted automatically.

If you deployed the resources manually, you need to delete the following:

  • The Amazon Redshift database created earlier.
    • If you deployed Amazon Redshift Serverless, you will need to delete both the workgroup and the namespace.
  • The Amazon EC2 instance.

Optionally, you can unsubscribe from the TPC-DS data by going to your AWS Data Exchange Subscriptions and then turning Renewal to Off.

Conclusion

This blog post covered deploying Amazon Redshift Serverless, subscribing to an AWS Data Exchange product, and running a complex benchmark in a short amount of time. Amazon Redshift Serverless can handle high levels of concurrency with very little effort and excels in price-performance.

If you have any questions or feedback, please leave them in the comments section.


About the author

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio has recently added the ability to create custom visual transforms that you can use in visual jobs alongside the AWS Glue Studio components provided out of the box. You can now define a custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which define the component and the processing logic, respectively.

Custom visual transform lets you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is what the component definition looks like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to the previous year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as "self", and the parameters must match the definition in the JSON file. Notice that "year" is an optional parameter, so it’s given a default value in the function signature; when it isn’t provided on the call, the function detects this and uses the previous year. The function returns the transformed DynamicFrame. In this case, it’s not derived from the source one, which is the common case, but is replaced by a new one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.

Deploy and use the generator transform

Now that we have both files ready, all we have to do is upload them to Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.
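
You can upload the files from the console or the AWS CLI; the following boto3 sketch does the same thing, deriving the bucket name from your account ID and current Region and assuming both files are in the working directory.

import boto3

account_id = boto3.client("sts").get_caller_identity()["Account"]
region = boto3.session.Session().region_name

bucket = f"aws-glue-assets-{account_id}-{region}"
s3 = boto3.client("s3")

# AWS Glue Studio picks up custom visual transforms from the transforms/ prefix of this bucket
for file_name in ("salesdata_generator.json", "salesdata_generator.py"):
    s3.upload_file(file_name, bucket, f"transforms/{file_name}")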

Once you have uploaded both files, the next time you open (or refresh) the AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue console, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node by selecting it in the Data source properties tab and selecting source type S3 location.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and search it on the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This would be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema. That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more information about bucket creation in the Amazon S3 documentation.
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to, but not more than, 1,460 (depending on the year used, and assuming you are using 2 G.1X workers and AWS Glue version 3.0).
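
A quick way to count them is a short script like the following sketch; the bucket and prefix placeholders are the same ones used for the S3 target location.

import boto3

bucket = "<your output bucket here>"   # same bucket as the S3 target location
prefix = "output/"

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Count every object written under the output prefix, across all sale_date= partitions
count = sum(
    len(page.get("Contents", []))
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
)
print(f"{count} files under s3://{bucket}/{prefix}")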

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data was saved as a table with years of history, generating small files has a detrimental impact on tools that consume it, like Amazon Athena.

The reason is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (note that these are a different kind of partition from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows for every day of the year. When the sink splits the data into output directories to group the files, each memory partition has to write one file for each day it contains, so you can end up with 4 * 365 = 1,460 files (in a non-leap year), as the short sketch below illustrates.
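
The following is a minimal standalone PySpark sketch that reproduces the effect outside AWS Glue; the sample size, date range, and output path are illustrative rather than taken from the job above:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# range() without an explicit numPartitions uses the default parallelism,
# which on a small 4-core cluster is 4.
df = spark.range(10000).withColumn(
    "sale_date",
    F.expr("date_add(to_date('2021-01-01'), cast(rand() * 365 as int))")
)
print(df.rdd.getNumPartitions())  # 4 on a 4-core cluster

# Each memory partition holds rows for most days of the year, so a
# partitioned write can create up to 4 * 365 = 1,460 small files.
df.write.partitionBy("sale_date").mode("overwrite").parquet("/tmp/sales_demo/")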

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let's imagine that on your team, you have a policy of generating S3 date partitions separated into year, month, and day as strings, so the files can be selected efficiently whether or not a table is defined on top of them.

We don't want each user to have to deal with these optimizations and conventions on their own; instead, they should have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, named repartition_date.json, that defines the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the in-memory partitions are aligned with the file partitioning on S3,
    # so each memory partition holds the data for one combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files to the S3 transforms folder, as you did for the previous transform.

Deploy and use the repartition transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the newly added node, and remove the ApplyMapping node; we no longer need it.
  2. Repartition by date requires you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn't yet allow field selection using a dropdown) and leave the other two parameters at their defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check that it's working correctly (or start a session if the previous one has expired). Then, on the Output schema tab, choose Use datapreview schema so the new fields get added. Notice the transform doesn't remove the original column, but it could if you changed it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that the partition number appears at the end of the file name. Although we now have more partitions, we have fewer output files because the data in memory is organized more closely to the desired output. You can verify the per-partition file counts with the short sketch below.
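
The following is a small boto3 sketch that counts output files per partition prefix, so you can compare runs without clicking through the console; the bucket name is a placeholder, and you should adjust the prefix to match your job:

from collections import Counter

import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

counts = Counter()
for page in paginator.paginate(Bucket="your-output-bucket", Prefix="output2/"):
    for obj in page.get("Contents", []):
        # Keys look like output2/year=2021/month=09/day=01/run-...-part-r-00292
        partition = obj["Key"].rsplit("/", 1)[0]
        counts[partition] += 1

for partition, num_files in sorted(counts.items()):
    print(partition, num_files)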

The repartition transform has additional configuration options that we have left empty. You can now try different values and see how they affect the output.
For instance, you can specify “department” as “Partition columns” in the transform and then add it to the sink partition column list. Or you can enter a “Number of partitions expected” and see how it affects the runtime (the transform no longer needs to calculate the count at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user has created and saved a job that uses custom visual transforms, AWS Glue Studio generates the job script and updates the Python library path (also referred to as the --extra-py-files job parameter) with the comma-separated list of the transforms' Python file S3 paths.
  3. Before running your script, AWS Glue adds all file paths stored in the --extra-py-files job parameter to the Python path, allowing your script to use all the custom visual transform functions you defined, as the sketch after this list illustrates.
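
The following is a hypothetical excerpt of such a generated job script, not the exact code AWS Glue Studio emits; the source location, node names, and arguments are illustrative. Because repartition_date.py is on the Python path, importing it runs the module-level assignment that attaches the new method to DynamicFrame:

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Importing the transform module attaches repartition_date to DynamicFrame.
import repartition_date  # noqa: F401

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_ctx = GlueContext(SparkContext.getOrCreate())

source_dyf = glue_ctx.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-input-bucket/placeholder/"]},
    format="json",
)

# The custom visual transform is now just a method call on the DynamicFrame.
repartitioned = source_dyf.repartition_date(dateCol="sale_date")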

Cleanup

To avoid incurring costs, if you don't want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job you created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Unlock the power of EC2 Graviton with GitLab CI/CD and EKS Runners

Post Syndicated from Michael Fischer original https://aws.amazon.com/blogs/devops/unlock-the-power-of-ec2-graviton-with-gitlab-ci-cd-and-eks-runners/

Many AWS customers are using GitLab for their DevOps needs, including source control, and continuous integration and continuous delivery (CI/CD). Many of our customers are using GitLab SaaS (the hosted edition), while others are using GitLab Self-managed to meet their security and compliance requirements.

Customers can easily add runners to their GitLab instance to perform various CI/CD jobs. These jobs include compiling source code, building software packages or container images, performing unit and integration testing, etc.—even all the way to production deployment. For the SaaS edition, GitLab offers hosted runners, and customers can provide their own runners as well. Customers who run GitLab Self-managed must provide their own runners.

In this post, we’ll discuss how customers can maximize their CI/CD capabilities by managing their GitLab runner and executor fleet with Amazon Elastic Kubernetes Service (Amazon EKS). We’ll leverage both x86 and Graviton runners, allowing customers for the first time to build and test their applications both on x86 and on AWS Graviton, our most powerful, cost-effective, and sustainable instance family. In keeping with AWS’s philosophy of “pay only for what you use,” we’ll keep our Amazon Elastic Compute Cloud (Amazon EC2) instances as small as possible, and launch ephemeral runners on Spot instances. We’ll demonstrate building and testing a simple demo application on both architectures. Finally, we’ll build and deliver a multi-architecture container image that can run on Amazon EC2 instances or AWS Fargate, both on x86 and Graviton.

Figure 1. Managed GitLab runner architecture overview.

Let’s go through the components:

Runners

A runner is an application to which GitLab sends jobs that are defined in a CI/CD pipeline. The runner receives jobs from GitLab and executes them, either by itself or by passing them to an executor (we'll visit the executor in the next section).

In our design, we’ll be using a pair of self-hosted runners. One runner will accept jobs for the x86 CPU architecture, and the other will accept jobs for the arm64 (Graviton) CPU architecture. To help us route our jobs to the proper runner, we’ll apply some tags to each runner indicating the architecture for which it will be responsible. We’ll tag the x86 runner with x86, x86-64, and amd64, thereby reflecting the most common nicknames for the architecture, and we’ll tag the arm64 runner with arm64.

Currently, these runners must always be running so that they can receive jobs as they are created. Our runners only require a small amount of memory and CPU, so we can run them on small EC2 instances to minimize cost, such as t4g.micro for the Graviton runner, or t3.micro or t3a.micro for the x86 runner.

To save money on these runners, consider purchasing a Savings Plan or Reserved Instances for them. Savings Plans and Reserved Instances can save you up to 72% over on-demand pricing, and there’s no minimum spend required to use them.

Kubernetes executors

In GitLab CI/CD, the executor’s job is to perform the actual build. The runner can create hundreds or thousands of executors as needed to meet current demand, subject to the concurrency limits that you specify. Executors are created only when needed, and they are ephemeral: once a job has finished running on an executor, the runner will terminate it.

In our design, we’ll use the Kubernetes executor that’s built into the GitLab runner. The Kubernetes executor simply schedules a new pod to run each job. Once the job completes, the pod terminates, thereby freeing the node to run other jobs.

The Kubernetes executor is highly customizable. We’ll configure each runner with a nodeSelector that makes sure that the jobs are scheduled only onto nodes that are running the specified CPU architecture. Other possible customizations include CPU and memory reservations, node and pod tolerations, service accounts, volume mounts, and much more.

Scaling worker nodes

For most customers, CI/CD jobs aren’t likely to be running all of the time. To save cost, we only want to run worker nodes when there’s a job to run.

To make this happen, we’ll turn to Karpenter. Karpenter provisions EC2 instances as soon as needed to fit newly-scheduled pods. If a new executor pod is scheduled, and there isn’t a qualified instance with enough capacity remaining on it, then Karpenter will quickly and automatically launch a new instance to fit the pod. Karpenter will also periodically scan the cluster and terminate idle nodes, thereby saving on costs. Karpenter can terminate a vacant node in as little as 30 seconds.

Karpenter can launch either Amazon EC2 on-demand or Spot instances depending on your needs. With Spot instances, you can save up to 90% over on-demand instance prices. Since CI/CD jobs often aren’t time-sensitive, Spot instances can be an excellent choice for GitLab execution pods. Karpenter will even automatically find the best Spot instance type to speed up the time it takes to launch an instance and minimize the likelihood of job interruption.

Deploying our solution

To deploy our solution, we’ll write a small application using the AWS Cloud Development Kit (AWS CDK) and the EKS Blueprints library. AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. EKS Blueprints is a library designed to make it simple to deploy complex Kubernetes resources to an Amazon EKS cluster with minimum coding.

The high-level infrastructure code – which can be found in our GitLab repo – is very simple. I’ve included comments to explain how it works.

// All CDK applications start with a new cdk.App object.
const app = new cdk.App();

// Create a new EKS cluster at v1.23. Run all non-DaemonSet pods in the 
// `kube-system` (coredns, etc.) and `karpenter` namespaces in Fargate
// so that we don't have to maintain EC2 instances for them.
const clusterProvider = new blueprints.GenericClusterProvider({
  version: KubernetesVersion.V1_23,
  fargateProfiles: {
    main: {
      selectors: [
        { namespace: 'kube-system' },
        { namespace: 'karpenter' },
      ]
    }
  },
  clusterLogging: [
    ClusterLoggingTypes.API,
    ClusterLoggingTypes.AUDIT,
    ClusterLoggingTypes.AUTHENTICATOR,
    ClusterLoggingTypes.CONTROLLER_MANAGER,
    ClusterLoggingTypes.SCHEDULER
  ]
});

// EKS Blueprints uses a Builder pattern.
blueprints.EksBlueprint.builder()
  .clusterProvider(clusterProvider) // start with the Cluster Provider
  .addOns(
    // Use the EKS add-ons that manage coredns and the VPC CNI plugin
    new blueprints.addons.CoreDnsAddOn('v1.8.7-eksbuild.3'),
    new blueprints.addons.VpcCniAddOn('v1.12.0-eksbuild.1'),
    // Install Karpenter
    new blueprints.addons.KarpenterAddOn({
      provisionerSpecs: {
        // Karpenter examines scheduled pods for the following labels
        // in their `nodeSelector` or `nodeAffinity` rules and routes
        // the pods to the node with the best fit, provisioning a new
        // node if necessary to meet the requirements.
        //
        // Allow either amd64 or arm64 nodes to be provisioned 
        'kubernetes.io/arch': ['amd64', 'arm64'],
        // Allow either Spot or On-Demand nodes to be provisioned
        'karpenter.sh/capacity-type': ['spot', 'on-demand']
      },
      // Launch instances in the VPC private subnets
      subnetTags: {
        Name: 'gitlab-runner-eks-demo/gitlab-runner-eks-demo-vpc/PrivateSubnet*'
      },
      // Apply security groups that match the following tags to the launched instances
      securityGroupTags: {
        'kubernetes.io/cluster/gitlab-runner-eks-demo': 'owned'      
      }
    }),
    // Create a pair of a new GitLab runner deployments, one running on
    // arm64 (Graviton) instance, the other on an x86_64 instance.
    // We'll show the definition of the GitLabRunner class below.
    new GitLabRunner({
      arch: CpuArch.ARM_64,
      // If you're using an on-premise GitLab installation, you'll want
      // to change the URL below.
      gitlabUrl: 'https://gitlab.com',
      // Kubernetes Secret containing the runner registration token
      // (discussed later)
      secretName: 'gitlab-runner-secret'
    }),
    new GitLabRunner({
      arch: CpuArch.X86_64,
      gitlabUrl: 'https://gitlab.com',
      secretName: 'gitlab-runner-secret'
    }),
  )
  .build(app, 
         // Stack name
         'gitlab-runner-eks-demo');

The GitLabRunner class is a HelmAddOn subclass that takes a few parameters from the top-level application:

// The location and name of the GitLab Runner Helm chart
const CHART_REPO = 'https://charts.gitlab.io';
const HELM_CHART = 'gitlab-runner';

// The default namespace for the runner
const DEFAULT_NAMESPACE = 'gitlab';

// The default Helm chart version
const DEFAULT_VERSION = '0.40.1';

export enum CpuArch {
    ARM_64 = 'arm64',
    X86_64 = 'amd64'
}

// Configuration parameters
interface GitLabRunnerProps {
    // The CPU architecture of the node on which the runner pod will reside
    arch: CpuArch
    // The GitLab API URL 
    gitlabUrl: string
    // Kubernetes Secret containing the runner registration token (discussed later)
    secretName: string
    // Optional tags for the runner. These will be added to the default list 
    // corresponding to the runner's CPU architecture.
    tags?: string[]
    // Optional Kubernetes namespace in which the runner will be installed
    namespace?: string
    // Optional Helm chart version
    chartVersion?: string
}

export class GitLabRunner extends HelmAddOn {
    private arch: CpuArch;
    private gitlabUrl: string;
    private secretName: string;
    private tags: string[] = [];

    constructor(props: GitLabRunnerProps) {
        // Invoke the superclass (HelmAddOn) constructor
        super({
            name: `gitlab-runner-${props.arch}`,
            chart: HELM_CHART,
            repository: CHART_REPO,
            namespace: props.namespace || DEFAULT_NAMESPACE,
            version: props.chartVersion || DEFAULT_VERSION,
            release: `gitlab-runner-${props.arch}`,
        });

        this.arch = props.arch;
        this.gitlabUrl = props.gitlabUrl;
        this.secretName = props.secretName;

        // Set default runner tags
        switch (this.arch) {
            case CpuArch.X86_64:
                this.tags.push('amd64', 'x86', 'x86-64', 'x86_64');
                break;
            case CpuArch.ARM_64:
                this.tags.push('arm64');
                break;
        }
        this.tags.push(...props.tags || []); // Add any custom tags
    };

    // `deploy` method required by the abstract class definition. Our implementation
    // simply installs a Helm chart to the cluster with the proper values.
    deploy(clusterInfo: ClusterInfo): void | Promise<Construct> {
        const chart = this.addHelmChart(clusterInfo, this.getValues(), true);
        return Promise.resolve(chart);
    }

    // Returns the values for the GitLab Runner Helm chart
    private getValues(): Values {
        return {
            gitlabUrl: this.gitlabUrl,
            runners: {
                config: this.runnerConfig(), // runner config.toml file, from below
                name: `demo-runner-${this.arch}`, // name as seen in GitLab UI
                tags: uniq(this.tags).join(','),
                secret: this.secretName, // see below
            },
            // Labels to constrain the nodes where this runner can be placed
            nodeSelector: {
                'kubernetes.io/arch': this.arch,
                'karpenter.sh/capacity-type': 'on-demand'
            },
            // Default pod label
            podLabels: {
                'gitlab-role': 'manager'
            },
            // Create all the necessary RBAC resources including the ServiceAccount
            rbac: {
                create: true
            },
            // Required resources (memory/CPU) for the runner pod. The runner
            // is fairly lightweight as it's a self-contained Golang app.
            resources: {
                requests: {
                    memory: '128Mi',
                    cpu: '256m'
                }
            }
        };
    }

    // This string contains the runner's `config.toml` file including the
    // Kubernetes executor's configuration. Note the nodeSelector constraints 
    // (including the use of Spot capacity and the CPU architecture).
    private runnerConfig(): string {
        return `
  [[runners]]
    [runners.kubernetes]
      namespace = "{{.Release.Namespace}}"
      image = "ubuntu:16.04"
    [runners.kubernetes.node_selector]
      "kubernetes.io/arch" = "${this.arch}"
      "kubernetes.io/os" = "linux"
      "karpenter.sh/capacity-type" = "spot"
    [runners.kubernetes.pod_labels]
      gitlab-role = "runner"
      `.trim();
    }
}

For security reasons, we store the GitLab registration token in a Kubernetes Secret – never in our source code. For additional security, we recommend encrypting Secrets using an AWS Key Management Service (AWS KMS) key that you supply by specifying the encryption configuration when you create your Amazon EKS cluster. It’s a good practice to restrict access to this Secret via Kubernetes RBAC rules.

To create the Secret, run the following command:

# These two values must match the parameters supplied to the GitLabRunner constructor
NAMESPACE=gitlab
SECRET_NAME=gitlab-runner-secret
# The value of the registration token.
TOKEN=GRxxxxxxxxxxxxxxxxxxxxxx

kubectl -n $NAMESPACE create secret generic $SECRET_NAME \
        --from-literal="runner-registration-token=$TOKEN" \
        --from-literal="runner-token="

Building a multi-architecture container image

Now that we’ve launched our GitLab runners and configured the executors, we can build and test a simple multi-architecture container image. If the tests pass, we can then upload it to our project’s GitLab container registry. Our application will be pretty simple: we’ll create a web server in Go that prints out “Hello World” along with the current architecture.

Find the source code of our sample app in our GitLab repo.

In GitLab, the CI/CD configuration lives in the .gitlab-ci.yml file at the root of the source repository. In this file, we declare a list of ordered build stages, and then we declare the specific jobs associated with each stage.

Our stages are:

  1. The build stage, in which we compile our code, produce our architecture-specific images, and upload these images to the GitLab container registry. These uploaded images are tagged with a suffix indicating the architecture on which they were built. This job uses a matrix variable to run it in parallel against two different runners – one for each supported architecture. Furthermore, rather than using docker build to produce our images, we use Kaniko to build them. This lets us build our images in an unprivileged container environment and improve the security posture considerably.
  2. The test stage, in which we test the code. As with the build stage, we use a matrix variable to run the tests in parallel in separate pods on each supported architecture.

  3. The assembly stage, in which we create a multi-architecture image manifest from the two architecture-specific images. Then, we push the manifest into the image registry so that we can refer to it in future deployments.

Figure 2. Example CI/CD pipeline for multi-architecture images.

Here’s what our top-level configuration looks like:

variables:
  # These are used by the runner to configure the Kubernetes executor, and define
  # the values of spec.containers[].resources.requests.{memory,cpu} for the Pod(s).
  KUBERNETES_MEMORY_REQUEST: 1Gi
  KUBERNETES_CPU_REQUEST: 1

# List of stages for jobs, and their order of execution  
stages:    
  - build
  - test
  - create-multiarch-manifest

Here’s what our build stage job looks like. Note the matrix of variables which are set in BUILD_ARCH as the two jobs are run in parallel:

build-job:
  stage: build
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  image:
    name: gcr.io/kaniko-project/executor:debug
    entrypoint: [""]
  script:
    - mkdir -p /kaniko/.docker
    # Configure authentication data for Kaniko so it can push to the
    # GitLab container registry
    - echo "{\"auths\":{\"${CI_REGISTRY}\":{\"auth\":\"$(printf "%s:%s" "${CI_REGISTRY_USER}" "${CI_REGISTRY_PASSWORD}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
    # Build the image and push to the registry. In this stage, we append the build
    # architecture as a tag suffix.
    - >-
      /kaniko/executor
      --context "${CI_PROJECT_DIR}"
      --dockerfile "${CI_PROJECT_DIR}/Dockerfile"
      --destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}-${BUILD_ARCH}"

Here’s what our test stage job looks like. This time we use the image that we just produced. Our source code is copied into the application container. Then, we can run make test-container to execute the server test suite.

test-job:
  stage: test
  parallel:
    matrix:              # This job is run twice, once on amd64 (x86), once on arm64
    - BUILD_ARCH: amd64
    - BUILD_ARCH: arm64
  tags: [$BUILD_ARCH]    # Associate the job with the appropriate runner
  image:
    # Use the image we just built
    name: "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}-${BUILD_ARCH}"
  script:
    - make test-container

Finally, here’s what our assembly stage looks like. We use Podman to build the multi-architecture manifest and push it into the image registry. Traditionally we might have used docker buildx to do this, but using Podman lets us do this work in an unprivileged container for additional security.

create-manifest-job:
  stage: create-multiarch-manifest
  tags: [arm64] 
  image: public.ecr.aws/docker/library/fedora:36
  script:
    - yum -y install podman
    - echo "${CI_REGISTRY_PASSWORD}" | podman login -u "${CI_REGISTRY_USER}" --password-stdin "${CI_REGISTRY}"
    - COMPOSITE_IMAGE=${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}
    - podman manifest create ${COMPOSITE_IMAGE}
    - >-
      for arch in arm64 amd64; do
        podman manifest add ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}-${arch};
      done
    - podman manifest inspect ${COMPOSITE_IMAGE}
    # The composite image manifest omits the architecture from the tag suffix.
    - podman manifest push ${COMPOSITE_IMAGE} docker://${COMPOSITE_IMAGE}

Trying it out

I’ve created a public test GitLab project containing the sample source code, and attached the runners to the project. We can see them at Settings > CI/CD > Runners:

Figure 3. GitLab runner configurations.

Here we can also see some pipeline executions, where some have succeeded, and others have failed.

Figure 4. GitLab sample pipeline executions.

We can also see the specific jobs associated with a pipeline execution:

Figure 5. GitLab sample job executions.

Finally, here are our container images:

Figure 6. GitLab sample container registry.

Conclusion

In this post, we’ve illustrated how you can quickly and easily construct multi-architecture container images with GitLab, Amazon EKS, Karpenter, and Amazon EC2, using both x86 and Graviton instance families. We focused on using as many managed services as possible, maximizing security, and minimizing complexity and TCO. We dove deep on multiple facets of the process, and discussed how to save up to 90% of the solution’s cost by using Spot instances for CI/CD executions.

Find the sample code, including everything shown here today, in our GitLab repository.

Building multi-architecture images will unlock the value and performance of running your applications on AWS Graviton and give you increased flexibility over compute choice. We encourage you to get started today.

About the author:

Michael Fischer

Michael Fischer is a Principal Specialist Solutions Architect at Amazon Web Services. He focuses on helping customers build more cost-effectively and sustainably with AWS Graviton. Michael has an extensive background in systems programming, monitoring, and observability. His hobbies include world travel, diving, and playing the drums.

Multi-branch pipeline management and infrastructure deployment using AWS CDK Pipelines

Post Syndicated from Iris Kraja original https://aws.amazon.com/blogs/devops/multi-branch-pipeline-management-and-infrastructure-deployment-using-aws-cdk-pipelines/

This post describes how to use the AWS CDK Pipelines module to follow a Gitflow development model using AWS Cloud Development Kit (AWS CDK). Software development teams often follow a strict branching strategy during a solution's development lifecycle. Newly created branches commonly need their own isolated copy of infrastructure resources to develop new features.

CDK Pipelines is a construct library module for continuous delivery of AWS CDK applications. CDK Pipelines are self-updating: if you add application stages or stacks, then the pipeline automatically reconfigures itself to deploy those new stages and/or stacks.

The following solution creates a new AWS CDK Pipeline within a development account for every new branch created in the source repository (AWS CodeCommit). When a branch is deleted, the pipeline and all related resources are also destroyed from the account. This GitFlow model for infrastructure provisioning allows developers to work independently from each other, concurrently, even in the same stack of the application.

Solution overview

The following diagram provides an overview of the solution. There is one default pipeline responsible for deploying resources to the different application environments (e.g., Development, Pre-Prod, and Prod). The code is stored in CodeCommit. When new changes are pushed to the default CodeCommit repository branch, AWS CodePipeline runs the default pipeline. When the default pipeline is deployed, it creates two AWS Lambda functions.

These two Lambda functions are invoked by CodeCommit CloudWatch events when a new branch in the repository is created or deleted. The Create Lambda function uses the boto3 CodeBuild module to create an AWS CodeBuild project that builds the pipeline for the feature branch. This feature pipeline consists of a build stage and an optional update pipeline stage for itself. The Destroy Lambda function creates another CodeBuild project which cleans all of the feature branch’s resources and the feature pipeline.

Figure 1. Architecture diagram.

Prerequisites

Before beginning this walkthrough, you should have the following prerequisites:

  • An AWS account
  • AWS CDK installed
  • Python3 installed
  • Jq (JSON processor) installed
  • Basic understanding of continuous integration/continuous development (CI/CD) Pipelines

Initial setup

Download the repository from GitHub:

# Command to clone the repository
git clone https://github.com/aws-samples/multi-branch-cdk-pipelines.git
cd multi-branch-cdk-pipelines

Create a new CodeCommit repository in the AWS Account and region where you want to deploy the pipeline and upload the source code from above to this repository. In the config.ini file, change the repository_name and region variables accordingly.

Make sure that you set up a fresh Python environment. Install the dependencies:

pip install -r requirements.txt

Run the initial-deploy.sh script to bootstrap the development and production environments and to deploy the default pipeline. You’ll be asked to provide the following parameters: (1) Development account ID, (2) Development account AWS profile name, (3) Production account ID, and (4) Production account AWS profile name.

sh ./initial-deploy.sh --dev_account_id <YOUR DEV ACCOUNT ID> \
   --dev_profile_name <YOUR DEV PROFILE NAME> \
   --prod_account_id <YOUR PRODUCTION ACCOUNT ID> \
   --prod_profile_name <YOUR PRODUCTION PROFILE NAME>

Default pipeline

In the CI/CD pipeline, we set up an if condition to deploy the default branch resources only if the current branch is the default one. The default branch is retrieved programmatically from the CodeCommit repository. We deploy an Amazon Simple Storage Service (Amazon S3) Bucket and two Lambda functions. The bucket is responsible for storing the feature branches’ CodeBuild artifacts. The first Lambda function is triggered when a new branch is created in CodeCommit. The second one is triggered when a branch is deleted.

if branch == default_branch:
    
...

    # Artifact bucket for feature AWS CodeBuild projects
    artifact_bucket = Bucket(
        self,
        'BranchArtifacts',
        encryption=BucketEncryption.KMS_MANAGED,
        removal_policy=RemovalPolicy.DESTROY,
        auto_delete_objects=True
    )
...
    # AWS Lambda function triggered upon branch creation
    create_branch_func = aws_lambda.Function(
        self,
        'LambdaTriggerCreateBranch',
        runtime=aws_lambda.Runtime.PYTHON_3_8,
        function_name='LambdaTriggerCreateBranch',
        handler='create_branch.handler',
        code=aws_lambda.Code.from_asset(path.join(this_dir, 'code')),
        environment={
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix
        },
        role=iam_stack.create_branch_role)


    # AWS Lambda function triggered upon branch deletion
    destroy_branch_func = aws_lambda.Function(
        self,
        'LambdaTriggerDestroyBranch',
        runtime=aws_lambda.Runtime.PYTHON_3_8,
        function_name='LambdaTriggerDestroyBranch',
        handler='destroy_branch.handler',
        role=iam_stack.delete_branch_role,
        environment={
            "ACCOUNT_ID": dev_account_id,
            "CODE_BUILD_ROLE_ARN": iam_stack.code_build_role.role_arn,
            "ARTIFACT_BUCKET": artifact_bucket.bucket_name,
            "CODEBUILD_NAME_PREFIX": codebuild_prefix,
            "DEV_STAGE_NAME": f'{dev_stage_name}-{dev_stage.main_stack_name}'
        },
        code=aws_lambda.Code.from_asset(path.join(this_dir,
                                                  'code')))

Then, the CodeCommit repository is configured to trigger these Lambda functions based on two events:

(1) Reference created

# Configure AWS CodeCommit to trigger the Lambda function when a new branch is created
repo.on_reference_created(
    'BranchCreateTrigger',
    description="AWS CodeCommit reference created event.",
    target=aws_events_targets.LambdaFunction(create_branch_func))

(2) Reference deleted

# Configure AWS CodeCommit to trigger the Lambda function when a branch is deleted
repo.on_reference_deleted(
    'BranchDeleteTrigger',
    description="AWS CodeCommit reference deleted event.",
    target=aws_events_targets.LambdaFunction(destroy_branch_func))

Lambda functions

The two Lambda functions build and destroy application environments mapped to each feature branch. An Amazon CloudWatch event triggers the LambdaTriggerCreateBranch function whenever a new branch is created. The CodeBuild client from boto3 creates the build phase and deploys the feature pipeline.

Create function

The create function deploys a feature pipeline which consists of a build stage and an optional update pipeline stage for itself. The pipeline downloads the feature branch code from the CodeCommit repository, initiates the Build and Test action using CodeBuild, and securely saves the built artifact on the S3 bucket.

The Lambda function handler code is as follows:

def handler(event, context):
    """Lambda function handler"""
    logger.info(event)

    reference_type = event['detail']['referenceType']

    try:
        if reference_type == 'branch':
            branch = event['detail']['referenceName']
            repo_name = event['detail']['repositoryName']

            client.create_project(
                name=f'{codebuild_name_prefix}-{branch}-create',
                description="Build project to deploy branch pipeline",
                source={
                    'type': 'CODECOMMIT',
                    'location': f'https://git-codecommit.{region}.amazonaws.com/v1/repos/{repo_name}',
                    'buildspec': generate_build_spec(branch)
                },
                sourceVersion=f'refs/heads/{branch}',
                artifacts={
                    'type': 'S3',
                    'location': artifact_bucket_name,
                    'path': f'{branch}',
                    'packaging': 'NONE',
                    'artifactIdentifier': 'BranchBuildArtifact'
                },
                environment={
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'
                },
                serviceRole=role_arn
            )

            client.start_build(
                projectName=f'CodeBuild-{branch}-create'
            )
    except Exception as e:
        logger.error(e)

Create branch CodeBuild project’s buildspec.yaml content:

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk synth
      - cdk deploy --require-approval=never
artifacts:
  files:
    - '**/*'
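
The handler above calls generate_build_spec(branch), whose implementation isn't shown here. The following is a minimal sketch of how such a function could render the buildspec template above; reading the account ID from the Lambda environment variable and the region from AWS_REGION are assumptions for illustration, and the actual code in the sample repository may differ:

import os

def generate_build_spec(branch: str) -> str:
    # Render the create-branch buildspec with the branch, account, and region
    # substituted into the template shown above.
    account_id = os.environ["ACCOUNT_ID"]
    region = os.environ.get("AWS_REGION", "us-east-1")
    return f"""version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk synth
      - cdk deploy --require-approval=never
artifacts:
  files:
    - '**/*'
"""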

Destroy function

The second Lambda function is responsible for the destruction of a feature branch’s resources. Upon the deletion of a feature branch, an Amazon CloudWatch event triggers this Lambda function. The function creates a CodeBuild Project which destroys the feature pipeline and all of the associated resources created by that pipeline. The source property of the CodeBuild Project is the feature branch’s source code saved as an artifact in Amazon S3.

The Lambda function handler code is as follows:

def handler(event, context):
    logger.info(event)
    reference_type = event['detail']['referenceType']

    try:
        if reference_type == 'branch':
            branch = event['detail']['referenceName']
            client.create_project(
                name=f'{codebuild_name_prefix}-{branch}-destroy',
                description="Build project to destroy branch resources",
                source={
                    'type': 'S3',
                    'location': f'{artifact_bucket_name}/{branch}/CodeBuild-{branch}-create/',
                    'buildspec': generate_build_spec(branch)
                },
                artifacts={
                    'type': 'NO_ARTIFACTS'
                },
                environment={
                    'type': 'LINUX_CONTAINER',
                    'image': 'aws/codebuild/standard:4.0',
                    'computeType': 'BUILD_GENERAL1_SMALL'
                },
                serviceRole=role_arn
            )

            client.start_build(
                projectName=f'CodeBuild-{branch}-destroy'
            )

            client.delete_project(
                name=f'CodeBuild-{branch}-destroy'
            )

            client.delete_project(
                name=f'CodeBuild-{branch}-create'
            )
    except Exception as e:
        logger.error(e)

Destroy the branch CodeBuild project’s buildspec.yaml content:

version: 0.2
env:
  variables:
    BRANCH: {branch}
    DEV_ACCOUNT_ID: {account_id}
    PROD_ACCOUNT_ID: {account_id}
    REGION: {region}
phases:
  pre_build:
    commands:
      - npm install -g aws-cdk && pip install -r requirements.txt
  build:
    commands:
      - cdk destroy cdk-pipelines-multi-branch-{branch} --force
      - aws cloudformation delete-stack --stack-name {dev_stage_name}-{branch}
      - aws s3 rm s3://{artifact_bucket_name}/{branch} --recursive

Create a feature branch

On your machine’s local copy of the repository, create a new feature branch using the following git commands. Replace user-feature-123 with a unique name for your feature branch. Note that this feature branch name must comply with the CodePipeline naming restrictions, as it will be used to name a unique pipeline later in this walkthrough.

# Create the feature branch
git checkout -b user-feature-123
git push origin user-feature-123

The first Lambda function will deploy the CodeBuild project, which then deploys the feature pipeline. This can take a few minutes. You can log in to the AWS Console and see the CodeBuild project running under CodeBuild.

Figure 2. AWS Console – CodeBuild projects.

After the build is successfully finished, you can see the deployed feature pipeline under CodePipelines.

Figure 3. AWS Console – CodePipeline pipelines.

The Lambda S3 trigger project from AWS CDK Samples is used as the infrastructure resources to demonstrate this solution. The content is placed inside the src directory and is deployed by the pipeline. When visiting the Lambda console page, you can see two functions: one by the default pipeline and one by our feature pipeline.

Figure 4. AWS Console – Lambda functions.

Destroy a feature branch

There are two common ways for removing feature branches. The first one is related to a pull request, also known as a “PR”. This occurs when merging a feature branch back into the default branch. Once it’s merged, the feature branch will be automatically closed. The second way is to delete the feature branch explicitly by running the following git commands:

# delete branch local
git branch -d user-feature-123

# delete branch remote
git push origin --delete user-feature-123

The CodeBuild project responsible for destroying the feature resources is now triggered. You can see the project’s logs while the resources are being destroyed in CodeBuild, under Build history.

Figure 5. AWS Console – CodeBuild projects.

Cleaning up

To avoid incurring future charges, log in to the AWS console of each account you used, go to the AWS CloudFormation console in the Region(s) where you deployed, select the main and branch stacks, and choose Delete.

Conclusion

This post showed how you can work with an event-driven strategy and AWS CDK to implement a multi-branch pipeline flow using AWS CDK Pipelines. The described solutions leverage Lambda and CodeBuild to provide a dynamic orchestration of resources for multiple branches and pipelines.
For more information on CDK Pipelines and all the ways it can be used, see the CDK Pipelines reference documentation.

About the authors:

Iris Kraja

Iris is a Cloud Application Architect at AWS Professional Services based in New York City. She is passionate about helping customers design and build modern AWS cloud native solutions, with a keen interest in serverless technology, event-driven architectures and DevOps.  Outside of work, she enjoys hiking and spending as much time as possible in nature.

Jan Bauer

Jan is a Cloud Application Architect at AWS Professional Services. His interests are serverless computing, machine learning, and everything that involves cloud computing.

Rolando Santamaria Maso

Rolando is a senior cloud application development consultant at AWS Professional Services, based in Germany. He helps customers migrate and modernize workloads in the AWS Cloud, with a special focus on modern application architectures and development best practices, but he also creates IaC using AWS CDK. Outside work, he maintains open-source projects and enjoys spending time with family and friends.

Caroline Gluck

Caroline is an AWS Cloud application architect based in New York City, where she helps customers design and build cloud native data science applications. Caroline is a builder at heart, with a passion for serverless architecture and machine learning. In her spare time, she enjoys traveling, cooking, and spending time with family and friends.

Migrate a large data warehouse from Greenplum to Amazon Redshift using AWS SCT – Part 3

Post Syndicated from Jon Roberts original https://aws.amazon.com/blogs/big-data/part-3-migrate-a-large-data-warehouse-from-greenplum-to-amazon-redshift-using-aws-sct/

In this third post of a multi-part series, we explore some of the edge cases in migrating a large data warehouse from Greenplum to Amazon Redshift using AWS Schema Conversion Tool (AWS SCT) and how to handle these challenges. Challenges include how best to use virtual partitioning, edge cases for numeric and character fields, and arrays.

You can check out the first post of this series for guidance on planning, running, and validating the migration. You can also check out the second post for best practices for choosing the optimal Amazon Redshift cluster, data architecture, converting stored procedures, compatible functions and queries widely used for SQL conversions, and recommendations for optimizing the length of data types for table columns.

Unbounded character data type

Greenplum supports creating columns as text and varchar without specifying the length of the field. This works without an issue in Greenplum but doesn’t work well in migrating to Amazon Redshift. Amazon Redshift stores data in columnar format and gets better compression when using shorter column lengths. Therefore, the Amazon Redshift best practice is to use the smallest character length possible.

AWS SCT will convert these unbounded fields as large objects (LOBs) instead of treating the columns as character fields with a specified length. LOBs are implemented differently in each database product on the market, but in general, a LOB is not stored with the rest of the table data. Instead, there is a pointer to the location of the data. When the LOB is queried, the database reconstitutes the data automatically for you, but this typically requires more resources.

Amazon Redshift doesn’t support LOBs, so AWS SCT resolves this by loading the data into Amazon Simple Storage Service (Amazon S3) and in the column, it stores the S3 URL. When you need to retrieve this data, you have to query the table, get the S3 URL, and then fetch the data from Amazon S3. This isn’t ideal because most of the time, the actual maximum length of the field doesn’t require it to be treated as a LOB, and storing the data remotely means it will take much longer to fetch the data for queries.

The current resolution is to calculate the maximum length of these columns and update the Greenplum tables before converting to Amazon Redshift with AWS SCT.

Note that in a future release of AWS SCT, the collection of statistics will include calculating the maximum length for each column, and the conversion of unbounded varchar and text will set the length in Amazon Redshift automatically.

The following code is an example of an unbounded character data type:

CREATE TABLE public.mytable 
(description1 text NOT NULL PRIMARY KEY, description2 varchar);
CREATE INDEX ON public.mytable (description2);

This table uses a primary key column on an unbounded text column. This needs to be converted to varchar(n), where n is the maximum length found in this column.

  1. Drop unique constraints on affected columns:
    ALTER TABLE public.mytable DROP CONSTRAINT mytable_pkey;

  2. Drop indexes on affected columns:
    DROP INDEX public.mytable_description2_idx;

  3. Calculate maximum length of affected columns:
    select coalesce(max(length(description1)), 10),
    coalesce(max(length(description2)), 10) 
    from public.mytable;

Note that the coalesce defaults the length to 10 when the description1 and description2 columns contain only NULL values or the table has no data; otherwise, the query returns the calculated maximum length of each column (in this example, 10).

  1. Alter the length of the affected columns:
    ALTER TABLE public.mytable ALTER COLUMN description1 TYPE varchar(10);
    ALTER TABLE public.mytable ALTER COLUMN description2 TYPE varchar(10);

You can now proceed with using AWS SCT to convert the Greenplum schema to Amazon Redshift and avoiding using LOBs to store the column values.

GitHub help

If you have many tables to update and want an automated solution, you can use the add_varchar_lengths.sh script found in the GitHub repo to fix all of the unbounded varchar and text columns in a given schema in Greenplum. The script calculates the appropriate maximum length and then alters the Greenplum tables so the varchar data type is bounded by a length.

Please note that the script also will drop any constraints or indexes on the affected columns.
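
If you prefer Python, the following is a minimal sketch of the same idea; the connection string and schema name are assumptions, and unlike the script, it only prints the ALTER statements and doesn't drop constraints or indexes:

import psycopg2

SCHEMA = "public"

conn = psycopg2.connect("host=greenplum-master dbname=gpadmin user=gpadmin")
with conn, conn.cursor() as cur:
    # Find text and unbounded varchar columns in the schema.
    cur.execute("""
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = %s
          AND data_type IN ('text', 'character varying')
          AND character_maximum_length IS NULL
    """, (SCHEMA,))
    for table, column in cur.fetchall():
        # Calculate the maximum observed length, defaulting to 10 when empty.
        cur.execute(
            f'SELECT coalesce(max(length("{column}")), 10) FROM {SCHEMA}."{table}"'
        )
        max_len = cur.fetchone()[0]
        print(f'ALTER TABLE {SCHEMA}."{table}" '
              f'ALTER COLUMN "{column}" TYPE varchar({max_len});')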

Empty character data

Greenplum and Amazon Redshift support an empty string value in a field that is different from NULL. The behavior is the same between the two databases. However, AWS SCT defaults to convert empty strings to NULL. This simply needs to be disabled to avoid problems.

  1. In AWS SCT, open your project, choose Settings, Project settings, and Data migration.
  2. Scroll to the bottom and find Use empty as null value.
  3. Deselect this so that AWS SCT doesn’t convert empty strings to NULL.

NaN and Infinity numeric data type

Greenplum supports NaN and Infinity in a numeric field to represent an undefined calculation result and infinity. NaN is very uncommon because when using aggregate functions on a column with a NaN row, the result will also be NaN. Infinity is also uncommon and not useful when aggregating data. However, you may encounter these values in a Greenplum database.

Amazon Redshift doesn’t support NaN and Infinity, and AWS SCT doesn’t check for this in your data. If you do encounter this when using AWS SCT, the task will fail with a numeric conversion error.

To resolve this, it’s suggested to use NULL instead of NaN and Infinity. This allows you to aggregate data and get results other than NaN and, importantly, allow you to convert the Greenplum data to Amazon Redshift.

The following code is an example NaN numeric value:

CREATE TABLE public.mytable2 (id int NOT NULL, amount numeric NOT NULL);
INSERT INTO public.mytable2 VALUES (1, 10), (2, 'NaN'), (3, 20);

  1. Drop the NOT NULL constraint:
    ALTER TABLE public.mytable2 ALTER COLUMN amount DROP NOT NULL;

  2. Update the table:
    UPDATE public.mytable2 SET amount = NULL where amount = 'NaN';

You can now proceed with using AWS SCT to migrate the Greenplum data to Amazon Redshift.

Note that in a future release of AWS SCT, there will be an option to convert NaN and Infinity to NULL so that you won’t have to update your Greenplum data to migrate to Amazon Redshift.

Virtual partitioning on GP_SEGMENT_ID

For large tables, it’s recommended to use virtual partitioning to extract data from Greenplum. Without virtual partitioning, AWS SCT will run a single query to unload data from Greenplum. For example:

SELECT * FROM store_sales;

If this table is very large, it will take a long time to extract the data because this is a single process querying the data. With virtual partitioning, multiple queries are run in parallel so that the extraction of data is completed faster. It also makes it easier to recover if there is an issue with the task.

Virtual partitioning is very flexible, but a simple way to implement it is to utilize the Greenplum hidden column gp_segment_id. This column identifies which segment in Greenplum holds the data, and each segment should have an equal number of rows. Therefore, creating a partition for each gp_segment_id value is an easy way to implement virtual partitioning.

If you’re not familiar with the term segment, it’s similar to an Amazon Redshift slice.

For example:

SELECT * FROM store_sales WHERE gp_segment_id = 0;
SELECT * FROM store_sales WHERE gp_segment_id = 1;
SELECT * FROM store_sales WHERE gp_segment_id = 2;
...

  1. First, determine the number of segments in Greenplum:
    SELECT count(*) 
    FROM gp_segment_configuration 
    WHERE content >= 0 AND preferred_role = 'p';

Now you can configure AWS SCT.

  1. In AWS SCT, go to Data Migration view (other) and choose (right-click) a large table.
  2. Scroll down to Add virtual partitioning.
  3. For the partition type, choose Auto Split and change the column name to GP_SEGMENT_ID.
  4. Use 0 for Start value, the number of segments found in Step 1 as End value, and Interval of 1.

When you create a local task to load this table, the task will have a sub-task for each gp_segment_id value.
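
As an illustration of what this configuration produces, the following minimal Python sketch determines the segment count and generates one extraction query per segment; psycopg2 and the connection details are assumptions, and AWS SCT doesn't require any of this code:

import psycopg2

conn = psycopg2.connect("host=greenplum-master dbname=gpadmin user=gpadmin")
with conn, conn.cursor() as cur:
    # Same query as Step 1: count the primary segments.
    cur.execute("""
        SELECT count(*)
        FROM gp_segment_configuration
        WHERE content >= 0 AND preferred_role = 'p'
    """)
    segment_count = cur.fetchone()[0]

# One extraction query per segment; these can run in parallel.
queries = [
    f"SELECT * FROM store_sales WHERE gp_segment_id = {seg}"
    for seg in range(segment_count)
]
for q in queries:
    print(q)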

Note that in a future release of AWS SCT, there will be an option to automatically virtually partition tables based on GP_SEGMENT_ID. This option will also retrieve the number of segments automatically.

Arrays

Greenplum supports arrays such as bigint[] that are unbounded. Typically, arrays are kept relatively small in Greenplum because arrays consume more memory than alternative data models. However, it's possible to have a very large array in Greenplum that isn't supported by Amazon Redshift.

AWS SCT converts a Greenplum array to varchar(65535), but if the converted array is longer than 65,535 characters, then the load will fail.

The following code is an example of a large array:

CREATE TABLE public.sales 
(sales_id int NOT NULL,
customer_id int NOT NULL,
sales_item_ids bigint[]) 
DISTRIBUTED BY (sales_id);

INSERT INTO public.sales values (1, 100, '{1,2,3}'), (2, 100, '{1,2,3}');

In this example, the sales items are stored in an array for each sales_id. If you encounter an error with AWS SCT that the data is too long to load into Amazon Redshift, the following approach is the solution. It's also a more efficient pattern for storing this data in both Greenplum and Amazon Redshift.

  1. Create a new sales table that has all columns from the existing sales table, but exclude the array column:
    CREATE TABLE public.sales_new 
    (sales_id int NOT NULL,
    customer_id int NOT NULL) 
    DISTRIBUTED BY (sales_id);

  2. Populate the new sales table with the existing data except for the array column:
    INSERT INTO public.sales_new (sales_id, customer_id) 
    SELECT sales_id, customer_id FROM public.sales;

We create a new table that is a cross-reference of sales IDs with the sales items. Instead of having a single row for this association, now there will be a row for each relationship.

  1. Create a new sales item table:
    CREATE TABLE public.sales_items 
    (sales_id int NOT NULL, 
    sales_item_id bigint NOT NULL) 
    DISTRIBUTED BY (sales_id);

  2. To unnest the array, create a row for each array element:
    INSERT INTO public.sales_items 
    (sales_id, sales_item_id) 
    SELECT sales_id, unnest(sales_item_ids) 
    FROM public.sales;

  3. Rename the sales tables:
    ALTER TABLE public.sales RENAME TO sales_old;
    ALTER TABLE public.sales_new RENAME TO sales;

In AWS SCT, refresh the tables and migrate the revised sales and the new sales_items table.

The following are some example queries before and after.

Before:

SELECT s.sales_id, unnest(s.sales_item_ids) 
FROM public.sales s 
WHERE s.sales_id = 1;

After:

SELECT s.sales_id, i.sales_item_id 
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.sales_id = 1;

Before:

SELECT s.sales_id 
FROM public.sales s 
WHERE s.customer_id = 100
AND 10 = ANY(s.sales_item_ids);

After:

SELECT s.sales_id
FROM public.sales s 
JOIN public.sales_items i ON s.sales_id = i.sales_id 
WHERE s.customer_id = 100
AND i.sales_item_id = 10;

VACUUM ANALYZE

Greenplum, like Amazon Redshift, supports the VACUUM command, which reclaims storage space after UPDATE and DELETE commands are run on a table. Greenplum also allows you to add the ANALYZE option to run both statements with a single command.

The following code is the Greenplum command:

VACUUM ANALYZE table;

This usage is not very common, but you'll see it from time to time. If you're only inserting data into a table, there is no need to run VACUUM, but for ease of use, developers sometimes run VACUUM ANALYZE anyway.

The following are the Amazon Redshift commands:

VACUUM table;
ANALYZE table;

Amazon Redshift doesn’t support adding ANALYZE to the VACUUM command, so instead, this needs to be two different statements. Also note that Amazon Redshift performs VACUUM and ANALYZE automatically for you, so in most cases, you can remove these commands from your scripts entirely.

DISTINCT ON query

Greenplum supports DISTINCT ON, an unusual shortcut for eliminating duplicates in a result set. This feature keeps the first row for each set of rows based on the order in which the data is fetched. It’s easiest to understand by looking at an example:

CREATE TABLE customer 
(customer_name varchar(100) not null, 
customer_address varchar(1000) not null, 
lastupdate timestamp not null);

INSERT INTO customer VALUES
('ACME', '123 Main St', '2022-01-01'), 
('ACME', '456 Market St', '2022-05-01'), 
('ACME', '789 Broadway', '2022-08-01');

SELECT DISTINCT ON (customer_name) customer_name, customer_address 
FROM customer 
ORDER BY customer_name, lastupdate DESC;

We get the following results:

customer_name | customer_address 
---------------+------------------
 ACME          | 789 Broadway

The solution for running this in Amazon Redshift is to use the ANSI standard row_number() analytical function, as shown in the following code:

SELECT sub.customer_name, sub.customer_address 
FROM (SELECT customer_name, customer_address, row_number() over (partition by customer_name ORDER BY lastupdate DESC) AS row_number FROM customer) AS sub 
WHERE sub.row_number = 1;

Clean up

The examples in this post create tables in Greenplum. To remove these example tables, run the following commands:

DROP TABLE IF EXISTS public.mytable;
DROP TABLE IF EXISTS public.mytable2;
DROP TABLE IF EXISTS public.sales;
DROP TABLE IF EXISTS public.sales_new;
DROP TABLE IF EXISTS public.sales_items;
DROP TABLE IF EXISTS public.customer;

Conclusion

In this post, we covered some of the edge cases when migrating Greenplum to Amazon Redshift and how to handle these challenges, including easy virtual partitioning, edge cases for numeric and character fields, and arrays. This is not an exhaustive list of considerations for migrating Greenplum to Amazon Redshift, but this series should help you navigate modernizing your data platform by moving to Amazon Redshift.

For additional details, see the Amazon Redshift Getting Started Guide and the AWS SCT User Guide.


About the Authors

Jon Roberts is a Sr. Analytics Specialist based out of Nashville, specializing in Amazon Redshift. He has over 27 years of experience working in relational databases. In his spare time, he runs.

Nelly Susanto is a Senior Database Migration Specialist of AWS Database Migration Accelerator. She has over 10 years of technical experience focusing on migrating and replicating databases along with data warehouse workloads. She is passionate about helping customers in their cloud journey.

Suresh Patnam is a Principal BDM – GTM AI/ML Leader at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible by leveraging Data & AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Configuration driven dynamic multi-account CI/CD solution on AWS

Post Syndicated from Anshul Saxena original https://aws.amazon.com/blogs/devops/configuration-driven-dynamic-multi-account-ci-cd-solution-on-aws/

Many organizations require durable automated code delivery for their applications. They leverage multi-account continuous integration/continuous deployment (CI/CD) pipelines to deploy code and run automated tests in multiple environments before deploying to Production. In cases where the testing strategy is release specific, you must update the pipeline before every release. Traditional pipeline stages are predefined and static in nature, and once the pipeline stages are defined, they're hard to update. In this post, we present a configuration-driven dynamic CI/CD solution that creates a pipeline per repository. The pipeline state is maintained and governed by configurations stored in Amazon DynamoDB. This gives you the advantage of automatically customizing the pipeline for every release based on the testing requirements.

By following this post, you will set up a dynamic multi-account CI/CD solution. Your pipeline will deploy and test a sample pet store API application. Refer to Automating your API testing with AWS CodeBuild, AWS CodePipeline, and Postman for more details on this application. New code deployments will be delivered with custom pipeline stages based on the pipeline configuration that you create. This solution uses services such as AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, Amazon DynamoDB, AWS Lambda, and AWS Step Functions.

Solution overview

The following diagram illustrates the solution architecture:

Figure 1: Architecture Diagram

  1. Users insert/update/delete entry in the DynamoDB table.
  2. The Step Function Trigger Lambda is invoked on all modifications.
  3. The Step Function Trigger Lambda evaluates the incoming event and does the following:
    1. On insert and update, triggers the Step Function.
    2. On delete, finds the appropriate CloudFormation stack and deletes it (a sketch of this trigger logic follows the list below).
  4. Steps in the Step Function are as follows:
    1. Collect Information (Pass State) – Filters the relevant information from the event, such as repositoryName and referenceName.
    2. Get Mapping Information (Backed by CodeCommit event filter Lambda) – Retrieves the mapping information from the Pipeline config stored in the DynamoDB.
    3. Deployment Configuration Exist? (Choice State) – If the StatusCode is 200, the DynamoDB entry was found and the Initiate CloudFormation Stack step is invoked; otherwise, the Step Function exits successfully.
    4. Initiate CloudFormation Stack (Backed by stack create Lambda) – Constructs the CloudFormation parameters and creates/updates the dynamic pipeline based on the configuration stored in the DynamoDB via CloudFormation.
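
The following is a minimal sketch of what the Step Function Trigger Lambda's logic could look like, assuming the configuration table publishes changes through DynamoDB Streams. The state machine ARN, the stack naming convention, and the RepoName/RepoTag key names are illustrative assumptions, not the exact code from the AWS CDK app:

import json

import boto3

sfn = boto3.client("stepfunctions")
cfn = boto3.client("cloudformation")

# Hypothetical state machine ARN; the real value comes from the deployed solution
STATE_MACHINE_ARN = "arn:aws:states:<region>:<account-id>:stateMachine:DynamicPipelineStateMachine"

def handler(event, context):
    # DynamoDB Streams delivers one or more records per invocation
    for record in event["Records"]:
        if record["eventName"] in ("INSERT", "MODIFY"):
            # New or updated pipeline configuration: start the Step Function execution
            sfn.start_execution(
                stateMachineArn=STATE_MACHINE_ARN,
                input=json.dumps(record["dynamodb"]["NewImage"], default=str),
            )
        elif record["eventName"] == "REMOVE":
            # Configuration deleted: tear down the corresponding pipeline stack
            old = record["dynamodb"]["OldImage"]
            stack_name = f"pipeline-{old['RepoName']['S']}-{old['RepoTag']['S']}"  # assumed naming convention
            cfn.delete_stack(StackName=stack_name)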

Code deliverables

The code deliverables include the following:

  1. AWS CDK app – The AWS CDK app contains the code for all the Lambdas, Step Functions, and CloudFormation templates.
  2. sample-application-repo – This directory contains the sample application repository used for deployment.
  3. automated-tests-repo– This directory contains the sample automated tests repository for testing the sample repo.

Deploying the CI/CD solution

  1. Clone this repository to your local machine.
  2. Follow the README to deploy the solution to your main CI/CD account. Upon successful deployment, the following resources should be created in the CI/CD account:
    1. A DynamoDB table
    2. Step Function
    3. Lambda Functions
  3. Navigate to the Amazon Simple Storage Service (Amazon S3) console in your main CI/CD account and search for a bucket with the name: cloudformation-template-bucket-<AWS_ACCOUNT_ID>. You should see two CloudFormation templates (templates/codepipeline.yaml and templates/childaccount.yaml) uploaded to this bucket.
  4. Run the childaccount.yaml in every target CI/CD account (Alpha, Beta, Gamma, and Prod) by going to the CloudFormation Console. Provide the main CI/CD account number as the “CentralAwsAccountId” parameter, and execute.
  5. Upon successful creation of the stack, two roles are created in the child accounts:
    1. ChildAccountFormationRole
    2. ChildAccountDeployerRole

Pipeline configuration

Make an entry into devops-pipeline-table-info for the Repository name and branch combination. A sample entry can be found in sample-entry.json.

The pipeline is highly configurable, and everything can be configured through the DynamoDB entry.

The following are the top-level keys:

RepoName: Name of the repository for which AWS CodePipeline is configured.
RepoTag: Name of the branch used in CodePipeline.
BuildImage: Build image used for application AWS CodeBuild project.
BuildSpecFile: Buildspec file used in the application CodeBuild project.
DeploymentConfigurations: This key holds the deployment configurations for the pipeline. Under this key are the environment-specific configurations. In our case, we’ve named our environments Alpha, Beta, Gamma, and Prod. You can use any names you like, but make sure that the entries in the JSON match those in the codepipeline.yaml CloudFormation template, because there is a 1:1 mapping between them. Sub-level keys under DeploymentConfigurations are as follows:

  • EnvironmentName. This is the top-level key for environment-specific configuration. In our case, it’s Alpha, Beta, Gamma, and Prod. Sub-level keys under this are:
    • <Env>AwsAccountId: AWS account ID of the target environment.
    • Deploy<Env>: A key specifying whether or not the artifact should be deployed to this environment. Based on its value, the CodePipeline will have a deployment stage to this environment.
    • ManualApproval<Env>: Key representing whether or not manual approval is required before deployment. Enter your email or set to false.
    • Tests: Once again, this is a key with sub-level keys. It holds the test-related information to be run in specific environments. Each test that is enabled adds an additional stage to the CodePipeline. The test information is also configurable, with the ability to specify the test repository, branch name, buildspec file, and build image for the testing CodeBuild project. A hypothetical sample entry illustrating these keys follows this list.
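
For orientation only, the following sketch shows how these keys could fit together in a single entry. The authoritative format is the sample-entry.json file in the repository; the account IDs, build images, and test sub-key names shown here are placeholders and may not match the actual field names exactly:

{
  "RepoName": "sample-application-repo",
  "RepoTag": "main",
  "BuildImage": "aws/codebuild/standard:5.0",
  "BuildSpecFile": "buildspec.yaml",
  "DeploymentConfigurations": {
    "Alpha": {
      "AlphaAwsAccountId": "111111111111",
      "DeployAlpha": "true",
      "ManualApprovalAlpha": "false",
      "Tests": {
        "IntegrationTests": {
          "TestRepoName": "automated-tests-repo",
          "TestRepoTag": "main",
          "TestBuildSpecFile": "buildspec-test.yaml",
          "TestBuildImage": "aws/codebuild/standard:5.0"
        }
      }
    },
    "Prod": {
      "ProdAwsAccountId": "444444444444",
      "DeployProd": "true",
      "ManualApprovalProd": "approver@example.com"
    }
  }
}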

Execute

  1. Make an entry into the devops-pipeline-table-info DynamoDB table in the main CI/CD account. A sample entry can be found in sample-entry.json. Make sure to replace the configuration values with appropriate values for your environment. An explanation of the values can be found in the Pipeline Configuration section above.
  2. After the entry is made in the DynamoDB table, you should see a CloudFormation stack being created. This CloudFormation stack will deploy the CodePipeline in the main CI/CD account by reading and using the entry in the DynamoDB table.

You can customize the solution for different combinations, such as deploying to some environments while skipping others, by updating the pipeline configurations stored in the devops-pipeline-table-info DynamoDB table. The following is the pipeline configured for the sample-application repository’s main branch.

Figure 2: Dynamic Multi-Account CI/CD Pipeline

Clean up your dynamic multi-account CI/CD solution and related resources

To avoid ongoing charges for the resources that you created following this post, you should delete the following:

  1. The pipeline configuration stored in the DynamoDB
  2. The CloudFormation stacks deployed in the target CI/CD accounts
  3. The AWS CDK app deployed in the main CI/CD account
  4. Empty and delete the retained S3 buckets.

Conclusion

This configuration-driven CI/CD solution provides the ability to dynamically create and configure your pipelines in DynamoDB. IDEMIA, a global leader in identity technologies, adopted this approach for deploying their microservices based application across environments. This solution created by AWS Professional Services allowed them to dynamically create and configure their pipelines per repository per release. As Kunal Bajaj, Tech Lead of IDEMIA, states, “We worked with AWS pro-serve team to create a dynamic CI/CD solution using lambdas, step functions, SQS, and other native AWS services to conduct cross-account deployments to our different environments while providing us the flexibility to add tests and approvals as needed by the business.”

About the authors:

Anshul Saxena

Anshul is a Cloud Application Architect at AWS Professional Services and works with customers helping them in their cloud adoption journey. His expertise lies in DevOps, serverless architectures, and architecting and implementing cloud native solutions aligning with best practices.

Libin Roy

Libin is a Cloud Infrastructure Architect at AWS Professional Services. He enjoys working with customers to design and build cloud native solutions to accelerate their cloud journey. Outside of work, he enjoys traveling, cooking, playing sports and weight training.

How to secure your SaaS tenant data in DynamoDB with ABAC and client-side encryption

Post Syndicated from Jani Muuriaisniemi original https://aws.amazon.com/blogs/security/how-to-secure-your-saas-tenant-data-in-dynamodb-with-abac-and-client-side-encryption/

If you’re a SaaS vendor, you may need to store and process personal and sensitive data for large numbers of customers across different geographies. When processing sensitive data at scale, you have an increased responsibility to secure this data end-to-end. Client-side encryption of data, such as your customers’ contact information, provides an additional mechanism that can help you protect your customers and earn their trust.

In this blog post, we show how to implement client-side encryption of your SaaS application’s tenant data in Amazon DynamoDB with the Amazon DynamoDB Encryption Client. This is accomplished by leveraging AWS Identity and Access Management (IAM) together with AWS Key Management Service (AWS KMS) for a more secure and cost-effective isolation of the client-side encrypted data in DynamoDB, both at run-time and at rest.

Encrypting data in Amazon DynamoDB

Amazon DynamoDB supports data encryption at rest using encryption keys stored in AWS KMS. This functionality helps reduce operational burden and complexity involved in protecting sensitive data. In this post, you’ll learn about the benefits of adding client-side encryption to achieve end-to-end encryption in transit and at rest for your data, from its source to storage in DynamoDB. Client-side encryption helps ensure that your plaintext data isn’t available to any third party, including AWS.

You can use the Amazon DynamoDB Encryption Client to implement client-side encryption with DynamoDB. In the solution in this post, client-side encryption refers to the cryptographic operations that are performed on the application-side in the application’s Lambda function, before the data is sent to or retrieved from DynamoDB. The solution in this post uses the DynamoDB Encryption Client with the Direct KMS Materials Provider so that your data is encrypted by using AWS KMS. However, the underlying concept of the solution is not limited to the use of the DynamoDB Encryption Client, you can apply it to any client-side use of AWS KMS, for example using the AWS Encryption SDK.

For detailed information about using the DynamoDB Encryption Client, see the blog post How to encrypt and sign DynamoDB data in your application. This is a great place to start if you are not yet familiar with DynamoDB Encryption Client. If you are unsure about whether you should use client-side encryption, see Client-side and server-side encryption in the Amazon DynamoDB Encryption Client Developer Guide to help you with the decision.

AWS KMS encryption context

AWS KMS gives you the ability to add an additional layer of authentication for your AWS KMS API decrypt operations by using encryption context. The encryption context is one or more key-value pairs of additional data that you want associated with AWS KMS protected information.

Encryption context helps you defend against the risks of ciphertexts being tampered with, modified, or replaced, whether intentionally or unintentionally. It helps defend against an unauthorized user replacing one ciphertext with another, as well as against problems like operational events. To use encryption context, you specify associated key-value pairs on encrypt. You must provide the exact same key-value pairs in the encryption context on decrypt, or the operation will fail. Encryption context is not secret, and is not an access-control mechanism. The encryption context is a means of authenticating the data, not the caller.

The Direct KMS Materials Provider used in this blog post transparently generates a unique data key by using AWS KMS for each item stored in the DynamoDB table. It automatically sets the item’s partition key and sort key (if any) as AWS KMS encryption context key-value pairs.

The solution in this blog post relies on the partition key of each table item being defined in the encryption context. If you encrypt data with your own implementation, make sure to add your tenant ID to the encryption context in all your AWS KMS API calls.
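
As a minimal illustration of that guidance, the following boto3 sketch binds a tenant_id key-value pair into the encryption context on both the Encrypt and Decrypt calls. The key ARN and the tenant_id value are placeholders, and this is not code from the sample repository:

import boto3

kms = boto3.client("kms")
tenant_id = "tenant-123"  # placeholder; in the solution this comes from the request context

# Encrypt with the tenant ID bound into the encryption context
ciphertext = kms.encrypt(
    KeyId="arn:aws:kms:<region>:<account-id>:key/<key-id>",
    Plaintext=b"sensitive tenant data",
    EncryptionContext={"tenant_id": tenant_id},
)["CiphertextBlob"]

# Decrypt succeeds only when the exact same encryption context is supplied
plaintext = kms.decrypt(
    CiphertextBlob=ciphertext,
    EncryptionContext={"tenant_id": tenant_id},
)["Plaintext"]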

For more information about the concept of AWS KMS encryption context, see the blog post How to Protect the Integrity of Your Encrypted Data by Using AWS Key Management Service and EncryptionContext. You can also see another example in Exercise 3 of the Busy Engineer’s Document Bucket Workshop.

Attribute-based access control for AWS

Attribute-based access control (ABAC) is an authorization strategy that defines permissions based on attributes. In AWS, these attributes are called tags. In the solution in this post, ABAC helps you create tenant-isolated access policies for your application, without the need to provision tenant specific AWS IAM roles.

If you are new to ABAC, or need a refresher on the concepts and the different isolation methods, see the blog post How to implement SaaS tenant isolation with ABAC and AWS IAM.

Solution overview

If you are a SaaS vendor expecting large numbers of tenants, it is important that your underlying architecture can cost effectively scale with minimal complexity to support the required number of tenants, without compromising on security. One way to meet these criteria is to store your tenant data in a single pooled DynamoDB table, and to encrypt the data using a single AWS KMS key.

Using a single shared KMS key to read and write encrypted data in DynamoDB for multiple tenants reduces your per-tenant costs. This may be especially relevant for managing your costs if you have users on your organization’s free tier, with no direct revenue to offset them.

When you use shared resources such as a single pooled DynamoDB table encrypted by using a single KMS key, you need a mechanism to help prevent cross-tenant access to the sensitive data. This is where you can use ABAC for AWS. By using ABAC, you can build an application with strong tenant isolation capabilities, while still using shared and pooled underlying resources for storing your sensitive tenant data.

You can find the solution described in this blog post in the aws-dynamodb-encrypt-with-abac GitHub repository. This solution uses ABAC combined with KMS encryption context to provide isolation of tenant data, both at rest and at run time. By using a single KMS key, the application encrypts tenant data on the client-side, and stores it in a pooled DynamoDB table, which is partitioned by a tenant ID.

Solution Architecture

Figure 1: Components of solution architecture

The presented solution implements an API with a single AWS Lambda function behind an Amazon API Gateway, which processes two types of requests:

  1. GET request: fetch any key-value pairs stored in the tenant data store for the given tenant ID.
  2. POST request: store the provided key-value pairs in the tenant data store for the given tenant ID, overwriting any existing data for the same tenant ID.

The application is written in Python, uses AWS Lambda Powertools for Python, and is deployed by using the AWS CDK.

It also uses the DynamoDB Encryption Client for Python, which includes several helper classes that mirror the AWS SDK for Python (Boto3) classes for DynamoDB. This solution uses the EncryptedResource helper class which provides Boto3 compatible get_item and put_item methods. The helper class is used together with the KMS Materials Provider to handle encryption and decryption with AWS KMS transparently for the application.

Note: This example solution provides no authentication of the caller identity. See chapter “Considerations for authentication and authorization” for further guidance.

How it works

Figure 2: Detailed architecture for storing new or updated tenant data

As requests are made into the application’s API, they are routed by API Gateway to the application’s Lambda function (1). The Lambda function begins to run with the IAM permissions that its IAM execution role (DefaultExecutionRole) has been granted. These permissions do not grant any access to the DynamoDB table or the KMS key. In order to access these resources, the Lambda function first needs to assume the ResourceAccessRole, which does have the necessary permissions. To implement ABAC more securely in this use case, it is important that the application maintains clear separation of IAM permissions between the assumed ResourceAccessRole and the DefaultExecutionRole.

As the application assumes the ResourceAccessRole using the AssumeRole API call (2), it also sets a TenantID session tag. Session tags are key-value pairs that can be passed when you assume an IAM role in AWS Simple Token Service (AWS STS), and are a fundamental core building block of ABAC on AWS. When the session credentials (3) are used to make a subsequent request, the request context includes the aws:PrincipalTag context key, which can be used to access the session’s tags. The chapter “The ResourceAccessRole policy” describes how the aws:PrincipalTag context key is used in IAM policy condition statements to implement ABAC for this solution. Note that for demonstration purposes, this solution receives the value for the TenantID tag directly from the request URL, and it is not authenticated.
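
A minimal sketch of that AssumeRole call with a TenantID session tag, using boto3, could look like the following. The role ARN and tenant_id value are placeholders, and the actual Lambda code in the repository may differ:

import boto3

sts = boto3.client("sts")
tenant_id = "tenant-123"  # placeholder; taken from the request URL in this example solution

response = sts.assume_role(
    RoleArn="arn:aws:iam::<account-id>:role/ResourceAccessRole",
    RoleSessionName=f"tenant-session-{tenant_id}",
    Tags=[{"Key": "TenantID", "Value": tenant_id}],
)
credentials = response["Credentials"]  # temporary credentials carrying the TenantID session tag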

The trust policy of the ResourceAccessRole defines the principals that are allowed to assume the role, and to tag the assumed role session. Make sure to limit the principals to the least needed for your application to function. In this solution, the application Lambda function is the only trusted principal defined in the trust policy.
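
For reference, a trust policy for this pattern typically allows only the Lambda function's execution role to assume the role and to tag the session. The following is an illustrative sketch with a placeholder role name, not the exact policy from the repository:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<account-id>:role/DefaultExecutionRole"
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ]
        }
    ]
}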

Next, the Lambda function prepares to encrypt or decrypt the data (4). To do so, it uses the DynamoDB Encryption Client. The KMS Materials Provider and the EncryptedResource helper class are both initialized with sessions by using the temporary credentials from the AssumeRole API call. This allows the Lambda function to access the KMS key and DynamoDB table resources, with access restricted to operations on data belonging only to the specific tenant ID.

Finally, using the EncryptedResource helper class provided by the DynamoDB Encryption Library, the data is written to and read from the DynamoDB table (5).
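
To show how these pieces fit together, the following sketch initializes the DynamoDB Encryption Client with the Direct KMS Materials Provider, using a boto3 session built from the temporary credentials of the previous sketch. For brevity it uses the closely related EncryptedTable helper rather than EncryptedResource, and the table name, key ARN, and tenant_id partition key are assumptions rather than the repository's actual values:

import boto3
from dynamodb_encryption_sdk.encrypted.table import EncryptedTable
from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider

# Session built from the temporary credentials returned by the AssumeRole call above
session = boto3.Session(
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)

table = session.resource("dynamodb").Table("tenant-data-table")  # placeholder table name
materials_provider = AwsKmsCryptographicMaterialsProvider(
    key_id="arn:aws:kms:<region>:<account-id>:key/<key-id>"
)
encrypted_table = EncryptedTable(table=table, materials_provider=materials_provider)

# The partition key (assumed here to be tenant_id) is automatically added to the KMS encryption context
encrypted_table.put_item(Item={"tenant_id": "tenant-123", "data": {"email": "..."}})
item = encrypted_table.get_item(Key={"tenant_id": "tenant-123"})["Item"]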

Considerations for authentication and authorization

The solution in this blog post intentionally does not implement authentication or authorization of the client requests. Instead, the requested tenant ID from the request URL is passed as the tenant identity. Your own applications should always authenticate and authorize tenant requests. There are multiple ways you can achieve this.

Modern web applications commonly use OpenID Connect (OIDC) for authentication, and OAuth for authorization. JSON Web Tokens (JWTs) can be used to pass the resulting authorization data from client to the application. You can validate a JWT when using AWS API Gateway with one of the following methods:

  1. When using a REST or an HTTP API, you can use a Lambda authorizer
  2. When using an HTTP API, you can use a JWT authorizer
  3. You can validate the token directly in your application code

If you write your own authorizer code, you can pick a popular open source library or you can choose the AWS provided open source library. To learn more about using a JWT authorizer, see the blog post How to secure API Gateway HTTP endpoints with JWT authorizer.

Regardless of the chosen method, you must be able to map a suitable claim from the user’s JWT, such as the subject, to the tenant ID, so that it can be used as the session tag in this solution.

The ResourceAccessRole policy

A critical part of the correct operation of ABAC in this solution is with the definition of the IAM access policy for the ResourceAccessRole. In the following policy, be sure to replace <region>, <account-id>, <table-name>, and <key-id> with your own values.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:<region>:<account-id>:table/<table-name>",
           ],
            "Condition": {
                "ForAllValues:StringEquals": {
                    "dynamodb:LeadingKeys": [
                        "${aws:PrincipalTag/TenantID}"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey",
            ],
            "Resource": "arn:aws:kms:<region>:<account-id>:key/<key-id>",
            "Condition": {
                "StringEquals": {
                    "kms:EncryptionContext:tenant_id": "${aws:PrincipalTag/TenantID}"
                }
            }
        }
    ]
}

The policy defines two access statements, both of which apply separate ABAC conditions:

  1. The first statement grants access to the DynamoDB table with the condition that the partition key of the item matches the TenantID session tag in the caller’s session.
  2. The second statement grants access to the KMS key with the condition that one of the key-value pairs in the encryption context of the API call has a key called tenant_id with a value that matches the TenantID session tag in the caller’s session.

Warning: Do not use a ForAnyValue or ForAllValues set operator with the kms:EncryptionContext single-valued condition key. These set operators can create a policy condition that does not require values you intend to require, and allows values you intend to forbid.

Deploying and testing the solution

Prerequisites

To deploy and test the solution, you need the following:

Deploying the solution

After you have the prerequisites installed, run the following steps in a command line environment to deploy the solution. Make sure that your AWS CLI is configured with your AWS account credentials. Note that standard AWS service charges apply to this solution. For more information about pricing, see the AWS Pricing page.

To deploy the solution into your AWS account

  1. Use the following command to download the source code:
    git clone https://github.com/aws-samples/aws-dynamodb-encrypt-with-abac
    cd aws-dynamodb-encrypt-with-abac

  2. (Optional) You will need an AWS CDK version compatible with the application (2.37.0) to deploy. The simplest way is to install a local copy with npm, but you can also use a globally installed version if you already have one. To install locally, use the following command to install the AWS CDK with npm:
    npm install aws-cdk@2.37.0

  3. Use the following commands to initialize a Python virtual environment:
    python3 -m venv demoenv
    source demoenv/bin/activate
    python3 -m pip install -r requirements.txt

  4. (Optional) If you have not used AWS CDK with this account and Region before, you first need to bootstrap the environment:
    npx cdk bootstrap

  5. Use the following command to deploy the application with the AWS CDK:
    npx cdk deploy

  6. Make note of the API endpoint URL https://<api url>/prod/ in the Outputs section of the CDK command. You will need this URL for the next steps.
    Outputs:
    DemoappStack.ApiEndpoint4F160690 = https://<api url>/prod/

Testing the solution with example API calls

With the application deployed, you can test the solution by making API calls against the API URL that you captured from the deployment output. You can start with a simple HTTP POST request to insert data for a tenant. The API expects a JSON string as the data to store, so make sure to post properly formatted JSON in the body of the request.

An example request using the curl command looks like the following:

curl https://<api url>/prod/tenant/<tenant-name> -X POST --data '{"email":"<your-email-address>"}'

You can then read the same data back with an HTTP GET request:

curl https://<api url>/prod/tenant/<tenant-name>

You can store and retrieve data for any number of tenants, and can store as many attributes as you like. Each time you store data for a tenant, any previously stored data is overwritten.

Additional considerations

A tenant ID is used as the DynamoDB table’s partition key in the example application in this solution. You can replace the tenant ID with another unique partition key, such as a product ID, as long as the ID is consistently used in the IAM access policy, the IAM session tag, and the KMS encryption context. In addition, while this solution does not use a sort key in the table, you can modify the application to support a sort key with only a few changes. For more information, see Working with tables and data in DynamoDB.

Clean up

To clean up the application resources that you deployed while testing the solution, in the solution’s home directory, run the command cdk destroy.

Then, if you no longer plan to deploy to this account and Region using AWS CDK, you can also use the AWS CloudFormation console to delete the bootstrap stack (CDKToolKit).

Conclusion

In this post, you learned a method for simple and cost-efficient client-side encryption for your tenant data. By using the DynamoDB Encryption Client, you were able to implement the encryption with less effort, all while using a standard Boto3 DynamoDB Table resource compatible interface.

Adding to the client-side encryption, you also learned how to apply attribute-based access control (ABAC) to your IAM access policies. You used ABAC for tenant isolation by applying conditions for both the DynamoDB table access, as well as access to the KMS key that is used for encryption of the tenant data in the DynamoDB table. By combining client-side encryption with ABAC, you have increased your data protection with multiple layers of security.

You can start experimenting today on your own by using the provided solution. If you have feedback about this post, submit comments in the Comments section below. If you have questions on the content, consider submitting them to AWS re:Post.

Want more AWS Security news? Follow us on Twitter.

Jani Muuriaisniemi

Jani is a Principal Solutions Architect at Amazon Web Services based out of Helsinki, Finland. With more than 20 years of industry experience, he works as a trusted advisor with a broad range of customers across different industries and segments, helping the customers on their cloud journey.

How Hudl built a cost-optimized AWS Glue pipeline with Apache Hudi datasets

Post Syndicated from Indira Balakrishnan original https://aws.amazon.com/blogs/big-data/how-hudl-built-a-cost-optimized-aws-glue-pipeline-with-apache-hudi-datasets/

This is a guest blog post co-written with Addison Higley and Ramzi Yassine from Hudl.

Hudl Agile Sports Technologies, Inc. is a Lincoln, Nebraska based company that provides tools for coaches and athletes to review game footage and improve individual and team play. Its initial product line served college and professional American football teams. Today, the company provides video services to youth, amateur, and professional teams in American football as well as other sports, including soccer, basketball, volleyball, and lacrosse. It now serves 170,000 teams in 50 different sports around the world. Hudl’s overall goal is to capture and bring value to every moment in sports.

Hudl’s mission is to make every moment in sports count. Hudl does this by expanding access to more moments through video and data and putting those moments in context. Our goal is to increase access by different people and increase context with more data points for every customer we serve. Using data to generate analytics, Hudl is able to turn data into actionable insights, telling powerful stories with video and data.

To best serve our customers and provide the most powerful insights possible, we need to be able to compare large sets of data between different sources. For example, enriching our MongoDB and Amazon DocumentDB (with MongoDB compatibility) data with our application logging data leads to new insights. This requires resilient data pipelines.

In this post, we discuss how Hudl has iterated on one such data pipeline using AWS Glue to improve performance and scalability. We talk about the initial architecture of this pipeline, and some of the limitations associated with this approach. We also discuss how we iterated on that design using Apache Hudi to dramatically improve performance.

Problem statement

A data pipeline that ensures high-quality MongoDB and Amazon DocumentDB statistics data is available in our central data lake is a requirement for Hudl to be able to deliver sports analytics. It’s important to maintain the integrity of the MongoDB and Amazon DocumentDB transactional data in the data lake by capturing changes in near-real time, along with upserts to records in the data lake. Because Hudl statistics are backed by MongoDB and Amazon DocumentDB databases, in addition to a broad range of other data sources, it’s important that relevant MongoDB and Amazon DocumentDB data is available in a central data lake where we can run analytics queries to compare statistics data between sources.

Initial design

The following diagram demonstrates the architecture of our initial design.

Initial Ingestion Pipeline Design

Let’s discuss the key AWS services of this architecture:

  • AWS Database Migration Service (AWS DMS) allowed our team to move quickly in delivering this pipeline. AWS DMS gives our team a full snapshot of the data, and also offers ongoing change data capture (CDC). By combining these two datasets, we can ensure our pipeline delivers the latest data.
  • Amazon Simple Storage Service (Amazon S3) is the backbone of Hudl’s data lake because of its durability, scalability, and industry-leading performance.
  • AWS Glue allows us to run our Spark workloads in a serverless fashion, with minimal setup. We chose AWS Glue for its ease of use and speed of development. Additionally, features such as AWS Glue bookmarking simplified our file management logic.
  • Amazon Redshift offers petabyte-scale data warehousing. Amazon Redshift provides consistently fast performance, and easy integrations with our S3 data lake.

The data processing flow includes the following steps:

  1. Amazon DocumentDB holds the Hudl statistics data.
  2. AWS DMS gives us a full export of statistics data from Amazon DocumentDB, and ongoing changes in the same data.
  3. In the S3 Raw Zone, the data is stored in JSON format.
  4. An AWS Glue job merges the initial load of statistics data with the changed statistics data to give a snapshot of statistics data in JSON format for reference, eliminating duplicates.
  5. In the S3 Cleansed Zone, the JSON data is normalized and converted to Parquet format.
  6. AWS Glue uses a COPY command to insert Parquet data into Amazon Redshift consumption base tables.
  7. Amazon Redshift stores the final table for consumption.

The following is a sample code snippet from the AWS Glue job in the initial data pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import coalesce
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

# Load the entire dataset from the S3 Cleansed Zone
full_df = read_full_data()

# Read the new CDC data, which represents the delta in the source MongoDB/DocumentDB
cdc_df = read_cdc_data()

# Calculate the final snapshot by joining the existing data with the delta
joined_df = full_df.join(cdc_df, '_id', 'full_outer')

# Drop deleted records and prefer the CDC version of each document
result = joined_df.filter((joined_df.Op != 'D') | (joined_df.Op.isNull())).select(coalesce(cdc_df._doc, full_df._doc).alias('_doc'))

gc.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(result, gc, "result"), connection_type="s3", connection_options={"path": output_path}, format="parquet", transformation_ctx="ctx4")

Challenges

Although this initial solution met our need for data quality, we felt there was room for improvement:

  • The pipeline was slow – The pipeline ran slowly (over 2 hours) because for each batch, the whole dataset was compared. Every record had to be compared, flattened, and converted to Parquet, even when only a few records were changed from the previous daily run.
  • The pipeline was expensive – As the data size grew daily, the job duration also grew significantly (especially in step 4). To mitigate the impact, we needed to allocate more AWS Glue DPUs (Data Processing Units) to scale the job, which led to higher cost.
  • The pipeline limited our ability to scale – Hudl’s data has a long history of rapid growth with increasing customers and sporting events. Given this trend, our pipeline needed to run as efficiently as possible to handle only changing datasets to have predictable performance.

New design

The following diagram illustrates our updated pipeline architecture.

Although the overall architecture looks roughly the same, the internal logic in AWS Glue was significantly changed, along with the addition of Apache Hudi datasets.

In step 4, AWS Glue now interacts with Apache Hudi datasets in the S3 Cleansed Zone to upsert or delete changed records as identified by AWS DMS CDC. The AWS Glue to Apache Hudi connector helps convert JSON data to Parquet format and upserts it into the Apache Hudi dataset. Retaining the full documents in our Apache Hudi dataset allows us to easily make schema changes to our final Amazon Redshift tables without needing to re-export data from our source systems.

The following is a sample code snippet from the new AWS Glue pipeline:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_context = spark.sparkContext
gc = GlueContext(spark_context)

# Apache Hudi upsert configuration for the AWS Glue connector
upsert_conf = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'write_ts',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.table.name': 'glue_table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': 'glue_database',
    'hoodie.datasource.hive_sync.table': 'glue_table',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'path': 's3://bucket/prefix/',
    'hoodie.compact.inline': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.upsert.shuffle.parallelism': 200,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10
}

# cdc_upserts_df holds the new and changed records identified by AWS DMS CDC
gc.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(cdc_upserts_df, gc, "cdc_upserts_df"), connection_type="marketplace.spark", connection_options=upsert_conf)

Results

With this new approach using Apache Hudi datasets with AWS Glue deployed after May 2022, the pipeline runtime was predictable and less expensive than the initial approach. Because we only handled new or modified records by eliminating the full outer join over the entire dataset, we saw an 80–90% reduction in runtime for this pipeline, thereby reducing costs by 80–90% compared to the initial approach. The following diagram illustrates our processing time before and after implementing the new pipeline.

Conclusion

With Apache Hudi’s open-source data management framework, we simplified incremental data processing in our AWS Glue data pipeline to manage data changes at the record level in our S3 data lake with CDC from Amazon DocumentDB.

We hope that this post will inspire your organization to build AWS Glue pipelines with Apache Hudi datasets that reduce cost and bring performance improvements using serverless technologies to achieve your business goals.


About the authors

Addison Higley is a Senior Data Engineer at Hudl. He manages over 20 data pipelines to help ensure data is available for analytics so Hudl can deliver insights to customers.

Ramzi Yassine is a Lead Data Engineer at Hudl. He leads the architecture, implementation of Hudl’s data pipelines and data applications, and ensures that our data empowers internal and external analytics.

Swagat Kulkarni is a Senior Solutions Architect at AWS and an AI/ML enthusiast. He is passionate about solving real-world problems for customers with cloud-native services and machine learning. Swagat has over 15 years of experience delivering several digital transformation initiatives for customers across multiple domains, including retail, travel and hospitality, and healthcare. Outside of work, Swagat enjoys travel, reading, and meditating.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

Create a Multi-Region Python Package Publishing Pipeline with AWS CDK and CodePipeline

Post Syndicated from Brian Smitches original https://aws.amazon.com/blogs/devops/create-a-multi-region-python-package-publishing-pipeline-with-aws-cdk-and-codepipeline/

Customers can author and store internal software packages in AWS by leveraging the AWS CodeSuite (AWS CodePipeline, AWS CodeBuild, AWS CodeCommit, and AWS CodeArtifact).  As of the publish date of this blog post, there is no native way to replicate your CodeArtifact Packages across regions. This blog addresses how a custom solution built with the AWS Cloud Development Kit and AWS CodePipeline can create a Multi-Region Python Package Publishing Pipeline.

Whether it’s for resiliency or performance improvement, many customers want to deploy their applications across multiple regions. When applications are dependent on custom software packages, the software packages should be replicated to multiple regions as well. This post will walk through how to deploy a custom package publishing pipeline in your own AWS Account. This pipeline connects a Python package source code repository to build and publish pip packages to CodeArtifact Repositories spanning three regions (the primary and two replica regions). While this sample CDK Application is built specifically for pip packages, the underlying architecture can be reused for different software package formats, such as npm, Maven, NuGet, etc.

Solution overview

The following figure demonstrates the solution workflow:

  1. A CodePipeline pipeline orchestrates the building and publishing of the software package
    1. This pipeline is triggered by commits on the main branch of the CodeCommit repository
    2. A CodeBuild job builds the pip packages using twine to be distributed
    3. The publish stage (third column) uses three parallel CodeBuild jobs to publish the distribution package to the CodeArtifact repositories in separate regions
  2. The first CodeArtifact Repository stores the package contents in the primary region.
  3. The second and third CodeArtifact Repositories act as replicas and store the package contents in the other regions.

Figure 1. Architecture diagram

All of these resources are defined in a single AWS CDK Application. The resources are defined in CDK Stacks that are deployed as AWS CloudFormation Stacks. AWS CDK can deploy the different stacks across separate regions.

Prerequisites

Before getting started, you will need the following:

  1. An AWS account
  2. An instance of the AWS Cloud9 IDE or an alternative local compute environment, such as your personal computer
  3. The following installed on your compute environment:
    1. AWS CDK
    2. AWS Command Line Interface (AWS CLI)
    3. npm
  4. The AWS accounts must be bootstrapped for CDK in the necessary regions. The default configuration uses us-east-1, us-east-2, and us-west-2, as these three regions support CodeArtifact.

A new AWS Cloud9 IDE is recommended for this tutorial to isolate these actions in this post from your normal compute environment. See the Cloud9 Documentation for Creating an Environment.

Deploy the Python Package Publishing Pipeline into your AWS Account with the CDK

The source code can be found in this GitHub Repository.

  1. Fork the GitHub Repo into your account. This way you can experiment with changes as necessary to fit your workload.
  2. In your local compute environment, clone the GitHub Repository and cd into the project directory:
    git clone git@github.com:<YOUR_GITHUB_USERNAME>/multi-region-python-package-publishing-pipeline.git
    cd multi-region-python-package-publishing-pipeline
  3. Install the necessary node packages:
    npm i
  4. (Optional) Override the default configurations for the CodeArtifact domainName, repositoryName, primaryRegion, and replicaRegions by navigating to ./bin/multiregion_package_publishing.ts and updating the relevant fields.
  5. From the project’s root directory (multi-region-python-package-publishing-pipeline), deploy the AWS CDK application. This step may take 5-10 minutes.
    cdk deploy --all
  6. When prompted “Do you wish to deploy these changes (y/n)?”, enter y.

Viewing the deployed CloudFormation stacks

After the deployment of the AWS CDK application completes, you can view the deployed AWS CDK Stacks via CloudFormation. From the AWS Console, search “CloudFormation’ in the search bar and navigate to the service dashboard. In the primary region (us-east-1(N. Virginia)) you should see two stacks: CodeArtifactPrimaryStack-<region> and PackagePublishingPipelineStack.

Figure 2. Screenshot showing the CloudFormation Stacks in the primary region

Switch regions to one of the secondary regions us-west-2 (Oregon) or us-east-2 (Ohio) to see the remaining stacks named CodeArtifactReplicaStack-<region>. These correspond to the three AWS CDK Stacks from the architecture diagram.

Figure 3. Screenshot showing the CloudFormation stacks in a separate region

Viewing the CodePipeline Package Publishing Pipeline

From the Console, select the primary region (us-east-1) and navigate to CodePipeline by utilizing the search bar. Select the Pipeline titled packagePipeline and inspect the state of the pipeline. This pipeline triggers after every commit from the CodeCommit repository named PackageSourceCode. If the pipeline is still in process, then wait a few minutes, as this pipeline can take approximately 7–8 minutes to complete all three stages (Source, Build, and Publish). Once it’s complete, the pipeline should reflect the following screenshot:

Figure 4. A screenshot showing the CodePipeline flow

Viewing the Published Package in the CodeArtifact Repository

To view the published artifacts, go to the primary or secondary region and navigate to the CodeArtifact dashboard by utilizing the search bar in the Console. You’ll see a repository named package-artifact-repo. Select the repository and you’ll see the sample pip package named mypippackage inside the repository. This package is defined by the source code in the CodeCommit repository named PackageSourceCode in the primary region (us-east-1).

Figure 5. Screenshot of the package repository

Create a new package version in CodeCommit and monitor the pipeline release

Navigate to your CodeCommit PackageSourceCode repository (us-east-1 CodeCommit > Repositories > PackageSourceCode). Open the setup.py file and select the Edit button. Make a simple modification: change version = '1.0.0' to version = '1.1.0', and commit the changes to the Main branch.

Figure 6. A screenshot of the source package’s code repository in CodeCommit

Now navigate back to CodePipeline and watch as the pipeline performs the release automatically. When the pipeline finishes, this new package version will live in each of the three CodeArtifact Repositories.

Install the custom pip package to your local Python Environment

For your development team to connect to this CodeArtifact Repository to download repositories, you must configure the pip tool to look in this repository. From your Cloud9 IDE (or local development environment), let’s test the installation of this package for Python3:

  1. Copy the connection instructions for the pip tool. Navigate to the CodeArtifact repository of your choice and select View connection instructions
    1. Select Copy to copy the snippet to your clipboard

Figure 7. Screenshot showing directions to connect to a code artifact repository
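
The copied snippet is an aws codeartifact login command for the pip tool. As a rough illustration (your domain name, account ID, and Region will differ), it looks similar to the following:

aws codeartifact login --tool pip --domain <your-domain> --domain-owner <AWS_ACCOUNT_ID> --repository package-artifact-repo --region us-east-1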

  2. Paste the command from your clipboard
  3. Run pip install mypippackage==1.0.0

Figure 8. Screenshot showing CodeArtifact login

  4. Test that the package works as expected by importing the modules
  5. Start the Python REPL by running python3 in the terminal

Figure 9. Screenshot of the package being imported

Clean up

Destroy all of the AWS CDK Stacks by running cdk destroy --all from the root AWS CDK application directory.

Conclusion

In this post, we walked through how to deploy a CodePipeline pipeline to automate the publishing of Python packages to multiple CodeArtifact repositories in separate regions. Leveraging the AWS CDK simplifies the maintenance and configuration of this multi-region solution by using Infrastructure as Code and predefined Constructs. If you would like to customize this solution to better fit your needs, please read more about the AWS CDK and AWS Developer Tools. Some links we suggest include the CodeArtifact User Guide (with sections covering npm, Python, Maven, and NuGet), the CDK API Reference, CDK Pipelines, and the CodePipeline User Guide.

About the authors:

Andrew Chen

Andrew Chen is a Solutions Architect with an interest in Data Analytics, Machine Learning, and DevOps. Andrew has previous experience in management consulting in which he worked as a technical architect for various cloud migration projects. In his free time, Andrew enjoys fishing, hiking, kayaking, and keeping up with financial markets.

Brian Smitches

Brian Smitches is a Solutions Architect with an interest in Infrastructure as Code and the AWS Cloud Development Kit. Brian currently supports Federal SMB Partners and has previous experience with Full Stack Application Development. In his personal time, Brian enjoys skiing, water sports, and traveling with friends and family.

Ingest streaming data to Apache Hudi tables using AWS Glue and Apache Hudi DeltaStreamer

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-to-apache-hudi-tables-using-aws-glue-and-apache-hudi-deltastreamer/

In today’s world with technology modernization, the need for near-real-time streaming use cases has increased exponentially. Many customers are continuously consuming data from different sources, including databases, applications, IoT devices, and sensors. Organizations may need to ingest that streaming data into data lakes built on Amazon Simple Storage Service (Amazon S3). You may also need to achieve analytics and machine learning (ML) use cases in near-real time. To ensure consistent results in those near-real-time streaming use cases, incremental data ingestion and atomicity, consistency, isolation, and durability (ACID) properties on data lakes have been a common ask.

To address such use cases, one approach is to use Apache Hudi and its DeltaStreamer utility. Apache Hudi is an open-source data management framework designed for data lakes. It simplifies incremental data processing by enabling ACID transactions and record-level inserts, updates, and deletes of streaming ingestion on data lakes built on top of Amazon S3. Hudi is integrated with well-known open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino, as well as with various AWS analytics services like AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift. The DeltaStreamer utility provides an easy way to ingest streaming data from sources like Apache Kafka into your data lake.

This post describes how to run the DeltaStreamer utility on AWS Glue to read streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and ingest the data into S3 data lakes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. With AWS Glue, you can create Spark, Spark Streaming, and Python shell jobs to extract, transform, and load (ETL) data. You can create AWS Glue Spark streaming ETL jobs using either Scala or PySpark that run continuously, consuming data from Amazon MSK, Apache Kafka, and Amazon Kinesis Data Streams and writing it to your target.

Solution overview

To demonstrate the DeltaStreamer utility, we use fictional product data that represents product inventory including product name, category, quantity, and last updated timestamp. Let’s assume we stream the data from data sources to an MSK topic. Now we want to ingest this data coming from the MSK topic into Amazon S3 so that we can run Athena queries to analyze business trends in near-real time.

The following diagram provides the overall architecture of the solution described in this post.

To simulate application traffic, we use Amazon Elastic Compute Cloud (Amazon EC2) to send sample data to an MSK topic. Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. To consume the streaming data from Amazon MSK, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector 0.10.1 for AWS Glue 3.0, with the DeltaStreamer utility to write the ingested data to Amazon S3. The Apache Hudi Connector 0.9.0 for AWS Glue 3.0 also supports the DeltaStreamer utility.

As the data is being ingested, the AWS Glue streaming job writes the data into the Amazon S3 base path. The data in Amazon S3 is cataloged using the AWS Glue Data Catalog. We then use Athena, which is an interactive query service, to query and analyze the data using standard SQL.

Prerequisites

We use an AWS CloudFormation template to provision some resources for our solution. The template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to ingest data to the MSK cluster running in a private subnet. Make sure you have a key in the AWS Region where you deploy the template. If you don’t have one, you can create a new key pair.

Create the Apache Hudi connection

To add the Apache Hudi Connector for AWS Glue, complete the following steps:

  1. On the AWS Glue Studio console, choose Connectors.
  2. Choose Go to AWS Marketplace.
  3. Search for and choose Apache Hudi Connector for AWS Glue.
  4. Choose Continue to Subscribe.
  5. Review the terms and conditions, then choose Accept Terms.

    After you accept the terms, it takes some time to process the request.
    When the subscription is complete, you see the Effective date populated next to the product.
  6. Choose Continue to Configuration.
  7. For Fulfillment option, choose Glue 3.0.
  8. For Software version, choose 0.10.1.
  9. Choose Continue to Launch.
  10. Choose Usage instructions, and then choose Activate the Glue connector from AWS Glue Studio.

    You’re redirected to AWS Glue Studio.
  11. For Name, enter Hudi-Glue-Connector.
  12. Choose Create connection and activate connector.

A message appears that the connection was successfully created. Verify that the connection is visible on the AWS Glue Studio console.

Launch the CloudFormation stack

For this post, we provide a CloudFormation template to create the following resources:

  • VPC, subnets, security groups, and VPC endpoints
  • AWS Identity and Access Management (IAM) roles and policies with required permissions
  • An EC2 instance running in a public subnet within the VPC with Kafka 2.12 installed and with the source data initial load and source data incremental load JSON files
  • An Amazon MSK server running in a private subnet within the VPC
  • An AWS Glue Streaming DeltaStreamer job to consume the incoming data from the Kafka topic and write it to Amazon S3
  • Two S3 buckets: one of the buckets stores code and config files, and the other is the target for the AWS Glue streaming DeltaStreamer job

To create the resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-deltastreamer-glue-blog.
  3. For ClientIPCIDR, enter the IP address of your client that you use to connect to the EC2 instance.
  4. For HudiConnectionName, enter the AWS Glue connection you created earlier (Hudi-Glue-Connector).
  5. For KeyName, choose the name of the EC2 key pair that you created as a prerequisite.
  6. For VpcCIDR, leave as is.
  7. Choose Next.
  8. Choose Next.
  9. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation stack creation is complete and the resources are created, the Outputs tab shows the following information:

  • HudiDeltastreamerGlueJob – The AWS Glue streaming job name
  • MSKCluster – The MSK cluster ARN
  • PublicIPOfEC2InstanceForTunnel – The public IP of the EC2 instance for tunnel
  • TargetS3Bucket – The S3 bucket name

Create a topic in the MSK cluster

Next, connect to the EC2 instance using the key pair you created and complete the following steps:

  1. SSH to the EC2 instance as ec2-user:
    ssh -i <KeyName> ec2-user@<PublicIPOfEC2InstanceForTunnel>

    You can get the KeyName value on the Parameters tab and the public IP of the EC2 instance for tunnel on the Outputs tab of the CloudFormation stack.

  2. For the next command, retrieve the bootstrap server endpoint of the MSK cluster by navigating to msk-source-cluster on the Amazon MSK console and choosing View client information.
  3. Run the following command to create the topic hudi-deltastream-demo in the MSK cluster:
    ./kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
    --topic hudi-deltastream-demo \
    --bootstrap-server "<replace text with value under private endpoint on MSK>" \
    --partitions 1 \
    --replication-factor 2 \
    --command-config ./config_file.txt

  4. Ingest the initial data from the deltastreamer_initial_load.json file into the Kafka topic:
    ./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
    --broker-list "<replace text with value under private endpoint on MSK>" \
    --topic hudi-deltastream-demo \
    --producer.config ./config_file.txt < deltastreamer_initial_load.json

The following is the schema of a record ingested into the Kafka topic:

{
  "type":"record",
  "name":"products",
  "fields":[{
     "name": "id",
     "type": "int"
  }, {
     "name": "category",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  },{
     "name": "name",
     "type": "string"
  },{
     "name": "quantity",
     "type": "int"
  }
]}

The schema uses the following parameters:

  • id – The product ID
  • category – The product category
  • ts – The timestamp when the record was inserted or last updated
  • name – The product name
  • quantity – The available quantity of the product in the inventory

The following code gives an example of a record:

{
    "id": 1, 
    "category": "Apparel", 
    "ts": "2022-01-02 10:29:00", 
    "name": "ABC shirt", 
    "quantity": 4
}

Start the AWS Glue streaming job

To start the AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue Studio console, find the job with the value for HudiDeltastreamerGlueJob.
  2. Choose the job to review the script and job details.
  3. On the Job details tab, replace the value of the --KAFKA_BOOTSTRAP_SERVERS key with the Amazon MSK bootstrap server’s private endpoint.
  4. Choose Save to save the job settings.
  5. Choose Run to start the job.

When the AWS Glue streaming job runs, the records from the MSK topic are consumed and written to the target S3 bucket created by AWS CloudFormation. To find the bucket name, check the stack’s Outputs tab for the TargetS3Bucket key value.

The data in Amazon S3 is stored in Parquet file format. In this example, the data written to Amazon S3 isn’t partitioned, but you can enable partitioning by specifying hoodie.datasource.write.partitionpath.field=<column_name> as the partition field and setting hoodie.datasource.write.hive_style_partitioning to True in the Hudi configuration properties in the AWS Glue job script.

In this post, we write the data to a non-partitioned table, so we set the following two Hudi configurations (see the example after the list):

  • hoodie.datasource.hive_sync.partition_extractor_class is set to org.apache.hudi.hive.NonPartitionedExtractor
  • hoodie.datasource.write.keygenerator.class is set to org.apache.hudi.keygen.NonpartitionedKeyGenerator
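
The following is a minimal sketch of these settings in the same property format as the DeltaStreamer configuration shown later in this post. The partitioned variant, and the choice of category as the partition column, are assumptions for illustration only:

# Non-partitioned table, as used in this post
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator

# Partitioned alternative (assumption: partition by the category column; a matching
# key generator and partition extractor class would also be required)
hoodie.datasource.write.partitionpath.field=category
hoodie.datasource.write.hive_style_partitioning=true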

DeltaStreamer options and configuration

DeltaStreamer has multiple options available; the following are the options set in the AWS Glue streaming job used in this post (an example invocation follows the list):

  • continuous – DeltaStreamer runs in continuous mode, repeatedly fetching from the source and ingesting the data.
  • enable-hive-sync – Enables table sync to the Apache Hive Metastore.
  • schemaprovider-class – Defines the class for the schema provider to attach schemas to the input and target table data.
  • source-class – Defines the source class to read data and has many built-in options.
  • source-ordering-field – The field used to break ties between records with the same key in input data. Defaults to ts (the Unix timestamp of the record).
  • target-base-path – Defines the path for the target Hudi table.
  • table-type – Indicates the Hudi storage type to use. In this post, it’s set to COPY_ON_WRITE.
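
To make these options concrete, the following is a sketch of an equivalent standalone HoodieDeltaStreamer invocation. In the AWS Glue streaming job the options are passed programmatically, and the source class, schema provider class, and bucket path shown here are assumptions for illustration only:

# Illustrative spark-submit form of the DeltaStreamer options used in this post
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --target-base-path s3://<target-s3-bucket>/product_table \
  --target-table product_table \
  --enable-hive-sync \
  --continuous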

The following are some of the important DeltaStreamer configuration properties set in the AWS Glue streaming job:

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc

# Kafka Source
hoodie.deltastreamer.source.kafka.topic=hudi-deltastream-demo

#Kafka props
bootstrap.servers=args("KAFKA_BOOTSTRAP_SERVERS")
auto.offset.reset=earliest
security.protocol=SSL

The configuration contains the following details:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema for the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstrap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range

Hudi configuration

The following are some of the important Hudi configuration options, which enable us to achieve in-place updates (upserts) on the ingested data (see the example after the list):

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the pre-combined field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.
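
For the products table in this post, these options might be set as follows. This is a sketch in the same property format as the configuration shown earlier; the exact values live in the AWS Glue job script:

# id is the unique product ID, ts is the last-updated timestamp;
# upsert (lowercase) is the conventional value for the write operation property
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.precombine.field=ts
hoodie.datasource.write.operation=upsert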

AWS Glue Data Catalog table

The AWS Glue job creates a Hudi table in the Data Catalog mapped to the Hudi dataset on Amazon S3. Because the hoodie.datasource.hive_sync.table configuration parameter is set to product_table, the table is visible under the default database in the Data Catalog.
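
To inspect the synced table from the command line instead of the console, you can describe it in the Data Catalog. The following AWS CLI sketch assumes the default database and the table name used in this post:

# List the column names of the synced Hudi table
aws glue get-table \
  --database-name default \
  --name product_table \
  --query 'Table.StorageDescriptor.Columns[].Name'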

The following screenshot shows the Hudi table column names in the Data Catalog.

Query the data using Athena

With the Hudi datasets available in Amazon S3, you can query the data using Athena. Let’s use the following query:

SELECT * FROM "default"."product_table";

The following screenshot shows the query output. The table product_table has four records from the initial ingestion: two records for the category Apparel, one for Cosmetics, and one for Footwear.

Load incremental data into the Kafka topic

Now suppose that the store sold some quantity of apparel and footwear and added a new product to its inventory, as shown in the following code. The store sold two items of product ID 1 (Apparel) and one item of product ID 3 (Footwear). The store also added a new product in the Cosmetics category, JKL Lip gloss, with product ID 5.

{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2}
{"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5}
{"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7}

Let’s ingest the incremental data from the deltastreamer_incr_load.json file to the Kafka topic and query the data from Athena:

./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
--broker-list "<replace text with value under private endpoint on MSK>" \
--topic hudi-deltastream-demo \
--producer.config ./config_file.txt < deltastreamer_incr_load.json

Within a few seconds, you should see a new Parquet file created in the target S3 bucket under the product_table prefix. The following is the screenshot from Athena after the incremental data ingestion showing the latest updates.

Additional considerations

There are some hard-coded Hudi options in the AWS Glue Streaming job scripts. These options are set for the sample table that we created for this post, so update the options based on your workload.

Clean up

To avoid incurring any future charges, delete the CloudFormation stack, which deletes all the underlying resources created by this post, except for the product_table table created in the default database. Manually delete the product_table table under the default database from the Data Catalog.

Conclusion

In this post, we illustrated how you can add the Apache Hudi Connector for AWS Glue and perform streaming ingestion into an S3 data lake using Apache Hudi DeltaStreamer with AWS Glue. You can use the Apache Hudi Connector for AWS Glue to create a serverless streaming pipeline using AWS Glue streaming jobs with the DeltaStreamer utility to ingest data from Kafka. We demonstrated this by reading the latest updated data using Athena in near-real time.

As always, AWS welcomes feedback. If you have any comments or questions on this post, please share them in the comments.


About the authors

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Anand Prakash is a Senior Solutions Architect at AWS Data Lab. Anand focuses on helping customers design and build AI/ML, data analytics, and database solutions to accelerate their path to production.

Using AWS Shield Advanced protection groups to improve DDoS detection and mitigation

Post Syndicated from Joe Viggiano original https://aws.amazon.com/blogs/security/using-aws-shield-advanced-protection-groups-to-improve-ddos-detection-and-mitigation/

Amazon Web Services (AWS) customers can use AWS Shield Advanced to detect and mitigate distributed denial of service (DDoS) attacks that target their applications running on Amazon Elastic Compute Cloud (Amazon EC2), Elastic Load Balancing (ELB), Amazon CloudFront, AWS Global Accelerator, and Amazon Route 53. By using protection groups for Shield Advanced, you can logically group your collections of Shield Advanced protected resources. In this blog post, you will learn how you can use protection groups to customize the scope of DDoS detection for application layer events, and accelerate mitigation for infrastructure layer events.

What is a protection group?

A protection group is a resource that you create by grouping your Shield Advanced protected resources, so that the service considers them to be a single protected entity. A protection group can contain many different resources that compose your application, and the resources may be part of multiple protection groups spanning different AWS Regions within an AWS account. Common patterns that you might use when designing protection groups include aligning resources to applications, application teams, or environments (such as production and staging), and by product tiers (such as free or paid). For more information about setting up protection groups, see Managing AWS Shield Advanced protection groups.
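
You can also create a protection group programmatically. The following AWS CLI sketch groups two already-protected resources; the group ID, aggregation type, and member ARNs are placeholders to adapt to your environment:

# Group two Shield Advanced protected resources into one protection group
aws shield create-protection-group \
  --protection-group-id my-app-prod \
  --aggregation SUM \
  --pattern ARBITRARY \
  --members arn:aws:cloudfront::111122223333:distribution/EXAMPLEID \
            arn:aws:ec2:us-east-1:111122223333:eip-allocation/eipalloc-0example1234567890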

Why should you consider using a protection group?

The benefits of protection groups differ for infrastructure layer (layer 3 and layer 4) events and application layer (layer 7) events. For layer 3 and layer 4 events, protection groups can reduce the time it takes for Shield Advanced to begin mitigations. For layer 7 events, protection groups add an additional reporting mechanism. There is no change in the mechanism that Shield Advanced uses internally for detection of an event, and you do not lose the functionality of individual resource-level detections. You receive both group-level and individual resource-level Amazon CloudWatch metrics to consume for operational use. Let’s look at the benefits for each layer in more detail.

Layers 3 and 4: Accelerate time to mitigate for DDoS events

For infrastructure layer (layer 3 and layer 4) events, Shield Advanced monitors the traffic volume to your protected resource. An abnormal traffic deviation signals the possibility of a DDoS attack, and Shield Advanced then puts mitigations in place. By default, Shield Advanced observes the elevation of traffic to a resource over multiple consecutive time intervals to establish confidence that a layer 3/layer 4 event is under way. In the absence of a protection group, Shield Advanced follows the default behavior of waiting to establish confidence before it puts mitigation in place for each resource. However, if the resources are part of a protection group, and if the service detects that one resource in a group is targeted, Shield Advanced uses that confidence for other resources in the group. This can accelerate the process of putting mitigations in place for those resources.

Consider a case where you have an application deployed in different AWS Regions, and each stack is fronted with a Network Load Balancer (NLB). When you enable Shield Advanced on the Elastic IP addresses associated with the NLB in each Region, you can optionally add those Elastic IP addresses to a protection group. If an actor targets one of the NLBs in the protection group and a DDoS attack is detected, Shield Advanced will lower the threshold for implementing mitigations on the other NLBs associated with the protection group. If the scope of the attack shifts to target the other NLBs, Shield Advanced can potentially mitigate the attack faster than if the NLB was not in the protection group.

Note: This benefit applies only to Elastic IP addresses and Global Accelerator resource types.

Layer 7: Reduce false positives and improve accuracy of detection for DDoS events

Shield Advanced detects application layer (layer 7) events when you associate an AWS WAF web access control list (web ACL) with the protected resource. Shield Advanced consumes request data for the associated web ACL, analyzes it, and builds a traffic baseline for your application. The service then uses this baseline to detect anomalies in traffic patterns that might indicate a DDoS attack.

When you group resources in a protection group, Shield Advanced aggregates the data from individual resources and creates the baseline for the whole group. It then uses this aggregated baseline to detect layer 7 events for the group resource. It also continues to monitor and report for the resources individually, regardless of whether they are part of protection groups or not.

Shield Advanced provides three types of aggregation to choose from (sum, mean, and max) to aggregate the volume data of individual resources to use as a baseline for the whole group. We’ll look at the three types of aggregation, with a use case for each, in the following sections.

Note: Traffic aggregation is applicable only for layer 7 detection.

Case 1: Blue/green deployments

Blue/green is a popular deployment strategy that increases application availability and reduces deployment risk when rolling out changes. The blue environment runs the current application version, and the green environment runs the new application version. When testing is complete, live application traffic is directed to the green environment, and the blue environment is dismantled.

During blue/green deployments, the traffic to your green resources can go from zero load to full load in a short period of time. Shield Advanced layer 7 detection uses traffic baselining for individual resources, so newly created resources like an Application Load Balancer (ALB) that are part of a blue/green operation would have no baseline, and the rapid increase in traffic could cause Shield Advanced to declare a DDoS event. In this scenario, the DDoS event could be a false positive.

Figure 1: A blue/green deployment with ALBs in a protection group. Shield is using the sum of total traffic to the group to baseline layer 7 traffic for the group as a single unit

In the example architecture shown in Figure 1, we have configured Shield to include all resources of type ALB in a single protection group with aggregation type sum. Shield Advanced will use the sum of traffic to all resources in the protection group as an additional baseline. We have only one ALB (called blue) to begin with. When you add the green ALB as part of your deployment, you can optionally add it to the protection group. As traffic shifts from blue to green, the total traffic to the protection group remains the same even though the volume of traffic changes for the individual resources that make up the group. After the blue ALB is deleted, the Shield Advanced baseline for that ALB is deleted with it. At this point, the green ALB hasn’t existed for sufficient time to have its own accurate baseline, but the protection group baseline persists. You could still receive a DDoSDetected CloudWatch metric with a value of 1 for individual resources, but with a protection group you have the flexibility to set one or more alarms based on the group-level DDoSDetected metric. Depending on your application’s use case, this can reduce non-actionable event notifications.
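
For example, a group-level alarm might be configured with the AWS CLI as follows. This is a hedged sketch: the protection group ARN format, the ResourceArn dimension, and the alarm settings are assumptions to verify against the Shield Advanced metrics documentation for your account:

# Alarm when Shield Advanced reports a DDoS event for the protection group
aws cloudwatch put-metric-alarm \
  --alarm-name shield-protection-group-ddos-detected \
  --namespace AWS/DDoSProtection \
  --metric-name DDoSDetected \
  --dimensions Name=ResourceArn,Value=arn:aws:shield::111122223333:protection-group/my-app-prod \
  --statistic Maximum \
  --period 60 \
  --evaluation-periods 1 \
  --threshold 1 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --treat-missing-data notBreaching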

Note: You might already have alarms set for individual resources, because the onboarding wizard in Shield Advanced provides you an option to create alarms when you add protection to a resource. So, you should review the alarms you already have configured before you create a protection group. Simply adding a resource to a protection group will not reduce false positives.

Case 2: Resources that have traffic patterns similar to each other

Client applications might interact with multiple services as part of a single transaction or workflow. These services can be behind their own dedicated ALBs or CloudFront distributions and can have traffic patterns similar to each other. In the example architecture shown in Figure 2, we have two services that are always called to satisfy a user request. Consider a case where you add a new service to the mix. Before protection groups existed, setting up a new protected resource, such as an ALB or a CloudFront distribution, required Shield Advanced to build a brand-new baseline. You had to wait for a certain minimum period before Shield Advanced could start monitoring the resource, and the service would need to monitor traffic for a few days in order to be accurate.

Figure 2: Deploying a new service and including it in a protection group with an existing baseline. Shield is using the mean aggregation type to baseline traffic for the group.

For improved accuracy of detection of layer 7 events, you can cause Shield Advanced to inherit the baseline of existing services that are part of the same transaction or workflow. To do so, you can put your new resource in a protection group along with an existing service or services, and set the aggregation type to mean. Shield Advanced will take some time to build up an accurate baseline for the new service. However, the protection group has an established baseline, so the new service won’t be susceptible to decreased accuracy of detection for that period of time. Note that this setting will not stop Shield Advanced from sending notifications for the new service individually; however, you might prefer to take corrective action based on the detection for the group instead.

Case 3: Resources that share traffic in a non-uniform way

Consider the case of a CloudFront distribution with an ALB as origin. If the content is cached in CloudFront edge locations, the traffic reaching the application will be lower than that received by the edge locations. Similarly, if there are multiple origins of a CloudFront distribution, the traffic volumes of individual origins will not reflect the aggregate traffic for the application. Scenarios like invalidation of cache or an origin failover can result in increased traffic at one of the ALB origins. This could cause Shield Advanced to send “1” as the value for the DDoSDetected CloudWatch metric for that ALB. However, you might not want to initiate an alarm or take corrective action in this case.

Figure 3: CloudFront and ALBs in a protection group with aggregation type max. Shield is using CloudFront’s baseline for the group

You can combine the CloudFront distribution and origin (or origins) in a protection group with the aggregation type set to max. Shield Advanced will consider the CloudFront distribution’s traffic volume as the baseline for the protection group as a whole. In the example architecture in Figure 3, a CloudFront distribution fronts two ALBs and balances the load between the two. We have bundled all three resources (CloudFront and two ALBs) into a protection group. In case one ALB fails, the other ALB will receive all the traffic. This way, although you might receive an event notification for the active ALB at the individual resource level if Shield detects a volumetric event, you might not receive it for the protection group because Shield Advanced will use CloudFront traffic as the baseline for determining the increase in volume. You can set one or more alarms and take corrective action according to your application’s use case.

Conclusion

In this blog post, we showed you how AWS Shield Advanced provides you with the capability to group resources in order to consider them a single logical entity for DDoS detection and mitigation. This can help reduce the number of false positives and accelerate the time to mitigation for your protected applications.

A Shield Advanced subscription provides additional capabilities, beyond those discussed in this post, that supplement your perimeter protection. It provides integration with AWS WAF for layer 7 DDoS detection, health-based detection for reducing false positives, enhanced visibility into DDoS events, assistance from the Shield Response team, custom mitigations, and cost-protection safeguards. You can learn more about Shield Advanced capabilities in the AWS Shield Advanced User Guide.

 
If you have feedback about this blog post, submit comments in the Comments section below. You can also start a new thread on AWS Shield re:Post to get answers from the community.

Want more AWS Security news? Follow us on Twitter.

Joe Viggiano

Joe is a Sr. Solutions Architect helping media and entertainment companies accelerate their adoptions of cloud-based solutions.

Deepak Garg

Deepak is a Solutions Architect at AWS. He loves diving deep into AWS services and sharing his knowledge with customers. Deepak has background in Content Delivery Networks and Telecommunications.

How to centralize findings and automate deletion for unused IAM roles

Post Syndicated from Hong Pham original https://aws.amazon.com/blogs/security/how-to-centralize-findings-and-automate-deletion-for-unused-iam-roles/

Maintaining AWS Identity and Access Management (IAM) resources is similar to keeping your garden healthy over time. Having visibility into your IAM resources, especially the resources that are no longer used, is important to keep your AWS environment secure. Proactively detecting and responding to unused IAM roles helps you prevent unauthorized entities from gaining access to your AWS resources. In this post, I will show you how to apply resource tags on IAM roles and deploy serverless technologies on AWS to detect unused IAM roles and to require the owner of the IAM role (identified through tags) to take action.

You can use this solution to check for unused IAM roles in a standalone AWS account. As you grow your workloads in the cloud, you can run this solution for multiple AWS accounts by using AWS Organizations. In this solution, you use AWS Control Tower to create an AWS Organizations organization with a Security organizational unit (OU), and a Security account in this OU. In this blog post, you deploy the solution in the Security account belonging to a Security OU of an organization.

For more information and recommended best practices, see the blog post Managing the multi-account environment using AWS Organizations and AWS Control Tower. Following this best practice, you can create a Security OU, in which you provision one or more Security and Audit accounts that are dedicated for security automation and audit activities on behalf of the entire organization.

Solution architecture

The architecture diagram in Figure 1 demonstrates the solution workflow.

Figure 1: Solution workflow for standalone account or member account of an AWS Organization.

The solution is triggered periodically by an Amazon EventBridge scheduled rule and invokes a series of actions. You specify the frequency (in number of days) when you create the EventBridge rule. There are two options to run this solution, based on the needs of your organization.
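
Before looking at those options, note that a scheduled rule that runs the check every 7 days uses a rate expression like the one in the following sketch. The solution's CloudFormation template creates this rule for you from the Frequency parameter; the rule name here is only illustrative:

# Create (or update) a scheduled rule that fires every 7 days
aws events put-rule \
  --name check-unused-iam-roles \
  --schedule-expression "rate(7 days)"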

Option 1: For a standalone account

Choose this option if you would like to check for unused IAM roles in a single AWS account. This AWS account might or might not belong to an organization or OU. In this blog post, I refer to this account as the standalone account.

Prerequisites

  1. You need an AWS account specifically for security automation. For this blog post, I refer to this account as the standalone Security account.
  2. You should deploy the solution to the standalone Security account, which has appropriate admin permission to audit other accounts and manage security automation.
  3. Because this solution uses AWS CloudFormation StackSets, you need to grant self-managed permissions to create stack sets in standalone accounts. Specifically, you need to establish a trust relationship between the standalone Security account and the standalone account by creating the AWSCloudFormationStackSetAdministrationRole IAM role in the standalone Security account, and the AWSCloudFormationStackSetExecutionRole IAM role in the standalone account.
  4. You need to have AWS Security Hub enabled in your standalone Security account, and you need to deploy the solution in the same AWS Region as your Security Hub dashboard.
  5. You need tagging enforcement in place for IAM roles. This solution uses an IAM tag key Owner to identify the email address of the owner. The value of this tag key should be the email address associated with the owner of the IAM role. If the Owner tag isn’t available, the notification email is sent to the email address that you provided in the parameter ITSecurityEmail when you provisioned the CloudFormation stack (example commands follow this list).
  6. This solution uses Amazon Simple Email Service (Amazon SES) to send emails to the owner of the IAM roles. The destination address needs to be verified with Amazon SES. With Amazon SES, you can verify identity at the individual email address or at the domain level.
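
For example, the following commands show how the Owner tag and the Amazon SES identity verification from the preceding prerequisites might be set up; the role name and email address are hypothetical:

# Tag an IAM role with its owner's email address
aws iam tag-role \
  --role-name example-application-role \
  --tags Key=Owner,Value=owner@example.com

# Verify the owner's email address as an Amazon SES identity
aws ses verify-email-identity \
  --email-address owner@example.com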

An EventBridge rule triggers the AWS Lambda function LambdaCheckIAMRole in the standalone Security account. The LambdaCheckIAMRole function assumes a role in the standalone account. This role is named after the CloudFormation stack name that you specify when you provision the solution. Then LambdaCheckIAMRole calls the IAM API action GetAccountAuthorizationDetails to get the list of IAM roles in the standalone account, and parses the data type RoleLastUsed to retrieve the date, time, and the Region in which the roles were last used. If the last time value is not available, the IAM role is skipped. Based on the CloudFormation parameter MaxDaysForLastUsed that you provide, LambdaCheckIAMRole determines whether the time since the role was last used is greater than the MaxDaysForLastUsed value. LambdaCheckIAMRole also extracts tags associated with the IAM roles, and retrieves the email address of the IAM role owner from the value of the tag key Owner. If there is no Owner tag, then LambdaCheckIAMRole sends an email to a default email address provided by you from the CloudFormation parameter ITSecurityEmail.
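
If you want to see the same last-used information that the function evaluates, you can call the API directly. The following AWS CLI sketch lists role names with their RoleLastUsed timestamps; the --query expression is illustrative and may need adjusting:

# List IAM roles with the date they were last used
aws iam get-account-authorization-details \
  --filter Role \
  --query 'RoleDetailList[].[RoleName, RoleLastUsed.LastUsedDate]' \
  --output table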

Option 2: For all member accounts that belong to an organization or an OU

Choose this option if you want to check for unused IAM roles in every member account that belongs to an AWS Organizations organization or OU.

Prerequisites

  1. You need to have an AWS Organizations organization with a dedicated Security account that belongs to a Security OU. For this blog post, I refer to this account as the Security account.
  2. You should deploy the solution to the Security account that has appropriate admin permission to audit other accounts and to manage security automation.
  3. Because this solution uses CloudFormation StackSets to create stack sets in member accounts of the organization or OU that you specify, the Security account in the Security OU needs to be granted CloudFormation delegated admin permission to create AWS resources in this solution.
  4. You need Security Hub enabled in your Security account, and you need to deploy the solution in the same Region as your Security Hub dashboard.
  5. You need tagging enforcement in place for IAM roles. This solution uses the IAM tag key Owner to identify the owner email address. The value of this tag key should be the email address associated with the owner of the IAM role. If the Owner tag isn’t available, the notification email will be sent to the email address that you provided in the parameter ITSecurityEmail when you provisioned the CloudFormation stack.
  6. This solution uses Amazon SES to send emails to the owner of the IAM roles. The destination address needs to be verified with Amazon SES. With Amazon SES, you can verify identity at the individual email address or at the domain level.

An EventBridge rule triggers the Lambda function LambdaGetAccounts in the Security account to collect the account IDs of member accounts that belong to the organization or OU. LambdaGetAccounts sends those account IDs to an SNS topic, which invokes the Lambda function LambdaCheckIAMRole once for each account ID.

Similar to the process for Option 1, LambdaCheckIAMRole in the Security account assumes a role in the member account(s) of the organization or OU, and checks the last time that IAM roles in the account were used.

In both options, if an IAM role is not currently used, the function LambdaCheckIAMRole generates a Security Hub finding, and performs BatchImportFindings for all findings to Security Hub in the Security account. At the same time, the Lambda function starts an AWS Step Functions state machine execution. Each execution is for an unused IAM role following this naming convention:
[target-account-id]-[unused IAM role name]-[time the execution created in Unix format]

You should avoid running this solution against special IAM roles, such as a break-glass role or a disaster recovery role. In the CloudFormation parameter RolePatternAllowedlist, you can provide a list of role name patterns to skip the check.

Use a Step Functions state machine to process approval

Figure 2 shows the state machine workflow for owner approval.

Figure 2: Owner approval state machine workflow

After the solution identifies an unused IAM role, it creates a Step Functions state machine execution. Figure 2 demonstrates the workflow of the execution. After the execution starts, the first Lambda task NotifyOwner (powered by the Lambda function NotifyOwnerFunction) sends an email to notify the IAM role owner. This is a callback task that pauses the execution until a taskToken is returned. The maximum pause for a callback task is 1 year. The execution waits until the owner responds with a decision to delete or keep the role, which is captured by a private API endpoint in Amazon API Gateway. You can configure a timeout to avoid waiting for callback task execution.

With a private API endpoint, you can build a REST API that is only accessible within your Amazon Virtual Private Cloud (Amazon VPC), or within your internal network connected to your VPC. Using a private API endpoint will prevent anyone from outside of your internal network from selecting this link and deleting the role. You can implement authentication and authorization with API Gateway to make sure that only the appropriate owner can delete a role.

If the owner denies role deletion, then the role remains intact until the next automation cycle runs, and the state machine execution stops immediately with a Fail status. If the owner approves role deletion, the next Lambda task Approve (powered by the function ApproveFunction) checks again if the role is not currently used. If the role isn’t in use, the Lambda task Approve attaches an IAM policy DenyAllCheckUnusedIAMRoleSolution to prevent the role from performing any actions, and waits for 30 days. During this wait time, you can restore the IAM role by removing the IAM policy DenyAllCheckUnusedIAMRoleSolution from the role. The Step Functions state machine execution for this role is still in progress until the wait time expires.

After the wait time expires, the state machine execution invokes the Validate task. The Lambda function ValidateFunction checks again if the role is not in use after the amount of time calculated by adding MaxDaysForLastUsed and the preceding wait time. It also checks if the IAM policy DenyAllCheckUnusedIAMRoleSolution is attached to the role. If both of these conditions are true, the Lambda function follows a process to detach the IAM policies and delete the role permanently. The role can’t be recovered after deletion.

Note: To restore a role that has been marked for deletion, detach the DenyAll IAM policy from the role.
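
For example, assuming the deny policy is attached as a customer managed policy in the same account (an assumption; check how the stack attaches it in your deployment), the restore step might look like the following:

# Detach the deny-all policy to restore a role that is pending deletion
aws iam detach-role-policy \
  --role-name <role-marked-for-deletion> \
  --policy-arn arn:aws:iam::<account-id>:policy/DenyAllCheckUnusedIAMRoleSolution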

To deploy the solution using the AWS CLI

  1. Clone the Git repository from AWS Samples to get the source code and CloudFormation templates.
    git clone https://github.com/aws-samples/aws-blog-automate-iam-role-deletion 
    cd /aws-blog-automate-iam-role-deletion

  2. Run the following AWS CLI commands to upload the CloudFormation templates and Lambda code to an S3 bucket in the Security account. The S3 bucket needs to be in the same Region where you will deploy the solution.
    • To deploy the solution for a single account, use the following commands. Be sure to replace <YOUR_BUCKET_NAME> and <PATH_TO_UPLOAD_CODE> with your own values.
      #Deploy solution for a single target AWS Account
      aws cloudformation package \
      --template-file solution_scope_account.yml \
      --s3-bucket <YOUR_BUCKET_NAME> \
      --s3-prefix <PATH_TO_UPLOAD_CODE> \
      --output-template-file solution_scope_account.template

    • To deploy the solution for an organization or OU, use the following commands. Be sure to replace <YOUR_BUCKET_NAME> and <PATH_TO_UPLOAD_CODE> with your own values.
      #Deploy solution for an Organization/OU
      aws cloudformation package \
      --template-file solution_scope_organization.yml \
      --s3-bucket <YOUR_BUCKET_NAME> \
      --s3-prefix <PATH_TO_UPLOAD_CODE> \
      --output-template-file solution_scope_organization.template

  3. Validate the template generated by the CloudFormation package.
    • To validate the solution for a single account, use the following commands.
      #Validate the template for a single target AWS Account
      aws cloudformation validate-template --template-body file://solution_scope_account.template

    • To validate the solution for an organization or OU, use the following commands.
      #Validate the template for an Organization/OU
      aws cloudformation validate-template --template-body file://solution_scope_organization.template

  4. Deploy the solution in the same Region that you use for Security Hub. The stack takes 30 minutes to complete deployment.
    • To deploy the solution for a single account, use the following commands. Be sure to replace all of the placeholders with your own values.
      #Deploy solution for a single target AWS Account
      aws cloudformation deploy \
      --template-file solution_scope_account.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
      --parameter-overrides AccountId='<STANDALONE ACCOUNT ID>' \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<YOUR IT TEAM EMAIL>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

    • To deploy the solution for an organization, run the following commands to create CloudFormation stack in the Security Account of the organization.
      #Deploy solution for an Organization
      aws cloudformation deploy \
      --template-file solution_scope_organization.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
      --parameter-overrides Scope=Organization \
      OrganizationId='<o-12345abcde>' \
      OrgRootId='<r-1234>'  \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<[email protected]>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

    • To deploy the solution for an OU, run the following commands to create CloudFormation stack in the Security Account of the organization.
      #Deploy solution for an OU
      aws cloudformation deploy \
      --template-file solution_scope_organization.template \
      --stack-name <UNIQUE_STACK_NAME> \
      --region <REGION> \
      --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
      --parameter-overrides Scope=OrganizationalUnit \
      OrganizationId='<o-12345abcde>' \
      OrganizationalUnitId='<ou-1234-1234abcd>'  \
      Frequency=<DAYS> MaxDaysForLastUsed=<DAYS> \
      ITSecurityEmail='<[email protected]>' \
      RolePatternAllowedlist='<ALLOWED PATTERN>'

Test the solution

The solution is triggered by an EventBridge scheduled rule, so it doesn’t perform the checks immediately. To test the solution right away after the CloudFormation stacks are successfully created, follow these steps.

To manually trigger the automation for a single account

  1. Navigate to the AWS Lambda console and choose the function
    <CloudFormation stackname>-LambdaCheckIAMRole.
  2. Choose Test.
  3. Choose New event.
  4. For Name, enter a name for the event, and provide the current time in UTC Date Time format YYYY-MM-DDTHH:MM:SSZ. For example, {"time": "2022-01-22T04:36:52Z"}. The Lambda function uses this value to calculate how much time has passed since the last time that a role was used. Figure 5 shows an example of configuring a test event.
    Figure 5: Configure test event for standalone account

  5. Choose Test.

To manually trigger the automation for an organization or OU

  1. Choose the function
    [CloudFormation stackname]-LambdaGetAccounts.
  2. Choose Test.
  3. Choose New event.
  4. For Name, enter a name for the event. Leave the default values for the remaining fields.
  5. Choose Test.

Respond to unused IAM roles

After you’ve triggered the Lambda function, the automation runs the necessary checks. For each unused IAM role, it creates a Step Functions state machine execution.

To see the list of Step Functions state machine executions

  1. Navigate to the AWS Step Functions console.
  2. Choose the state machine [CloudFormation stackname]OwnerApprovalStateMachine.
  3. Under the Executions tab, you will see the list of executions in running state following this naming convention: [target-account-id]-[unused IAM role name]-[time the execution created in Unix format]. Figure 6 shows an example list of executions.
    Figure 6: Each unused IAM role generates an execution in the Step Functions state machine

Each execution sends out an email notification to the IAM role owner (if available through the Owner tag) or to the IT security email address that you provided in the CloudFormation stack parameter ITSecurityEmail. The email content is:

Subject: Please take action on this unused IAM Role
 
Hello!
 
This IAM Role arn:aws:iam::<AWS account>:role/<role name> is not in use for
more than 60 days.
 
Can you please delete the role by following this link: Approve link
 
Or keep this role by following this link: Deny Link

In the email, the Approve link and the Deny link are hyperlinks to a private API endpoint with a taskToken parameter. If you try to access these links publicly, they won’t work. When you access the link, the taskToken is provided to the private API endpoint, which updates the Step Functions state machine.
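
Behind the private API endpoint, the approval and denial links resume the paused execution through the Step Functions task token callback pattern. A roughly equivalent AWS CLI call is shown below; the task output payload is hypothetical, because the solution's API Gateway integration defines the actual output returned to the state machine:

# Resume a paused callback task manually
aws stepfunctions send-task-success \
  --task-token "<taskToken copied from the execution>" \
  --task-output '{"action": "approve"}'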

To test the approval action using an API Gateway test

  1. Navigate to the AWS Step Functions console. Under State machines, choose the state machine that has the name [CloudFormation stackname]OwnerApprovalStateMachine
  2. On the Executions tab, there is a list of executions. Each execution represents a workflow for one IAM role, as shown in Figure 6. Choose the execution name that includes the IAM role name in the email that you received earlier.
  3. Scroll down to Execution event history.
  4. Expand the Notify Owner step, locate the TaskScheduled event, find the taskToken item, and copy its value to a notepad, as shown in Figure 7.
    Figure 7: Retrieve taskToken from execution

  5. Navigate to the API Gateway console.
  6. Choose the API that has a name similar to [CloudFormation stackname]-PrivateAPIGW-[unique string]-ApprovalEndpoint.
  7. Choose which action to test: Deny or Approve.
    • To test the Deny action, under /deny resource, choose the GET method.
    • To test the Approve action, under /approve resource, choose the GET method.
  8. Choose Test.
  9. Under Query Strings, enter taskToken= and paste the taskToken you copied earlier from the state machine execution. Figure 8 shows how to pass the taskToken to API Gateway.
    Figure 8: Provide taskToken to API Gateway Method

  10. Choose Test. After you test, the state machine resumes the workflow and finishes the automation. You won’t be able to change the action.
  11. Navigate to the AWS Step Functions console. Choose the state machine and go to the state machine execution.
    1. If you choose to deny the role deletion, the execution immediately stops as Fail.
    2. If you choose to approve the role deletion, the execution moves to the Wait task. This task removes IAM policies associated to the role and waits for a period of time before moving to the next task. By default, the wait time is 30 days. To change this number, go to the Lambda function [CloudFormation stackname]ApproveFunction, and update the variable wait_time_stamp.
    3. After the waiting period expires, the state machine triggers the Validate task to do a final validation on the role before deleting it. If the Validate task decides that the role is being used, it leaves the role intact. Otherwise, it deletes the role permanently.

Conclusion

In this blog post, you learned how serverless services such as Lambda, Step Functions, and API Gateway can work together to build security automation. We recommend testing this solution as a starting point. Then, you can build more features on top of the sample code and templates to customize it to perform checks, following guidance from your IT security team.

Here are a few suggestions that you can take to extend this solution.

  • This solution uses a private API Gateway to handle the approval response from the IAM role owner. You need to establish private connectivity between your internal network and AWS to invoke a private API Gateway. For instructions, see How to invoke a private API.
  • Add a mechanism to control access to API Gateway by using endpoint policies for interface VPC endpoints.
  • Archive the Security Hub finding after the IAM role is deleted using the AWS CLI or AWS Console.
  • Use a Step Functions state machine for other automation that needs human approval.
  • Add the capability to report on IAM roles that were skipped due to the absence of RoleLastUsed information.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Hong Pham

Hong is a Senior Solutions Architect at AWS. For more than five years, she has helped many customers from start-ups to enterprises in different industries to adopt Cloud Computing. She was born in Vietnam and currently lives in Seattle, Washington.

Deploy and manage OpenAPI/Swagger RESTful APIs with the AWS Cloud Development Kit

Post Syndicated from Luke Popplewell original https://aws.amazon.com/blogs/devops/deploy-and-manage-openapi-swagger-restful-apis-with-the-aws-cloud-development-kit/

This post demonstrates how AWS Cloud Development Kit (AWS CDK) Infrastructure as Code (IaC) constructs and AWS serverless technology can be used to build and deploy a RESTful Application Programming Interface (API) defined in the OpenAPI specification. This post uses an example API that describes  Widget resources and demonstrates how to use an AWS CDK Pipeline to:

  • Deploy a RESTful API stage to Amazon API Gateway from an OpenAPI specification.
  • Build and deploy an AWS Lambda function that contains the API functionality.
  • Auto-generate API documentation and publish it to an Amazon Simple Storage Service (Amazon S3)-hosted website served by the Amazon CloudFront content delivery network (CDN) service. This provides technical and non-technical stakeholders with versioned, current, and accessible API documentation.
  • Auto-generate client libraries for invoking the API and deploy them to AWS CodeArtifact, which is a fully-managed artifact repository service. This allows API client development teams to integrate with different versions of the API in different environments.

The diagram shown in the following figure depicts the architecture of the AWS services and resources described in this post.

 The architecture described in this post consists of an AWS CodePipeline pipeline, provisioned using the AWS CDK, that deploys the Widget API to AWS Lambda and API Gateway. The pipeline then auto-generates the API’s documentation as a website served by CloudFront and deployed to S3. Finally, the pipeline auto-generates a client library for the API and deploys this to CodeArtifact.

Figure 1 – Architecture

The code that accompanies this post, written in Java, is available here.

Background

APIs must be understood by all stakeholders and parties within an enterprise including business areas, management, enterprise architecture, and other teams wishing to consume the API. Unfortunately, API definitions are often hidden in code and lack up-to-date documentation. Therefore, they remain inaccessible for the majority of the API’s stakeholders. Furthermore, it’s often challenging to determine what version of an API is present in different environments at any one time.

This post describes some solutions to these issues by demonstrating how to continuously deliver up-to-date and accessible API documentation, API client libraries, and API deployments.

AWS CDK

The AWS CDK is a software development framework for defining cloud IaC and is available in multiple languages including TypeScript, JavaScript, Python, Java, C#/.Net, and Go. The AWS CDK Developer Guide provides best practices for using the CDK.

This post uses the CDK to define IaC in Java which is synthesized to a cloud assembly. The cloud assembly includes one to many templates and assets that are deployed via an AWS CodePipeline pipeline. A unit of deployment in the CDK is called a Stack.
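
For orientation, the following is a hedged sketch of the CDK Toolkit commands involved; the account, Region, and stack name are placeholders, and in this project you deploy the pipeline stack once and let the pipeline deploy everything else:

cdk bootstrap aws://111122223333/us-east-1   # one-time setup per account and Region
cdk ls                                       # list the stacks defined in the app
cdk synth                                    # synthesize the cloud assembly
cdk deploy OpenAPIBlogPipeline               # deploy the pipeline stack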

OpenAPI specification (formerly Swagger specification)

OpenAPI specifications describe the capabilities of an API and are both human and machine-readable. They consist of definitions of API components which include resources, endpoints, operation parameters, authentication methods, and contact information.

Project composition

The API project that accompanies this post consists of three directories:

  • app
  • api
  • cdk

app directory

This directory contains the code for the Lambda function which is invoked when the Widget API is invoked via API Gateway. The code has been developed in Java as an Apache Maven project.

The Quarkus framework has been used to define a WidgetResource class (see src/main/java/aws/sample/blog/cdkopenapi/app/WidgetResources.java) that contains the methods that align with the HTTP methods of the Widget API.

api directory

The api directory contains the OpenAPI specification file ( openapi.yaml ). This file is used as the source for:

  • Defining the REST API using API Gateway’s support for OpenApi.
  • Auto-generating the API documentation.
  • Auto-generating the API client artifact.

The api directory also contains the following files:

  • openapi-generator-config.yaml : This file contains configuration settings for the OpenAPI Generator framework, which is described in the section CI/CD Pipeline.
  • maven-settings.xml: This file is used to support the deployment of the generated SDKs or libraries (Apache Maven artifacts) for the API and is described in the CI/CD Pipeline section of this post.

This directory contains a subdirectory called docker. The docker directory contains a Dockerfile which defines the commands for building a Docker image:

FROM ruby:2.6.5-alpine
 
RUN apk update \
 && apk upgrade --no-cache \
 && apk add --no-cache --repository http://dl-cdn.alpinelinux.org/alpine/v3.14/main/ nodejs=14.20.0-r0 npm \
 && apk add git \
 && apk add --no-cache build-base
 
# Install Widdershins node packages and ruby gem bundler 
RUN npm install -g widdershins \
 && gem install bundler 
 
# working directory
WORKDIR /openapi
 
# Clone and install the Slate framework
RUN git clone https://github.com/slatedocs/slate
RUN cd slate \
 && bundle install

The Docker image incorporates two open source projects, the Node.js Widdershins library and the Ruby-based Slate framework. These are used together to auto-generate the documentation for the API from the OpenAPI specification. This Dockerfile is referenced and built by the ApiStack class, which is described in the CDK Stacks section of this post.
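
Inside such a container, documentation generation amounts to two steps: Widdershins converts the OpenAPI specification into Slate-flavored Markdown, and Slate builds the static HTML site with Middleman. The following is a sketch with illustrative paths; the exact commands are defined by the CDK bundling configuration in the ApiStack class:

# Convert the OpenAPI specification to Slate-compatible Markdown
widdershins /openapi/openapi.yaml -o /openapi/slate/source/index.html.md

# Build the static documentation website with Slate (Middleman)
cd /openapi/slate && bundle exec middleman build --clean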

cdk directory

This directory contains an Apache Maven Project developed in Java for provisioning the CDK stacks for the  Widget API.

Under the  src/main/java  folder, the package  aws.sample.blog.cdkopenapi.cdk  contains the files and classes that define the application’s CDK stacks and also the entry point (main method) for invoking the stacks from the CDK Toolkit CLI:

  • CdkApp.java: This file contains the  CdkApp class which provides the main method that is invoked from the AWS CDK Toolkit to build and deploy the  application stacks.
  • ApiStack.java: This file contains the   ApiStack class which defines the  OpenApiBlogAPI   stack and is described in the CDK Stacks section of this post.
  • PipelineStack.java: This file contains the   PipelineStack class which defines the OpenAPIBlogPipeline  stack and is described in the CDK Stacks section of this post.
  • ApiStackStage.java: This file contains the  ApiStackStage class which defines a CDK stage. As detailed in the CI/CD Pipeline section of this post, a DEV stage, containing the  OpenApiBlogAPI stack resources for a DEV environment, is deployed from the  OpenApiBlogPipeline pipeline.

CDK stacks

ApiStack

Note that the CDK bundling functionality is used at multiple points in the  ApiStack  class to produce CDK Assets. The post, Building, bundling, and deploying applications with the AWS CDK, provides more details regarding using CDK bundling mechanisms.

The  ApiStack  class defines multiple resources including:

  • Widget API Lambda function: This is bundled by the CDK in a Docker container using the Java 11 runtime image.
  • Widget  REST API on API Gateway: The REST API is created from an Inline API Definition which is passed as an S3 CDK Asset. This asset includes a reference to the  Widget API OpenAPI specification located under the  api folder (see  api/openapi.yaml ) and builds upon the SpecRestApi construct and API Gateway’s support for OpenApi.
  • API documentation Docker Image Asset: This is the Docker image that contains the open source frameworks (Widdershins and Slate) that are leveraged to generate the API documentation.
  • CDK Asset bundling functionality that leverages the API documentation Docker image to auto-generate documentation for the API.
  • An S3 Bucket for holding the API documentation website.
  • An origin access identity (OAI) which allows CloudFront to securely serve the S3 Bucket API documentation content.
  • A CloudFront distribution which provides CDN functionality for the S3 Bucket website.

Note that the  ApiStack class features the following code which is executed on the  Widget API Lambda construct:

CfnFunction apiCfnFunction = (CfnFunction)apiLambda.getNode().getDefaultChild();
apiCfnFunction.overrideLogicalId("APILambda");

The CDK, by default, auto-assigns an ID for each defined resource but in this case the generated ID is being overridden with “APILambda”. The reason for this is that inside of the  Widget API OpenAPI specification (see  api/openapi.yaml ), there is a reference to the Lambda function by name (“APILambda”) so that the function can be integrated as a proxy for each listed API path and method combination. The OpenAPI specification includes this name as a variable to derive the Amazon Resource Name (ARN) for the Lambda function:

uri:
	Fn::Sub: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${APILambda.Arn}/invocations"

PipelineStack

The PipelineStack class defines a CDK CodePipeline construct, which is a higher-level construct and pattern. Therefore, the construct doesn’t just map directly to a single CloudFormation resource, but provisions multiple resources to fulfil the requirements of the pattern. The post, CDK Pipelines: Continuous delivery for AWS CDK applications, provides more detail on creating pipelines with the CDK.

final CodePipeline pipeline = CodePipeline.Builder.create(this, "OpenAPIBlogPipeline")
    .pipelineName("OpenAPIBlogPipeline")
    .selfMutation(true)
    .dockerEnabledForSynth(true)
    .synth(synthStep)
    .build();

CI/CD pipeline

The diagram in the following figure shows the multiple CodePipeline stages and actions created by the CDK CodePipeline construct that is defined in the PipelineStack class: the Source stage, the Synth stage, the Update Pipeline stage, the Assets stage, and the DEV stage.

Figure 2 – CI/CD Pipeline

The stages defined include the following:

  • Source stage: The pipeline is passed the source code contents from this stage.
  • Synth stage: This stage consists of a Synth Action that synthesizes the CloudFormation templates for the application’s CDK stacks and compiles and builds the project Lambda API function.
  • Update Pipeline stage: This stage checks the OpenAPIBlogPipeline stack and reinitiates the pipeline when changes to its definition have been deployed.
  • Assets stage: The application’s CDK stacks produce multiple file assets (for example, zipped Lambda code) which are published to Amazon S3. Docker image assets are published to a managed CDK framework Amazon Elastic Container Registry (Amazon ECR) repository.
  • DEV stage: The API’s CDK stack ( OpenApiBlogAPI ) is deployed to a hypothetical development environment in this stage. A post-stage deployment action is also defined in this stage. Through the use of a CDK ShellStep construct, a Bash script is executed that deploys a generated client Java Archive (JAR) for the Widget API to CodeArtifact. The script employs the OpenAPI Generator project for this purpose:
CodeBuildStep codeArtifactStep = CodeBuildStep.Builder.create("CodeArtifactDeploy")
    .input(pipelineSource)
    .commands(Arrays.asList(
        "echo $REPOSITORY_DOMAIN",
        "echo $REPOSITORY_NAME",
        "export CODEARTIFACT_TOKEN=`aws codeartifact get-authorization-token --domain $REPOSITORY_DOMAIN --query authorizationToken --output text`",
        "export REPOSITORY_ENDPOINT=$(aws codeartifact get-repository-endpoint --domain $REPOSITORY_DOMAIN --repository $REPOSITORY_NAME --format maven | jq .repositoryEndpoint | sed 's/\\\"//g')",
        "echo $REPOSITORY_ENDPOINT",
        "cd api",
        "wget -q https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli/5.4.0/openapi-generator-cli-5.4.0.jar -O openapi-generator-cli.jar",
        "cp ./maven-settings.xml /root/.m2/settings.xml",
        "java -jar openapi-generator-cli.jar batch openapi-generator-config.yaml",
        "cd client",
        "mvn --no-transfer-progress deploy -DaltDeploymentRepository=openapi--prod::default::$REPOSITORY_ENDPOINT"
    ))
    .rolePolicyStatements(Arrays.asList(codeArtifactStatement, codeArtifactStsStatement))
    .env(new HashMap<String, String>() {{
        put("REPOSITORY_DOMAIN", codeArtifactDomainName);
        put("REPOSITORY_NAME", codeArtifactRepositoryName);
    }})
    .build();

Running the project

To run this project, you must install the AWS CLI v2, the AWS CDK Toolkit CLI, a Java/JDK 11 runtime, Apache Maven, Docker, and a Git client. Furthermore, the AWS CLI must be configured for a user who has administrator access to an AWS Account. This is required to bootstrap the CDK in your AWS account (if not already completed) and provision the required AWS resources.

To build and run the project, perform the following steps:

  1. Fork the OpenAPI blog project in GitHub.
  2. Open the AWS Console and create a connection to GitHub. Note the connection’s ARN.
  3. In the Console, navigate to AWS CodeArtifact and create a domain and repository.  Note the names used.
  4. From the command line, clone your forked project and change into the project’s directory:
git clone https://github.com/<your-repository-path>
cd <your-repository-path>
  5. Edit the CDK JSON file at  cdk/cdk.json  and enter the details:
"RepositoryString": "<your-github-repository-path>",
"RepositoryBranch": "<your-github-repository-branch-name>",
"CodestarConnectionArn": "<connection-arn>",
"CodeArtifactDomain": "<code-artifact-domain-name>",
"CodeArtifactRepository": "<code-artifact-repository-name>"

Note that for setting configuration values in CDK applications, it is recommended to use environment variables or AWS Systems Manager parameters.

  6. Commit and push your changes back to your GitHub repository:
git push origin main
  7. Change into the  cdk directory and bootstrap the CDK in your AWS account if you haven’t already done so (enter “Y” when prompted):
cd cdk
cdk bootstrap
  8. Deploy the CDK pipeline stack (enter “Y” when prompted):
cdk deploy OpenAPIBlogPipeline

Once the stack deployment completes successfully, the pipeline  OpenAPIBlogPipeline will start running. This will build and deploy the API and its associated resources. If you open the Console and navigate to AWS CodePipeline, then you’ll see a pipeline in progress for the API.

Once the pipeline has completed executing, navigate to AWS CloudFormation to get the output values for the  DEV-OpenAPIBlog  stack deployment:

  1. Select the  DEV-OpenAPIBlog  stack entry and then select the Outputs tab. Record the REST_URL value for the key that begins with   OpenAPIBlogRestAPIEndpoint .
  2. Record the CLOUDFRONT_URL value for the key  OpenAPIBlogCloudFrontURL .

The API ping method at https://<REST_URL>/ping can now be invoked using your browser or an API development tool like Postman. Other API methods, as defined by the OpenAPI specification, are also available for invocation (for example, GET https://<REST_URL>/widgets).
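
For example, a hedged Python sketch using the requests library (REST_URL is the output value recorded earlier, and the response format is an assumption) might look like the following:

import requests

rest_url = "https://<REST_URL>"   # value recorded from the DEV-OpenAPIBlog stack outputs

# Invoke the ping method and print the HTTP status and body
response = requests.get(f"{rest_url}/ping")
print(response.status_code, response.text)

# Invoke another method defined in the OpenAPI specification
print(requests.get(f"{rest_url}/widgets").text)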

To view the generated API documentation, open a browser at https://<CLOUDFRONT_URL>.

The following figure shows the API documentation website that has been auto-generated from the API’s OpenAPI specification. The documentation provides descriptions of the API’s methods and resources, along with code snippets for using the API in multiple programming languages, including JavaScript, Python, and Java.

Figure 3 – Auto-generated API documentation

To view the generated API client code artifact, open the Console and navigate to AWS CodeArtifact. The following figure shows the CodeArtifact console with the versions of the API client artifact that have been published.

Figure 4 – API client artifact in CodeArtifact

Cleaning up

  1. From the command line, change to the  cdk directory and remove the API stack in the DEV stage (enter “Y” when prompted):
cd cdk
cdk destroy OpenAPIBlogPipeline/DEV/OpenAPIBlogAPI
  2. Once this has completed, delete the pipeline stack:
cdk destroy OpenAPIBlogPipeline
  3. Delete the S3 bucket created to support pipeline operations. Open the Console and navigate to Amazon S3. Delete buckets with the prefix  openapiblogpipeline .

Conclusion

This post demonstrates the use of the AWS CDK to deploy a RESTful API defined by the OpenAPI/Swagger specification. Furthermore, this post describes how to use the AWS CDK to auto-generate API documentation, publish this documentation to a web site hosted on Amazon S3, auto-generate API client libraries or SDKs, and publish these artifacts to an Apache Maven repository hosted on CodeArtifact.

The solution described in this post can be improved by:

  • Building and pushing the API documentation Docker image to Amazon ECR, and then using this image in CodePipeline API pipelines.
  • Creating stages for different environments such as TEST, PREPROD, and PROD.
  • Adding integration testing actions to make sure that the API Deployment is working correctly.
  • Adding manual approval actions that are executed before deploying the API to PROD.
  • Using CodeBuild caching of artifacts including Docker images and libraries.

About the author:

Luke Popplewell

Luke Popplewell works primarily with federal entities in the Australian Government. In his role as an architect, Luke uses his knowledge and experience to help organisations reach their goals on the AWS cloud. Luke has a keen interest in serverless technology, modernization, DevOps and event-driven architectures.

Best practices to optimize cost and performance for AWS Glue streaming ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/

AWS Glue streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or any other Apache Kafka cluster. These jobs use the Spark Structured Streaming framework to perform data processing in near-real time.

This post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.

We discuss the following topics:

  • Development tools that help you code faster using our newly launched AWS Glue Studio notebooks
  • How to monitor and tune your streaming jobs
  • Best practices for sizing and scaling your AWS Glue cluster, using our newly launched features like auto scaling and the small worker type G 0.25X

Development tools

AWS Glue Studio notebooks can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.

Before you run any code in the notebook (which would start the session), you need to set some important configurations.

The %streaming magic creates the session cluster using the same runtime as AWS Glue streaming jobs. This way, you can interactively develop and test your code using the same runtime that you use later in the production job.

Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.

See the following configuration:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

For additional configuration options such as version or number of workers, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to Launching the Spark History Server for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

The AWS Glue API helps you create the DataFrame by doing schema detection and auto decompression, depending on the format. You can also build it yourself using the Spark API directly:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

After you run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.

History server home page

From the notebook, you can take a sample of the streaming data. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

Monitor the cluster with Structured Streaming

The best way to monitor and tune your AWS Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the Structured Streaming tab and the details of each individual micro-batch processing job.

Overall view of the streaming job

On the Structured Streaming tab, you can see a summary of the streams running in the cluster, as in the following example.

Normally there is just one streaming query, representing a streaming ETL. If you start multiple queries in parallel, it’s good practice to give each one a recognizable name by calling queryName() if you use the writeStream API directly on the DataFrame.
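
For example, if you write the stream yourself with the DataFrame writeStream API instead of GlueContext.forEachBatch, a minimal hedged sketch for naming the query could look like the following (the query name, sink format, paths, and trigger interval are placeholder assumptions, and kinesisDF is the DataFrame created earlier):

# Name the streaming query so it is recognizable on the Structured Streaming tab
query = (
    kinesisDF.writeStream
    .queryName("orders-enrichment")                                   # hypothetical, descriptive name
    .format("parquet")                                                # placeholder sink
    .option("path", "s3://your_bucket/output/orders/")
    .option("checkpointLocation", "s3://your_bucket/checkpoints/orders/")
    .trigger(processingTime="60 seconds")
    .start()
)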

After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use the Avg Input/sec column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, Avg Process/sec, is similar but often has a higher number. The difference is that the process rate tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For Input Rate, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, this is equal to the configured trigger interval (in the GlueContext.forEachBatch() API, this is set using the option windowSize).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the Batch Duration (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).

Be careful: if you set a limit per batch that is constantly hit, you could be silently building a backlog while everything looks good in the job metrics. To monitor this, have a latency metric that measures the difference between the timestamp when a message is created and the time it’s processed.
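
One hedged way to produce such a latency metric inside the batch function (plugged into GlueContext.forEachBatch) is to compare an event timestamp carried on each record with the processing time. This sketch assumes the records have an event_timestamp column, which might not match your schema:

from pyspark.sql import functions as F

def process_batch(data_frame, batch_id):
    if data_frame.count() == 0:
        return
    # Approximate end-to-end latency: processing time minus event creation time
    avg_latency = data_frame.select(
        F.avg(F.unix_timestamp(F.current_timestamp()) -
              F.unix_timestamp(F.col("event_timestamp"))).alias("avg_latency_sec")
    ).collect()[0]["avg_latency_sec"]
    # Print here for simplicity; you could publish this as a CloudWatch metric instead
    print(f"batch {batch_id}: average latency {avg_latency:.1f}s")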

Process Rate is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages, and the trigger interval is 10 seconds but the batch only needed 5 seconds to process, the process rate would be 1000/5 = 200 msg/sec, while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.

This metric is useful for measuring how efficiently our code processes the batch, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.

Input Rows shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the GlueContext.forEachBatch() API.

The last two rows tell us how long it takes to process each batch and how that time is spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing the batch size (if the limit allows taking the extra pending messages).

In Operation Duration, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest is fixed overhead; therefore, the smaller the batch, the larger the percentage of time that overhead takes. This represents the trade-off between smaller batches with lower latency and bigger batches that are more compute efficient.

Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in Operation Duration, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.

Monitor individual batch processing

On the Jobs tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the Jobs tab of the Spark UI of our streaming job.

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by spark.task.maxFailures).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using GlueContext.forEachBatch(), it has a number of retries as per the batchMaxRetries parameter (three by default).

These failures are important because they have two effects:

  • They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
  • They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is Amazon DynamoDB, JDBC, Amazon OpenSearch Service, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).

Choosing the description link takes you to the Stages tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or take multiple attempts?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because it doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor the failures and decide if they are systemic or sporadic.

Sizing and scaling

Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of AWS Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle the expected volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

  • For Kinesis, specify the limit using kinesis.executor.maxFetchRecordsPerShard, and revise this number if the number of shards changes substantially. You might need to increase kinesis.executor.maxFetchTimeInMs as well, in order to allow more time to read the batch and make sure it’s not truncated.
  • For Kafka, set maxOffsetsPerTrigger, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):

kafka_properties= {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
spark.readStream.format("kafka").options(**kafka_properties).load()
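
For Kinesis, a similar limit can be set when creating the DataFrame. The following is a hedged sketch only; the limit values are placeholders, and option names can vary by connector and AWS Glue version, so verify them against your runtime:

kinesis_properties = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json",
  # Limit how much each shard fetches per batch, and allow enough time to read it
  "kinesis.executor.maxFetchRecordsPerShard": "100000",
  "kinesis.executor.maxFetchTimeInMs": "10000"
}
kinesisLimitedDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_properties
)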

Initial benchmark

If the events can be processed individually (no interdependency such as grouping), you can get a rough estimate of how many messages a single Spark core can handle by running with a single-partition source (one Kafka topic partition or a Kinesis stream with one shard) that has data preloaded into it, and running batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.
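
For example, one hedged way to get a fresh checkpoint location per benchmark rerun is to embed a timestamp in the path; the bucket, batch function, and window size are placeholder assumptions:

import time

# Use a fresh checkpoint directory per benchmark run so previous offsets are ignored
checkpoint_path = f"s3://your_bucket/checkpoints/benchmark-{int(time.time())}/"

glueContext.forEachBatch(
    frame=streaming_df,              # the stream DataFrame defined with the batch limit
    batch_function=process_batch,    # your processing function
    options={
        "windowSize": "1 seconds",   # minimum interval for the stress test
        "checkpointLocation": checkpoint_path
    }
)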

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead taken for each batch to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

  • If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
  • In the case where the throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.

Now you have an idea of the number of messages per core your ETL can handle and the resulting latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can divide the total number of messages per second to process by the messages-per-core figure you obtained to get the minimum number of Spark partitions needed (each core handles one partition in parallel).

With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

AWS Glue Version | Worker Type | vCores | Spark Cores per Worker
2                | G 1X        | 4      | 8
2                | G 2X        | 8      | 16
3                | G 0.25X     | 2      | 2
3                | G 1X        | 4      | 4
3                | G 2X        | 8      | 8

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.

Spark cores are equivalent to threads; therefore, you can have more (or fewer) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is necessarily going to be faster if they’re not backed by physical cores; it just means you have more parallelism competing for the same CPU.

Sizing the cluster when you control the input message system

This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial AWS Glue cluster size and configure Kafka or Kinesis with the number of partitions or topics estimated, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an AWS Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or less nodes depending on the cluster size, if the destination system can keep up, complexity of transformations, or other parameters.

Sizing the cluster when you don’t control the message system configuration

In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside AWS Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an AWS Glue 3.0 cluster with 3 G 1X nodes to have 8 worker cores available. In the code, we repartition the batch so the messages are distributed randomly (as evenly as possible) between those cores:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than there are Spark cores available, but this might cause inefficiencies because the additional partitions will be queued and won’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

When the messages have processing interdependencies

If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (job in Spark UI) to see where the time is spent and if there are imbalances by checking the task duration percentiles on the Stages tab (you can also reach this page by choosing a stage on the Jobs tab).

Auto scaling

Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where AWS Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, using the number you have determined needed for the peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

Splitting workloads

You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver and the nodes have to wait for the others to complete the batch before they can take additional work. If it reaches a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual subscribe or subscribePattern where the topics are listed, use the assign property to specify, as a JSON object, the subset of topic partitions that the job will handle (for example, {"topic1": [0,1,2]}). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.
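
The following is a hedged sketch of one job pinned to a subset of a topic’s partitions by using assign; the broker, topic name, and partition list are placeholders:

import json

# This job handles only partitions 0-2 of mytopic; a second job would be assigned the rest
assigned_partitions = {"mytopic": [0, 1, 2]}

kafka_assign_properties = {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "assign": json.dumps(assigned_partitions),   # instead of subscribe or subscribePattern
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
splitDF = spark.readStream.format("kafka").options(**kafka_assign_properties).load()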

Sizing down

For low volumes of traffic, AWS Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

  • Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns topic and streamName, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations), as illustrated in the sketch after this list. Make sure the DataFrame is cached, so you don’t read the data multiple times.
  • If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
  • The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of each stream’s DataFrame in a separate thread. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and for handling errors.
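
The following hedged sketch illustrates the first option in the preceding list, with two hypothetical Kafka topics branched on the topic column inside the batch function:

# One DataFrame fed by two Kafka topics (hypothetical names), branched by topic
multi_topic_properties = {
    "kafka.bootstrap.servers": "bootstrapserver1:9092",
    "subscribe": "ordersTopic,paymentsTopic",
    "startingOffsets": "latest"
}
multiDF = spark.readStream.format("kafka").options(**multi_topic_properties).load()

def process_multi_batch(data_frame, batch_id):
    data_frame.cache()   # avoid re-reading the source for each branch
    orders = data_frame.filter(data_frame.topic == "ordersTopic")
    payments = data_frame.filter(data_frame.topic == "paymentsTopic")
    # ...apply different transformations or destinations per branch...
    data_frame.unpersist()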

Settings

In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

Property | Applies to | Remarks
maxOffsetsPerTrigger | Kafka | Limit of messages per batch. Divides the limit evenly among partitions.
kinesis.executor.maxFetchRecordsPerShard | Kinesis | Limit per shard, so it should be revised if the number of shards changes.
kinesis.executor.maxFetchTimeInMs | Kinesis | When increasing the batch size (either by increasing the batch interval or the previous property), the executor might need more time, allotted by this property.
startingOffsets | Kafka | Normally you want to read all the data available and therefore use earliest. However, if there is a big backlog, the system might take a long time to catch up; instead, use latest to skip the history.
startingPosition | Kinesis | Similar to startingOffsets; in this case, the values to use are TRIM_HORIZON to backload and LATEST to start processing from now on.
includeHeaders | Kafka | Enable this flag if you need to merge and split multiple topics in the same job (see the previous section for details).
kinesis.executor.maxconnections | Kinesis | When writing to Kinesis, by default it uses a single connection. Increasing this might improve performance.
kinesis.client.avoidEmptyBatches | Kinesis | It’s best to set it to true to avoid wasting resources (for example, generating empty files) when there is no data (as the Kafka connector does). GlueContext.forEachBatch prevents empty batches by default.

Further optimizations

In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, AWS Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to Adding Streaming ETL Jobs in AWS Glue.

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the GlueContext.forEachBatch function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as Amazon S3 files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set persistDataFrame to false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame":  "false"
    }
)

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is small enough to fit in memory, consider using a broadcast join. However, bear in mind it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.
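
As a hedged illustration (the lookup table path and join key are placeholder assumptions), a broadcast join inside the batch function could look like the following:

from pyspark.sql import functions as F

def enrich_batch(data_frame, batch_id):
    # Small reference table that fits in executor memory (assumption)
    lookup_df = spark.read.parquet("s3://your_bucket/reference/devices/")
    enriched = data_frame.join(
        F.broadcast(lookup_df),   # hint Spark to broadcast the small side
        on="device_id",           # hypothetical join key
        how="left"
    )
    enriched.write.mode("append").parquet("s3://your_bucket/enriched/")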

If you need to shuffle, consider enabling the Kryo serializer (if you use custom serializable classes, you need to register them first).
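
The following is a minimal hedged sketch of the Spark settings involved; the class name is hypothetical, and in an AWS Glue job these settings typically need to be supplied before the session starts (for example, as job parameters) rather than set in code:

from pyspark import SparkConf
from pyspark.context import SparkContext

conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Register custom serializable classes up front if you rely on Kryo for them
    .set("spark.kryo.classesToRegister", "com.example.MyEventClass")
)
sc = SparkContext.getOrCreate(conf)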

As in any AWS Glue jobs, avoid using custom udf() if you can do the same with the provided API like Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine) and in the case of Python, it forces the movement of data between processes.

Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartitioning based on the partition columns can significantly reduce the number of output files created.
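
The following hedged sketch shows both approaches inside a batch function; the output paths, partition column, and file counts are placeholder assumptions:

def write_batch(data_frame, batch_id):
    # Fewer, larger files for unpartitioned output
    data_frame.coalesce(4).write.mode("append").parquet("s3://your_bucket/output/")

    # Or, for partitioned output, repartition by the partition column before writing
    (data_frame
        .repartition("event_date")            # hypothetical partition column
        .write.mode("append")
        .partitionBy("event_date")
        .parquet("s3://your_bucket/partitioned_output/"))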

Conclusion

In this post, you saw how to approach sizing and tuning an AWS Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

How to secure an enterprise scale ACM Private CA hierarchy for automotive and manufacturing

Post Syndicated from Anthony Pasquariello original https://aws.amazon.com/blogs/security/how-to-secure-an-enterprise-scale-acm-private-ca-hierarchy-for-automotive-and-manufacturing/

In this post, we show how you can use the AWS Certificate Manager Private Certificate Authority (ACM Private CA) to help follow security best practices when you build a CA hierarchy. This blog post walks through certificate authority (CA) lifecycle management topics, including an architecture overview, centralized security, separation of duties, certificate issuance auditing, and certificate sharing by means of templates. These topics provide best practices surrounding your ACM Private CA hierarchy so that you can build the right CA hierarchy for your organization.

With ACM Private CA, you can create private certificate authority hierarchies, including root and subordinate CAs, without the upfront investment and ongoing maintenance costs of operating your own private CA. You can issue certificates for authenticating internal users, computers, applications, services, servers, or other devices, and for code signing.

This post includes the following Amazon Web Services (AWS) services:

Solution overview

In this blog post, you’ll see an example automotive manufacturing company and their supplier companies. Each will have associated AWS accounts, which we will call Manufacturer Account(s) and Supplier Account(s), respectively.

Automotive manufacturing companies usually have modules that come from different suppliers. Modules, in the automotive context, are embedded systems that control electrical systems in the vehicle. These modules might be interconnected throughout the in-vehicle network or provide connectivity external to the vehicle, for example, for navigation or sending telemetry to off-board systems.

The architecture needs to allow the Manufacturer to retain control of their CA hierarchy, while giving their external Suppliers limited access to sign the certificates on these modules with the Manufacturer’s CA hierarchy. The architecture we provide here gives you the basic information you need to cover the following objectives:

  1. Creation of accounts that logically separate CAs in a hierarchy
  2. IAM role creation for specific personas to manage the CA lifecycle
  3. Auditing the CA hierarchy by using audit reports
  4. Cross-account sharing by using AWS RAM with certificate template scoping

Architecture overview

Figure 1 shows the solution architecture.

Figure 1: Multi-account certificate authority hierarchy using ACM Private CA

The Manufacturer has two categories of AWS accounts:

  1. A dedicated account to hold the Manufacturer’s root CA
  2. An account to hold their subordinate CA

Note: The diagram shows two subordinate CAs in the Manufacturer account. However, depending on your security needs, you can have a subordinate CA per account per supplier.

Additionally, each Supplier has one AWS account. These accounts will have the Manufacturer’s subordinate CA shared by using AWS RAM. The Manufacturer will have a subordinate CA for each Supplier.

Logically separate accounts

In order to minimize the scope of impact and scope users to actions within their duties, it’s critical that you logically separate AWS accounts based on workload within the CA hierarchy. The following section shows a recommendation for how to do that.

AWS account that holds the root CA

You, the Manufacturer, should place the ACM Private root CA within its own dedicated AWS account to segment and tightly control access to the root CA. This limits access at the account level and only uses the dedicated account for a single purpose: holding the root CA for your organization. This account will only have access from IAM principals that maintain the CA hierarchy through a federation service like AWS Single Sign-On (AWS SSO) or direct federation to IAM through an existing identity provider. This account also has AWS CloudTrail enabled and configured for business-specific alerting, including actions like creation, updating, or deletion of the root CA.

AWS account that holds the subordinate CAs

You, the Manufacturer, will have a dedicated account where the entire CA hierarchy below the root will be located. You should have a separate subordinate CA for each Supplier, and in some cases a separate subordinate CA for each hardware module the Supplier is building. The subordinate CAs can issue certificates for specific hardware modules within the Supplier account.

This Manufacturer account shares each subordinate CA to the respective Supplier’s AWS account by using AWS RAM. This provides joint control to the shared subordinate CA, creating isolation between individual Suppliers. AWS RAM allows Suppliers to control certificate issuance and revocation if this is allowed by the Manufacturer. Each Supplier is only shared certificate provisioning access through AWS RAM configuration, which means that you can tightly monitor and revoke access through AWS RAM. Given this sharing through AWS RAM, the Suppliers don’t have access to modify or delete the CA hierarchy itself and can only provision certificates from it.

Supplier AWS account(s)

These AWS accounts are owned by each respective Supplier. For example, you might partner with radio, navigation system, and telemetry suppliers. Each Supplier would have their own AWS account, which they control. The Supplier accepts an invitation from the manufacturer through AWS RAM, sharing the subordinate CA. The subordinate is allowed to take only certain actions, based on how the Manufacturer configured the share (more on this later in the post).

Separation of duties by means of IAM role creation

In order to follow least privilege best practices when you create a CA hierarchy with ACM Private CA, you must create IAM roles that are specific to each job function. The recommended method is to separate administrator and certificate issuer roles.

For this automotive manufacturing use case, we recommend the following roles:

  1. Manufacturer IAM roles:
    • A CA admin role with CA disable permission
    • A CA admin role with CA delete permission
  2. A Supplier certificate issuer IAM role

Manufacturer IAM role overview

In this flow, one IAM role is able to disable the CA, and a second principal can delete the CA. This enables two-person control for this highly privileged action—meaning that you need a two-person quorum to rotate the CA certificate.

Day-to-day CA admin policy (with CA disable)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "acm-pca:ImportCertificateAuthorityCertificate",
                "acm-pca:DeletePolicy",
                "acm-pca:PutPolicy",
                "acm-pca:TagCertificateAuthority",
                "acm-pca:ListTags",
                "acm-pca:GetCertificate",
                "acm-pca:CreateCertificateAuthority",
                "acm-pca:ListCertificateAuthorities",
                "acm-pca:UntagCertificateAuthority",
                "acm-pca:GetCertificateAuthorityCertificate",
                "acm-pca:RevokeCertificate",
                "acm-pca:UpdateCertificateAuthority",
                "acm-pca:GetPolicy",
                "acm-pca:IssueCertificate",
                "acm-pca:DescribeCertificateAuthorityAuditReport",
                "acm-pca:CreateCertificateAuthorityAuditReport",
                "acm-pca:RestoreCertificateAuthority",
                "acm-pca:GetCertificateAuthorityCsr",
                "acm-pca:DeletePermission",
                "acm-pca:DescribeCertificateAuthority",
                "acm-pca:CreatePermission",
                "acm-pca:ListPermissions"
            ],
            "Resource": “*”
        },
        {
            "Effect": "Deny",
            "Action": [
                "acm-pca:DeleteCertificateAuthority"
            ],
            "Resource": <Enter Root CA ARN Here>
        }
    ]
}

Privileged CA admin policy (with CA delete)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "acm-pca:ImportCertificateAuthorityCertificate",
                "acm-pca:DeletePolicy",
                "acm-pca:PutPolicy",
                "acm-pca:TagCertificateAuthority",
                "acm-pca:ListTags",
                "acm-pca:GetCertificate",
                "acm-pca:UntagCertificateAuthority",
                "acm-pca:GetCertificateAuthorityCertificate",
                "acm-pca:RevokeCertificate",
                "acm-pca:GetPolicy",
    "acm-pca:CreateCertificateAuthority",
                "acm-pca:ListCertificateAuthorities",
                "acm-pca:DescribeCertificateAuthorityAuditReport",
                "acm-pca:CreateCertificateAuthorityAuditReport",
                "acm-pca:RestoreCertificateAuthority",
                "acm-pca:GetCertificateAuthorityCsr",
                "acm-pca:DeletePermission",
    "acm-pca:IssueCertificate",
                "acm-pca:DescribeCertificateAuthority",
                "acm-pca:CreatePermission",
                "acm-pca:ListPermissions",
                "acm-pca:DeleteCertificateAuthority"
            ],
            "Resource": “*”
        },
        {
            "Effect": "Deny",
            "Action": [
                "acm-pca:UpdateCertificateAuthority"
            ],
            "Resource": <Enter Root CA ARN Here>
        }
    ]
}

We recommend that you, the Manufacturer, create a two-person process for highly privileged events like CA certificate rotation ceremonies. The preceding policies serve two purposes. First, they allow you to designate separation of management duties between day-to-day CA admin tasks and infrequent root CA rotation ceremonies. The day-to-day CA admin policy allows all ACM Private CA actions except the ability to delete the root CA. This is because the day-to-day CA admin should not be deleting the root CA. Meanwhile, the privileged CA admin policy has the ability to call DeleteCertificateAuthority. However, in order to call DeleteCertificateAuthority, you first need to have the day-to-day CA admin role disable the root CA.

This means that both roles listed here are necessary to perform a root CA deletion for a rotation or replacement ceremony. This arrangement creates a way to control the deletion of the CA resource by requiring two separate actors to disable and delete. It’s crucial that the two roles are assumed by two different people at the identity provider. Having one person assume both of these roles negates the increased security created by each role.
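
To make the sequence concrete, the following hedged boto3 sketch shows the two API calls involved; the CA ARN is a placeholder, and in practice each call is made by a different person assuming a different role:

import boto3

acmpca = boto3.client("acm-pca")
ca_arn = "arn:aws:acm-pca:<region>:<account-id>:certificate-authority/<ca-id>"  # placeholder

# Step 1 (day-to-day CA admin role): disable the CA
acmpca.update_certificate_authority(
    CertificateAuthorityArn=ca_arn,
    Status="DISABLED"
)

# Step 2 (privileged CA admin role): schedule deletion with a restore window
acmpca.delete_certificate_authority(
    CertificateAuthorityArn=ca_arn,
    PermanentDeletionTimeInDays=7
)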

You might also consider enforcing tagging of CAs at the organization level so that each new CA has relevant tags. The blog post Securing resource tags used for authorization using a service control policy in AWS Organizations illustrates in detail how to secure tags using service control policies (SCPs), so that only authorized users can modify tags.

Supplier IAM role overview

Your Suppliers should also follow least privilege when creating IAM roles within their own accounts. However, as we’ll see in the Cross-account sharing by using AWS RAM section, even if the Suppliers don’t follow best practices, the Manufacturer’s ACM Private CA hierarchy is still isolated and secure.

That being said, here are common IAM roles that your Suppliers should create within their own accounts:

  1. Developers who provision certificates for development and QA workloads
  2. Developers who provision certificates for production

These certificate issuing roles give the Supplier the ability to issue end-entity certificates from the CA hierarchy. In this use case, the Supplier needs two different levels of permissions: non-production certificates and production certificates. To simplify the roles within IAM, the Supplier decided to use ABAC. These ABAC policies allow operations when the principal’s tag matches the resource tag. Because the Supplier has many similar policies, each with a different set of users, they use ABAC to create a single IAM policy that uses principal tags rather than creating multiple slightly different IAM policies.

Certificate issuing policy that uses ABAC

{
	"Version": "2012-10-17",
	"Statement": [
	{
		"Effect": "Allow",
		"Action": [
			"acm-pca:IssueCertificate",
			"acm-pca:ListTags",
			"acm-pca:GetCertificate",
			"acm-pca:ListCertificateAuthorities"
		],
		"Resource": "*",
		"Condition": {
			"StringEquals": {
				"aws:ResourceTag/access-project": "${aws:PrincipalTag/access-project}",
				"aws:ResourceTag/access-team": "${aws:PrincipalTag/access-team}"
			}
		}
	}
	]
}

This single policy enables all personas to be scoped to least privilege access. If you look at the Condition portion of the IAM policy, you can see the power of ABAC. This condition verifies that the PrincipalTag matches the ResourceTag. The Supplier is federating into IAM roles through AWS SSO and tagging the Supplier’s principals within its selected identity providers.

Because you as the Manufacturer have tagged the subordinate CAs that are shared with the Supplier, the Supplier can use identity provider (IdP) attributes as tags to simplify the Supplier’s IAM strategy. In this example, the Supplier configures each relevant user in the IdP with the attribute (tag) key: access-team. This tag matches the tagging strategy used by the Manufacturer. Here’s the mapping for each persona within the use case:

  • Dev environment:
    • access-team: DevTeam
  • Production environment:
    • access-team: ProdTeam

You can choose to add or remove tags depending on your use case, and the preceding scenario serves as a simple example. This offloads the need to create new IAM policies as the number of subordinate CAs grow. If you decide to use ABAC, make sure that you require both principal tagging and resource tagging upon creation of each, because these tags become your authorization mechanism.

CA lifecycle: Audit report published by the Manufacturer

In terms of auditing and monitoring, we recommend that the Manufacturer have a mechanism to track how many certificates were issued for a specific Supplier or module. Within the Manufacturer accounts, you can generate audit reports through the console or CLI. This allows you, the Manufacturer, to gather metrics on certificate issuance and revocation. The following figure shows an example audit report entry for a certificate issuance.

Figure 2: Audit report output for certificate issuance

For more information on generating an audit report, see Using audit reports with your private CA.
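
If you want to automate report generation, the following hedged boto3 sketch shows the API call involved; the CA ARN and bucket name are placeholders:

import boto3

acmpca = boto3.client("acm-pca")

response = acmpca.create_certificate_authority_audit_report(
    CertificateAuthorityArn="arn:aws:acm-pca:<region>:<account-id>:certificate-authority/<ca-id>",
    S3BucketName="<audit-report-bucket>",
    AuditReportResponseFormat="JSON"   # or "CSV"
)
print(response["AuditReportId"])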

Cross-account sharing by using AWS RAM

With AWS RAM, you can share CAs with another account. We recommend that you, as a Manufacturer, use AWS RAM to share CAs with Suppliers so that they can issue certificates without administrator access to the CA. This arrangement allows you as the Manufacturer to more easily limit and revoke access if you change Suppliers. The Suppliers can create certificates through the ACM console or through the CLI, API, or AWS CloudFormation. Manufacturers are only sharing the ability to create, manage, bind, and export certificates from the CA hierarchy. The CA hierarchy itself is contained within the Manufacturers’ accounts, and not within the Suppliers’ accounts. By using AWS RAM, the Suppliers don’t have any administrator access to the CA hierarchy. From a cost perspective, you can centrally control and monitor the costs of your private CA hierarchy without having to deal with cost-sharing across Suppliers.

Refer to How to use AWS RAM to share your ACM Private CA cross-account for a full walkthrough on how to use RAM with ACM Private CA.

Certificate templates with AWS RAM managed permissions

AWS RAM has the ability to create managed permissions in order to define the actions that can be performed on shared resources. For shareable resource types that support additional managed permissions, you can define which permissions to grant to whom. This means that when you use AWS RAM to share a resource (in this case, ACM Private CA), you can specify which IAM actions can take place on that resource. AWS RAM managed permissions integrate with the following ACM Private CA certificate templates:

  • Permission 1: BlankEndEntityCertificate_APICSRPassthrough
  • Permission 2: EndEntityClientAuthCertificate
  • Permission 3: EndEntityServerAuthCertificate
  • Permission 4: SubordinateCACertificate_PathLen0
  • Permission 5: RevokeCertificate

These five managed permissions allow a Manufacturer to scope its Suppliers at the certificate template provisioning level. This means that you can limit which certificate templates the Suppliers can use to issue certificates.

Let’s assume you have a Supplier that is supplying a module that has infotainment media capability, and you, the manufacturer, want the Supplier to provision the end-entity client certificate but you don’t want them to be able to revoke that certificate. You can use AWS RAM managed permissions to scope that Supplier’s shared private CA to allow the EndEntityClientAuthCertificate issuance template, which implicitly denies RevokeCertificate template actions. This further scopes down what the Supplier is authorized to issue on the shared CA, gives the responsibility for revoking infotainment device certificates to the Manufacturer, but still allows the Supplier to load devices with a certificate upon creation.

Example of creating a resource share in AWS RAM by using the AWS CLI

This walkthrough shows you the general process of sharing a private CA by using AWS RAM and then accepting that shared resource in the partner account.

  1. Create your shared resource in AWS RAM from the Manufacturer subordinate CA account. Notice that in the example that follows, we selected one of the certificate templates within the managed permissions option. This limits the shared CA so that it can only issue certain types of certificate templates.

    Note: Replace the <variable> placeholders with your own values.

    aws ram create-resource-share \
        --name Shared_Private_CA \
        --resource-arns arn:aws:acm-pca:<region>:<111122223333>:certificate-authority/<xxxx-xxxx-xxxx-xxxx-example> \
        --permission-arns "arn:aws:ram::aws:permission/<AWSRAMBlankEndEntityCertificateAPICSRPassthroughIssuanceCertificateAuthority>" \
        --principals <444455556666>

  2. From the Supplier account, the Supplier administrator will accept the resource. Follow How to use AWS RAM to share your ACM Private CA cross-account to complete the shared resource acceptance and issue an end entity certificate.
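If the Supplier administrator prefers to accept the share programmatically, the following is a minimal Boto3 sketch run with Supplier-account credentials. It assumes the share name Shared_Private_CA used in step 1.

    import boto3

    ram = boto3.client("ram")

    # Find the pending invitation for the Manufacturer's shared private CA.
    invitations = ram.get_resource_share_invitations()["resourceShareInvitations"]
    pending = [
        inv for inv in invitations
        if inv["status"] == "PENDING" and inv["resourceShareName"] == "Shared_Private_CA"
    ]

    # Accept the invitation so that the shared CA becomes available in this account.
    for inv in pending:
        ram.accept_resource_share_invitation(
            resourceShareInvitationArn=inv["resourceShareInvitationArn"]
        )
        print("Accepted resource share:", inv["resourceShareName"])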

Conclusion

In this blog post, you learned about the various considerations for building a secure public key infrastructure (PKI) hierarchy by using ACM Private CA, through an example customer’s prescriptive setup. You learned how you can use AWS RAM to share CAs across accounts easily and securely. You also learned how to define permissions for specific principals across accounts when sharing CAs, giving you granular control over the actions those principals can perform on the shared resources.

The main takeaway of this post is how to create least-privilege IAM roles that scope down the activities of each persona and limit the potential scope of impact for your organization’s private CA hierarchy. Although these best practices are specific to manufacturer business requirements, you can alter them based on your business needs. With managed permissions in AWS RAM, you can further scope down the actions that principals can perform with your CA by limiting the certificate templates allowed on that CA when you share it. Together, these tools help you maintain a high level of security for your PKI hierarchy. To learn more, see the other ACM Private CA posts on the AWS Security Blog.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Anthony Pasquariello

Anthony is a Senior Solutions Architect at AWS based in New York City. He specializes in modernization and security for our advanced enterprise customers. Anthony enjoys writing and speaking about all things cloud. He’s pursuing an MBA, and received his MS and BS in Electrical & Computer Engineering.

Omar Zoma

Omar is a senior AWS Security Solutions Architect who lives in metro Detroit. Omar is passionate about helping customers solve cloud and vehicle security problems at a global scale. In his free time, Omar trains hundreds of students a year in security and cloud through universities and training programs.

Use Amazon Cognito to add claims to an identity token for fine-grained authorization

Post Syndicated from Ajit Ambike original https://aws.amazon.com/blogs/security/use-amazon-cognito-to-add-claims-to-an-identity-token-for-fine-grained-authorization/

With Amazon Cognito, you can quickly add user sign-up, sign-in, and access control to your web and mobile applications. After a user signs in successfully, Cognito generates an identity token for user authorization. The service provides a pre token generation trigger, which you can use to customize identity token claims before token generation. In this blog post, we’ll demonstrate how to perform fine-grained authorization by using claims that are added to the identity token and that provide additional details about the authenticated user. The solution uses a pre token generation trigger to add these claims to the identity token.

Scenario

Imagine a web application that is used by a construction company, where engineers log in to review information related to multiple projects. We’ll look at two different ways of designing the architecture for this scenario: a standard design and a more optimized design.

Standard architecture

A sample standard architecture for such an application is shown in Figure 1, with labels for the various workflow steps:

  1. The user interface is implemented by using ReactJS (a JavaScript library for building user interfaces).
  2. The user pool is configured in Amazon Cognito.
  3. The back end is implemented by using Amazon API Gateway.
  4. AWS Lambda functions exist to implement business logic.
  5. The AWS Lambda CheckUserAccess function (5) checks whether the user has authorization to call the AWS Lambda functions (4).
  6. The project information is stored in an Amazon DynamoDB database.
Figure 1: Lambda functions that need the user’s projectID call the GetProjectID Lambda function

In this scenario, because the user has access to information from several projects, several backend functions call the CheckUserAccess Lambda function (step 5 in Figure 1) to serve the requested information. This results in multiple calls to the function for the same user, which introduces latency into the system.

Optimized architecture

This blog post introduces an optimized design, shown in Figure 2, which substantially reduces calls to the CheckUserAccess Lambda function:

  1. The user logs in.
  2. Amazon Cognito makes a single call to the PretokenGenerationLambdaFunction-pretokenCognito function.
  3. The PretokenGenerationLambdaFunction-pretokenCognito function queries the project ID for the user from the DynamoDB table.
  4. DynamoDB returns the query result to the PretokenGenerationLambdaFunction-pretokenCognito function, which adds the project ID to the identity token as a custom claim.
  5. This Identity token is passed in the authorization header for making calls to the Amazon API Gateway endpoint.
  6. Information in the identity token claims is used by the Lambda functions that contain business logic, for additional fine-grained authorization. Therefore, the CheckUserAccess function (7) need not be called.

The improved architecture is shown in Figure 2.

Figure 2: Get the projectID and insert it in a custom claim in the identity token

The benefits of this approach are:

  1. The number of calls to get the Project ID from the DynamoDB table is reduced, which in turn reduces overall latency.
  2. The dependency on the CheckUserAccess Lambda function is removed from the business logic. This reduces coupling in the architecture, as depicted in the diagram.

In the code sample provided in this post, the user interface is run locally from the user’s computer, for simplicity.

Code sample

You can download a zip file that contains the code and the AWS CloudFormation template to implement this solution. The code that we provide to illustrate this solution is described in the following sections.

Prerequisites

Before you deploy this solution, you must first do the following:

  1. Download and install Python 3.7 or later.
  2. Download the AWS SDK for Python (Boto3) library by using the following pip command.
    pip install boto3
  3. Install the argparse package by using the following pip command.
    pip install argparse
  4. Install the AWS Command Line Interface (AWS CLI).
  5. Configure the AWS CLI.
  6. Download a code editor for Python. We used Visual Studio Code for this post.
  7. Install Node.js.

Description of infrastructure

The code provided with this post installs the following infrastructure in your AWS account.

  • Amazon Cognito user pool: The users, added by the addUserInfo.py script, are added to this pool. The client ID is used to identify the web client that will connect to the user pool. The user pool domain is used by the web client to request authentication of the user.
  • Required AWS Identity and Access Management (IAM) roles and policies: Policies used for running the Lambda function and connecting to the DynamoDB database.
  • Lambda function for the pre token generation trigger: A Lambda function to add custom claims to the identity token.
  • DynamoDB table with user information: A sample database to store user information that is specific to the application.

Deploy the solution

In this section, we describe how to deploy the infrastructure, save the trigger configuration, add users to the Cognito user pool, and run the web application.

To deploy the solution infrastructure

  1. Download the zip file to your machine. The readme.md file in the addclaimstoidtoken folder includes a table that describes the key files in the code.
  2. Change the directory to addclaimstoidtoken.
    cd addclaimstoidtoken
  3. Review stackInputs.json. Change the value of the userPoolDomainName parameter to a random unique value of your choice. This example uses pretokendomainname as the Amazon Cognito domain name; you should change it to a unique domain name of your choice.
  4. Deploy the infrastructure by running the following Python script.
    python3 setup_pretoken.py

    After the CloudFormation stack creation is complete, you should see the details of the infrastructure created as depicted in Figure 3.

    Figure 3: Details of infrastructure

Now you’re ready to add users to your Amazon Cognito user pool.

To add users to your Cognito user pool

  1. To add users to the Cognito user pool and configure the DynamoDB store, run the Python script from the addclaimstoidtoken directory.
    python3 add_user_info.py
  2. This script adds one user. It will prompt you to provide a username, email, and password for the user.

    Note: Because this is sample code, advanced features of Cognito, like multi-factor authentication, are not enabled. We recommend enabling these features for a production application.

    The addUserInfo.py script performs two actions:

    • Adds the user to the Cognito user pool.
      Figure 4: User added to the Cognito user pool

    • Adds sample data to the DynamoDB table.
      Figure 5: Sample data added to the DynamoDB table named UserInfoTable

Now you’re ready to run the application to verify the custom claim addition.

To run the web application

  1. Change the directory to the pre-token-web-app directory and run the following command.
    cd pre-token-web-app
  2. This directory contains a ReactJS web application that displays details of the identity token. On the terminal, run the following commands to run the ReactJS application.
    npm install
    npm start

    This should open http://localhost:8081 in your default browser window, which shows the Login button.

    Figure 6: Browser opens to URL http://localhost:8081

  3. Choose the Login button. After you do so, the Cognito-hosted login screen is displayed. Log in to the website with the user identity you created by using the addUserInfo.py script in step 1 of the To add users to your Cognito user pool procedure.
    Figure 7: Input credentials in the Cognito-hosted login screen

  4. When the login is successful, the next screen displays the identity and access tokens in the URL. You can reveal the token details to verify that the custom claim has been added to the token by choosing the Show Token Detail button.
    Figure 8: Token details displayed in the browser

What happened behind the scenes?

In this web application, the following steps happened behind the scenes:

  1. When you ran the npm start command on the terminal command line, it ran the react-scripts start command from package.json. The port number (8081) was configured in the pre-token-web-app/.env file. This opened the web application that was defined in app.js in the default browser at the URL http://localhost:8081.
  2. The Login button is configured to navigate to the URL that was defined in the constants.js file. The constants.js file was generated during the running of the setup_pretoken.py script. This URL points to the Cognito-hosted default login user interface.
  3. When you provided the login information (username and password), Amazon Cognito authenticated the user. Before generating the set of tokens (identity token and access token), Cognito first called the pre-token-generation Lambda trigger. This Lambda function has the code to connect to the DynamoDB database. The Lambda function can then access the project information for the user that is stored in the userInfo table. The Lambda function read this project information and added it to the identity token that was delivered to the web application.

    Lambda function code

    The code for the Lambda function is as follows.

    const AWS = require("aws-sdk");

    // Create the DynamoDB service object
    var ddb = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

    // Pre token generation Lambda trigger
    exports.handler = async function (event, context) {
        // If Cognito didn't pass a user name, return the event unchanged.
        if (!event.userName) {
            return event;
        }

        // Query the user's projects from the UserInfoTable by user name.
        var params = {
            ExpressionAttributeValues: {
                ":v1": {
                    S: event.userName
                }
            },
            KeyConditionExpression: "userName = :v1",
            ProjectionExpression: "projects",
            TableName: "UserInfoTable"
        };

        // Default response: add the userName claim and a null projects claim.
        event.response = {
            "claimsOverrideDetails": {
                "claimsToAddOrOverride": {
                    "userName": event.userName,
                    "projects": null
                },
            }
        };

        try {
            let result = await ddb.query(params).promise();
            if (result.Items.length > 0) {
                // Add the user's projects to the identity token as a custom claim.
                const projects = result.Items[0]["projects"]["S"];
                console.log("projects = " + projects);
                event.response.claimsOverrideDetails.claimsToAddOrOverride.projects = projects;
            }
        }
        catch (error) {
            console.log(error);
        }

        return event;
    };


  4. After a successful login, Amazon Cognito redirected to the URL that was specified in the App Client Settings section, and added the token to the URL.
  5. The webpage detected the token in the URL and displayed the Show Token Detail button. When you selected the button, the webpage read the token in the URL, decoded the token, and displayed the information in the relevant text boxes.
  6. Notice that the Decoded ID Token box shows the custom claim named projects, which displays the projectID that was added by the PretokenGenerationLambdaFunction-pretokenCognito trigger.
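If you want to inspect the claim outside the web application, you can base64url-decode the payload segment of the ID token. The following is a minimal Python sketch for local inspection only; in production, verify the token signature against your user pool's JSON Web Key Set (JWKS) before trusting any claims.

    import base64
    import json

    def decode_id_token_payload(id_token):
        """Return the claims from a JWT payload without verifying the signature."""
        payload_b64 = id_token.split(".")[1]
        # Restore the padding that base64url encoding strips from JWT segments.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        return json.loads(base64.urlsafe_b64decode(payload_b64))

    claims = decode_id_token_payload("<paste-your-ID-token-here>")
    print(claims.get("projects"))  # custom claim added by the pre token generation trigger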

How to use the sample code in your application

We recommend that you use this sample code with the following modifications:

  1. The code provided does not implement the API Gateway and Lambda functions that consume the custom claim information. You should implement the necessary Lambda functions and read the custom claim from the event object. This event object is a JSON-formatted object that contains authorization data (see the sketch after this list).
  2. The ReactJS-based user interface should be hosted on an Amazon Simple Storage Service (Amazon S3) bucket.
  3. The projectId of the user is available in the token. Therefore, when the token is passed to the back end in the Authorization header, this custom claim information can be used to perform actions specific to the project for that user. For example, getting all of that user’s work items that are related to the project.
  4. Because the token is valid for one hour, the information in the custom claim is available to the user interface during that time.
  5. You can use the AWS Amplify library to simplify the communication between your web application and Amazon Cognito. AWS Amplify can handle the token retention and refresh token mechanism for the web application. This also removes the need for the token to be displayed in the URL.
  6. If you’re using Amazon Cognito to manage your users and authenticate them, using the Amazon Cognito user pool to control access to your API is easier, because you don’t have to write the authentication code in your authorizer.
  7. If you decide to use Lambda authorizers, note the following important information from the topic Steps to create an API Gateway Lambda authorizer: “In production code, you may need to authenticate the user before granting authorization. If so, you can add authentication logic in the Lambda function as well by calling an authentication provider as directed in the documentation for that provider.”
  8. A Lambda authorizer is recommended if the final authorization decision (not just token validity) is made based on custom claims.
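As an illustration of point 1, the following is a minimal sketch of a business-logic Lambda function (in Python) that reads the custom projects claim. It assumes an API Gateway REST API with an Amazon Cognito user pools authorizer, which places the verified token claims in the request context; the names and response shape are illustrative.

    import json

    def lambda_handler(event, context):
        # Claims verified by the Cognito user pools authorizer are passed in the
        # request context of the API Gateway proxy event.
        claims = event["requestContext"]["authorizer"]["claims"]
        project_id = claims.get("projects")  # custom claim added by the trigger

        if not project_id:
            return {
                "statusCode": 403,
                "body": json.dumps({"message": "No project assigned to this user"}),
            }

        # Use project_id to scope the query or action to the caller's project.
        return {
            "statusCode": 200,
            "body": json.dumps({"projectId": project_id}),
        }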

Conclusion

In this blog post, we demonstrated how to implement fine-grained authorization based on data stored in the back end, by using claims stored in an identity token that is generated by the Amazon Cognito pre token generation trigger. This solution can help you reduce latency and improve performance.

For more information on the pre token generation Lambda trigger, refer to the Amazon Cognito Developer Guide.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Ajit Ambike

Ajit Ambike is a Sr. Application Architect at Amazon Web Services. As part of the AWS Energy team, he leads the creation of new business capabilities for customers. Ajit also brings to customers and partners best practices that accelerate the productivity of their teams.

Zafar Kapadia

Zafar Kapadia is a Sr. Customer Delivery Architect at AWS. He has over 17 years of IT experience and has worked on several Application Development and Optimization projects. He is also an avid cricketer and plays in various local leagues.