Tag Archives: Advanced (300)

Run Apache Hive workloads using Spark SQL with Amazon EMR on EKS

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/run-apache-hive-workloads-using-spark-sql-with-amazon-emr-on-eks/

Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. Using Spark SQL to run Hive workloads provides not only the simplicity of SQL-like queries but also taps into the exceptional speed and performance provided by Spark. Spark SQL is an Apache Spark module for structured data processing. One of its most popular use cases is to read and write Hive tables with connectivity to a persistent Hive metastore, supporting Hive SerDes and user-defined functions.

Starting from version 1.2.0, Apache Spark has supported queries written in HiveQL. HiveQL is a SQL-like language that produces data queries containing business logic, which can be converted to Spark jobs. However, this feature is only supported by YARN or standalone Spark mode. To run HiveQL-based data workloads with Spark on Kubernetes mode, engineers must embed their SQL queries into programmatic code such as PySpark, which requires additional effort to manually change code.

Amazon EMR on Amazon EKS provides a deployment option for Amazon EMR that you can use to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS).

Amazon EMR on EKS releases 6.7.0 and later include the ability to run Spark SQL through the StartJobRun API. With this enhancement, you can supply a SQL entry-point file and run HiveQL queries as Spark jobs on EMR on EKS directly. The feature is available in all AWS Regions where EMR on EKS is available.

Use case

FINRA is one of the largest Amazon EMR customers running SQL-based workloads using the Hive on Spark approach. FINRA, the Financial Industry Regulatory Authority, is a private sector regulator responsible for analyzing equities and option trading activity in the US. To detect fraud, market manipulation, insider trading, and abuse, FINRA’s technology group has developed a robust set of big data tools in the AWS Cloud.

FINRA centralizes all its data in Amazon Simple Storage Service (Amazon S3) with a remote Hive metastore on Amazon Relational Database Service (Amazon RDS) to manage their metadata information. They use various AWS analytics services, such as Amazon EMR, to enable their analysts and data scientists to apply advanced analytics techniques to interactively develop and test new surveillance patterns and improve investor protection. To make these interactions more efficient and productive, FINRA modernized their Hive workloads in Amazon EMR from legacy Hive on MapReduce to Hive on Spark, which resulted in query performance gains of 50 to 80 percent.

Going forward, FINRA wants to further innovate the interactive big data platform by moving from a monolithic design pattern to a job-centric paradigm, so that it can fulfill future capacity requirements as its business grows. The capability of running Hive workloads using SparkSQL directly with EMR on EKS is one of the key enablers that helps FINRA continuously pursue that goal.

Additionally, EMR on EKS offers the following benefits to accelerate adoption:

  • Fine-grained access controls (IRSA) that are job-centric to harden customers’ security posture
  • Minimized adoption effort as it enables direct Hive query submission as a Spark job without code changes
  • Reduced run costs by consolidating multiple software versions for Hive or Spark, unifying artificial intelligence and machine learning (AI/ML) and extract, transform, and load (ETL) pipelines into a single environment
  • Simplified cluster management through multi-Availability Zone support and highly responsive autoscaling and provisioning
  • Reduced operational overhead by hosting multiple compute and storage types or CPU architectures (x86 & Arm64) in a single configuration
  • Increased application reusability and portability supported by custom Docker images, which allow you to encapsulate all necessary dependencies

Running Hive SQL queries on EMR on EKS

Prerequisites

Make sure that you have AWS Command Line Interface (AWS CLI) version 1.25.70 or later installed. If you’re running AWS CLI version 2, you need version 2.7.31 or later. Use the following command to check your AWS CLI version:

aws --version

If necessary, install or update the latest version of the AWS CLI.

Solution overview

To get started, let’s look at the following diagram. It illustrates a high-level architectural design and the different services that can be used in the Hive workload. To match FINRA’s use case, we chose an Amazon RDS database as the remote Hive metastore. Alternatively, you can use AWS Glue Data Catalog as the metastore for Hive if needed. For more details, see the aws-samples GitHub project.

The minimum required infrastructure is:

  • An S3 bucket to store a Hive SQL script file
  • An Amazon EKS cluster with EMR on EKS enabled
  • An Amazon RDS for MySQL database in the same virtual private cloud (VPC) as the Amazon EKS cluster
  • A standalone Hive metastore service (HMS) running on the EKS cluster or a small Amazon EMR on EC2 cluster with the Hive application installed

For a quick start, run the sample CloudFormation deployment, which provisions the infrastructure components listed previously.

Create a Hive script file

Store a few lines of Hive queries in a single file, then upload the file to your S3 bucket. You can find the bucket name on the AWS CloudFormation console, on the stack’s Outputs tab, under the key CODEBUCKET. For a quick start, you can skip this step and use the sample file stored in s3://<YOUR_S3BUCKET>/app_code/job/set-of-hive-queries.sql. The following is a code snippet from the sample file; a sketch of the upload step follows the snippet:

-- drop database in case switch between different hive metastore

DROP DATABASE IF EXISTS hiveonspark CASCADE;
CREATE DATABASE hiveonspark;
USE hiveonspark;

--create hive managed table
DROP TABLE IF EXISTS testtable purge;
CREATE TABLE IF NOT EXISTS testtable (`key` INT, `value` STRING) using hive;
LOAD DATA LOCAL INPATH '/usr/lib/spark/examples/src/main/resources/kv1.txt' INTO TABLE testtable;
SELECT * FROM testtable WHERE key=238;

-- test1: add column
ALTER TABLE testtable ADD COLUMNS (`arrayCol` Array<int>);
-- test2: insert
INSERT INTO testtable VALUES 
(238,'val_238',array(1,3)),
(238,'val_238',array(2,3));
SELECT * FROM testtable WHERE key=238;
-- test3: UDF
CREATE TEMPORARY FUNCTION hiveUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode';
SELECT `key`,`value`,hiveUDF(arrayCol) FROM testtable WHERE key=238;
-- test4: CTAS table with parameter
DROP TABLE IF EXISTS ctas_testtable purge;
CREATE TABLE ctas_testtable 
STORED AS ORC
AS
SELECT * FROM testtable;
SELECT * FROM ctas_testtable WHERE key=${key_ID};
-- test5: External table mapped to S3
CREATE EXTERNAL TABLE IF NOT EXISTS amazonreview
( 
  marketplace string, 
  customer_id string, 
  review_id  string, 
  product_id  string, 
  product_parent  string, 
  product_title  string, 
  star_rating  integer, 
  helpful_votes  integer, 
  total_votes  integer, 
  vine  string, 
  verified_purchase  string, 
  review_headline  string, 
  review_body  string, 
  review_date  date, 
  year  integer
) 
STORED AS PARQUET 
LOCATION 's3://${S3Bucket}/app_code/data/toy/';
SELECT count(*) FROM amazonreview;
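
If you author your own script file instead of using the sample, the following is a minimal sketch of the upload step using boto3; the bucket name and object key are assumptions that should match the CODEBUCKET output and the path expected by the job submission later in this post.

import boto3

s3 = boto3.client('s3')
# Assumed values: replace the bucket with your CODEBUCKET output and adjust the key if you use a different path
bucket_name = '<YOUR_S3BUCKET>'
s3.upload_file('set-of-hive-queries.sql', bucket_name, 'app_code/job/set-of-hive-queries.sql')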

Submit the Hive script to EMR on EKS

First, set up the required environment variables. See the shell script post-deployment.sh:

stack_name='HiveEMRonEKS'
export VIRTUAL_CLUSTER_ID=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='VirtualClusterId'].OutputValue" --output text)
export EMR_ROLE_ARN=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='EMRExecRoleARN'].OutputValue" --output text)
export S3BUCKET=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='CODEBUCKET'].OutputValue" --output text)

Connect to the demo EKS cluster:

echo `aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?starts_with(OutputKey,'eksclusterEKSConfig')].OutputValue" --output text` | bash
kubectl get svc

Ensure the entryPoint path is correct, then submit the set-of-hive-queries.sql to EMR on EKS.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name sparksql-test \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.8.0-latest \
--job-driver '{
  "sparkSqlJobDriver": {
      "entryPoint": "s3://'$S3BUCKET'/app_code/job/set-of-hive-queries.sql",
      "sparkSqlParameters": "-hivevar S3Bucket='$S3BUCKET' -hivevar Key_ID=238"}}' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.sql.warehouse.dir": "s3://'$S3BUCKET'/warehouse/",
          "spark.hive.metastore.uris": "thrift://hive-metastore:9083"
        }
      }
    ], 
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {"logUri": "s3://'$S3BUCKET'/elasticmapreduce/emr-containers"}}}'

Note that the command references the set-of-hive-queries.sql Hive script file as the entry-point script. It uses the sparkSqlJobDriver attribute, not the usual sparkSubmitJobDriver designed for Spark applications. In the sparkSqlParameters section, we pass in two Hive variables, S3Bucket and key_ID, to the Hive script.

The property "spark.hive.metastore.uris": "thrift://hive-metastore:9083" sets a connection to a Hive Metastore Service (HMS) called hive-metastore, which is running as a Kubernetes service on the demo EKS cluster. If you’re running the thrift service on Amazon EMR on EC2, the URI should be thrift://<YOUR_EMR_MASTER_NODE_DNS_NAME>:9083. If you chose AWS Glue Data Catalog as your Hive metastore, replace the entire property with "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory".
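
For reference, the following is a minimal sketch of the same job submission using the boto3 emr-containers client with the AWS Glue Data Catalog configuration in place of the HMS thrift URI; it assumes the environment variables exported by post-deployment.sh are set, and the job name is arbitrary.

import os
import boto3

emr = boto3.client('emr-containers')

response = emr.start_job_run(
    virtualClusterId=os.environ['VIRTUAL_CLUSTER_ID'],
    name='sparksql-glue-test',
    executionRoleArn=os.environ['EMR_ROLE_ARN'],
    releaseLabel='emr-6.8.0-latest',
    jobDriver={
        'sparkSqlJobDriver': {
            'entryPoint': f"s3://{os.environ['S3BUCKET']}/app_code/job/set-of-hive-queries.sql",
            'sparkSqlParameters': f"-hivevar S3Bucket={os.environ['S3BUCKET']} -hivevar key_ID=238"
        }
    },
    configurationOverrides={
        'applicationConfiguration': [{
            'classification': 'spark-defaults',
            'properties': {
                'spark.sql.warehouse.dir': f"s3://{os.environ['S3BUCKET']}/warehouse/",
                # AWS Glue Data Catalog as the Hive metastore
                'spark.hadoop.hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
            }
        }],
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {'logUri': f"s3://{os.environ['S3BUCKET']}/elasticmapreduce/emr-containers"}
        }
    }
)
print(response['id'])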

Finally, check the job status using the kubectl command line tool: kubectl get po -n emr --watch
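
If you prefer to poll the job state through the EMR on EKS API instead of kubectl, the following is a small boto3 sketch; the job run ID placeholder is the id value returned by the StartJobRun call.

import os
import boto3

emr = boto3.client('emr-containers')

# Replace the placeholder with the id returned by start-job-run
job_run = emr.describe_job_run(
    id='<YOUR_JOB_RUN_ID>',
    virtualClusterId=os.environ['VIRTUAL_CLUSTER_ID']
)
print(job_run['jobRun']['state'])  # for example PENDING, RUNNING, COMPLETED, or FAILED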

Expected output

  1. Go to the Amazon EMR console.
  2. In the navigation pane, choose Virtual clusters, then select the HiveDemo cluster. You can see an entry for the SparkSQL test job.
  3. Choose the Spark UI hyperlink to monitor each query’s duration and status on a web interface.
  4. To query the Amazon RDS based Hive metastore, you need a MySQL client tool installed. To make this easier, the sample CloudFormation template installs the query tool on the master node of a small Amazon EMR on EC2 cluster.
  5. Find the EMR master node by running the following command:
aws ec2 describe-instances --filter Name=tag:project,Values=$stack_name Name=tag:aws:elasticmapreduce:instance-group-role,Values=MASTER --query 'Reservations[].Instances[].InstanceId[]'

  6. Go to the Amazon EC2 console and connect to the master node through Session Manager.
  7. Before querying the MySQL RDS database (the Hive metastore), run the following commands on your local machine to get the credentials:
    stack_name='HiveEMRonEKS' 
    export secret_name=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='HiveSecretName'].OutputValue" --output text) 
    export HOST_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.host')
    export PASSWORD=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.password')
    export DB_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.dbname')
    export USER_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.username')
    echo -e "\n host: $HOST_NAME\n DB: $DB_NAME\n password: $PASSWORD\n username: $USER_NAME\n"
    

  8. After connecting through Session Manager, query the Hive metastore from your Amazon EMR master node.
    mysql -u admin -P 3306 -p -h <YOUR_HOST_NAME>
    Enter password:<YOUR_PASSWORD>
    
    # Query the metastore
    MySQL[(none)]> Use HiveEMRonEKS;
    MySQL[HiveEMRonEKS]> select * from DBS;
    MySQL[HiveEMRonEKS]> select * from TBLS;
    MySQL[HiveEMRonEKS]> exit;

  9. Validate the Hive tables (created by set-of-hive-queries.sql) through the interactive Hive CLI tool.

Note: Your query environment must have the Hive client tool installed and a connection to your Hive metastore or AWS Glue Data Catalog. For testing purposes, you can connect to the same Amazon EMR on EC2 master node and query your Hive tables. The EMR cluster has been preconfigured with the required setup.

sudo su
hive
hive> show databases;

Clean up

To avoid incurring future charges, delete the generated resources if you no longer need the solution. Run the following cleanup script.

curl https://raw.githubusercontent.com/aws-samples/hive-emr-on-eks/main/deployment/app_code/delete_all.sh | bash

Go to the CloudFormation console and manually delete the remaining resources if needed.

Conclusion

Amazon EMR on EKS releases 6.7.0 and higher include a Spark SQL job driver so that you can run Spark SQL scripts directly via the StartJobRun API. Without any modifications to your existing Hive scripts, you can run them as SparkSQL jobs on Amazon EMR on EKS.

FINRA is one of the largest Amazon EMR customers. It runs over 400 Hive clusters for its analysts who need to interactively query multi-petabyte data sets. Modernizing its Hive workloads with SparkSQL gives FINRA a 50 to 80 percent query performance improvement. The support to run Spark SQL through the StartJobRun API in EMR on EKS has further enabled FINRA’s innovation in data analytics.

In this post, we demonstrated how to submit a Hive script to Amazon EMR on EKS and run it as a SparkSQL job. We encourage you to give it a try and are keen to hear your feedback and suggestions.


About the authors

Amit Maindola is a Senior Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Unleash the power of Snapshot Management to take automated snapshots using Amazon OpenSearch Service

Post Syndicated from Prashant Agrawal original https://aws.amazon.com/blogs/big-data/unleash-the-power-of-snapshot-management-to-take-automated-snapshots-using-amazon-opensearch-service/

Data is the lifeblood of any organization, and the importance of protecting it cannot be overstated. Starting with OpenSearch v2.5 in Amazon OpenSearch Service, we introduced Snapshot Management, which automates the process of taking snapshots of your domain. Snapshot Management helps you create point-in-time backups of your domain using OpenSearch Dashboards, including both data and configuration settings (for visualizations and dashboards). You can use these snapshots to restore your cluster to a specific state, recover from potential failures, and even clone environments for testing or development purposes.

Before this release, to automate the process of taking snapshots, you needed to use the snapshot action of OpenSearch’s Index State Management (ISM) feature. With ISM, you could only back up a particular index. Automating backup for multiple indexes required you to write custom scripts or use external management tools. With Snapshot Management, you can automate snapshotting across multiple indexes to safeguard your data and ensure its durability and recoverability.

In this post, we share how to use Snapshot Management to take automated snapshots using OpenSearch Service.

Solution overview

We demonstrate the following high-level steps:

  1. Register a snapshot repository in OpenSearch Service (a one-time process).
  2. Configure a sample ISM policy to migrate the indexes from hot storage to the UltraWarm storage tier after the indexes meet a specific condition.
  3. Create a Snapshot Management policy to take an automated snapshot for all indexes present across different storage tiers within a domain.

As of this writing, Snapshot Management doesn’t support single snapshot creation for all indexes present across different storage tiers within OpenSearch Service. For example, if you try to create a snapshot on multiple indexes with * and some indexes are in the warm tier, the snapshot creation will fail.

To overcome this limitation, you can use index aliases, with one index alias for each type of storage tier. For example, every new index created in the cluster will belong to the hot alias. When the index is moved to the UltraWarm tier via ISM, the alias for the index will be modified to warm, and the index will be removed from the hot alias.

Register a manual snapshot repository

To register a manual snapshot repository, you must create and configure an Amazon Simple Storage Service (Amazon S3) bucket and AWS Identity and Access Management (IAM) roles. For more information, refer to Prerequisites. Complete the following steps:

  1. Create an S3 bucket to store snapshots for your OpenSearch Service domain.
  2. Create an IAM role called SnapshotRole with the following IAM policy to delegate permissions to OpenSearch Service (provide the name of your S3 bucket):
{
    "Version": "2012-10-17",
    "Statement": [{
        "Action": ["s3:ListBucket"],
        "Effect": "Allow",
        "Resource": ["arn:aws:s3:::<s3-bucket-name>"]
    }, {
        "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
        "Effect": "Allow",
        "Resource": ["arn:aws:s3:::<s3-bucket-name>/*"]
    }]
}
  3. Set the trust relationship for SnapshotRole as follows:
{
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "",
        "Effect": "Allow",
        "Principal": {
            "Service": "es.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
    }]
}
  4. Create a new IAM role called RegisterSnapshotRepo, which is granted iam:PassRole and es:ESHttpPut permissions (provide your AWS account ID and domain name):
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "iam:PassRole",
        "Resource": "arn:aws:iam::<aws account id>:role/SnapshotRole"
    }, {
        "Effect": "Allow",
        "Action": "es:ESHttpPut",
        "Resource": "arn:aws:es:region:<aws account id>:domain/<domain-name>/*"
    }]
}
  5. If you have enabled fine-grained access control for your domain, map the snapshot role manage_snapshots to your RegisterSnapshotRepo IAM role in OpenSearch Service.
  6. Now you can use Python code like the following example to register the S3 bucket you created as a snapshot repository for your domain. Provide your host name, Region, snapshot repo name, and S3 bucket. Replace "arn:aws:iam::123456789012:role/SnapshotRole" with the ARN of your SnapshotRole. The Boto3 session should use the RegisterSnapshotRepo IAM role.
import boto3
import requests
from requests_aws4auth import AWS4Auth
 
host = '<host>' # domain endpoint with trailing /
region = '<region>' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
 
# To Register the repository

path = '_snapshot/<snapshot-repo-name>'
url = host + path
 
payload = {
  "type": "s3",
  "settings": {
    "bucket": "<s3-bucket-name>",
    "region": "<region>",
    "role_arn": "arn:aws:iam::123456789012:role/SnapshotRole"
  }
}
 
headers = {"Content-Type": "application/json"}
 
r = requests.put(url, auth=awsauth, json=payload, headers=headers, timeout=300)
 
print(r.status_code)
print(r.text)

The S3 bucket used as a repository must be in the same Region as your domain. You can run the preceding code in any compute instance that has connectivity to your OpenSearch Service domain, such as Amazon Elastic Compute Cloud (Amazon EC2), AWS Cloud9, or AWS Lambda. If your domain is within a VPC, then the compute instance should be running inside the same VPC or have connectivity to the VPC.

If you have to register the snapshot repository from a local machine, replace the following lines in the preceding code:

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

Replace this code with the following and assume the RegisterSnapshotRepo role (make sure the IAM entity that you are using has appropriate permission to assume the RegisterSnapshotRepo role). Modify "arn:aws:iam::123456789012:role/RegisterSnapshotRepo" with the ARN of your RegisterSnapshotRepo role.

sts = boto3.Session().client("sts")
response = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/RegisterSnapshotRepo",
    RoleSessionName="Snapshot-Session"
)

awsauth = AWS4Auth(response['Credentials']['AccessKeyId'], response['Credentials']['SecretAccessKey'], region, service, session_token=response['Credentials']['SessionToken'])

Upon successfully running the sample code, you should receive output such as the following:

200
{"acknowledged":true}

You can also verify that you have successfully registered the snapshot repository by accessing OpenSearch Dashboards in your domain and navigating to Snapshot Management, Repositories.
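
Alternatively, you can confirm the registration through the REST API. The following is a small sketch that reuses the same authentication pattern as the registration code; replace the placeholders as before, and a 200 response containing the repository settings indicates success.

import boto3
import requests
from requests_aws4auth import AWS4Auth

host = '<host>' # domain endpoint with trailing /
region = '<region>' # e.g. us-west-1
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)

# Retrieve the repository definition to confirm it was registered
r = requests.get(host + '_snapshot/<snapshot-repo-name>', auth=awsauth, timeout=300)
print(r.status_code)
print(r.text)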

Create an index template

Index templates enable you to automatically assign configuration when new indexes are created that match a wildcard index pattern.

  1. Navigate to your domain’s OpenSearch Dashboards and choose the Dev Tools tab.
  2. Enter the following text in the left pane and choose the play icon to run it.

The index template applies to newly created indexes that match the pattern of "log*". These indexes are attached to the alias named hot and the replica count is set to 1.

PUT _index_template/snapshot_template
{
  "index_patterns" : ["log*"],
  "template": {
      "settings": {
      "number_of_replicas": 1
    },
    "aliases" : {
        "hot" : {}
    }
  }
}

Note that in the preceding example, you assign the template to indexes that match "log*". You can modify index_patterns for your use case.

Create an ISM policy

In this step, you create the ISM policy that updates the alias of an index before it is migrated to UltraWarm. The policy also performs the hot-to-warm migration. This overcomes the limitation that a snapshot can’t be taken across two storage tiers (hot and UltraWarm). Complete the following steps:

  1. Navigate to the Dev Tools page of OpenSearch Dashboards.
  2. Create the ISM policy using the following command (modify the values of index_patterns and min_index_age accordingly):
PUT /_plugins/_ism/policies/alias_policy
{
    "policy": {
        "policy_id": "alias_policy",
        "description": "Example Policy for changing the alias and performing the warm migration",
        "default_state": "hot_alias",
        "states": [{
            "name": "hot_alias",
            "actions": [],
            "transitions": [{
                "state_name": "warm",
                "conditions": {
                    "min_index_age": "30d"
                }
            }]
        }, {
            "name": "warm",
            "actions": [{
                "alias": {
                    "actions": [{
                        "remove": {
                            "aliases": ["hot"]
                        }
                    }, {
                        "add": {
                            "aliases": ["warm"]
                        }
                    }]
                }
            }, {
                "retry": {
                    "count": 5,
                    "backoff": "exponential",
                    "delay": "1h"
                },
                "warm_migration": {}
            }],
            "transitions": []
        }],
        "ism_template": [{
            "index_patterns": ["log*"],
            "priority": 100
        }]
    }
}

Create a snapshot policy

In this step, you create a snapshot policy, which takes a snapshot of all the indexes aliased as hot and stores it in your repository at the time scheduled in the cron expression (midnight UTC). Complete the following steps:

  1. Navigate to the Dev Tools page of OpenSearch Dashboards.
  2. Create the snapshot policy using the following command (modify the value of snapshot-repo-name to the name of the snapshot repository you registered previously):
POST _plugins/_sm/policies/daily-snapshot-for-manual-repo
{
    "description": "Policy for Daily Snapshot in the Manual repo",
    "creation": {
      "schedule": {
        "cron": {
          "expression": "0 0 * * *",
          "timezone": "UTC"
        }
      }
    },
    "deletion": {
      "schedule": {
        "cron": {
          "expression": "0 1 * * *",
          "timezone": "UTC"
        }
      },
      "condition": {
        "min_count": 1,
        "max_count": 40
      }
    },
    "snapshot_config": {
      "indices": "hot",
      "repository": "snapshot-repo-name"
    }
}
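
To confirm that the policy was created with the schedule and repository you expect, you can retrieve it through the Snapshot Management API. The following is a minimal sketch that uses the same Python authentication setup as the repository registration step; if fine-grained access control is enabled, the signing identity needs permission to call this API.

import boto3
import requests
from requests_aws4auth import AWS4Auth

host = '<host>' # domain endpoint with trailing /
region = '<region>'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)

# Retrieve the snapshot policy you just created
r = requests.get(host + '_plugins/_sm/policies/daily-snapshot-for-manual-repo', auth=awsauth, timeout=300)
print(r.status_code)
print(r.text)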

Clean up

Snapshots that you create incur cost in the S3 bucket used as the repository. With the new Snapshot Management feature, you can easily list and delete unwanted snapshots, and you can delete the snapshot policy to stop taking automated snapshots, directly from OpenSearch Dashboards.

Conclusion

With the new Snapshot Management capabilities of OpenSearch Service, you can create regular backups and ensure the availability of your data even in the event of unexpected events or disasters. In this post, we discussed essential concepts such as snapshot repositories, automated snapshot lifecycle policies, and Snapshot Management options, enabling you to make informed decisions when it comes to managing your data backups effectively. As you continue to explore and harness the potential of OpenSearch Service, incorporating Snapshot Management into your data protection strategy will undoubtedly provide you with the resilience and reliability needed to ensure business continuity.

If you have feedback about this post, share it in the comments section. If you have questions about this post, start a new thread on the Amazon OpenSearch Service forum or contact AWS Support.


About the authors

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Hendy Wijaya is a Senior OpenSearch Specialist Solutions Architect at Amazon Web Services. Hendy enables customers to leverage AWS services to achieve their business objectives and gain competitive advantages. He is passionate about collaborating with customers to get the best out of OpenSearch and Amazon OpenSearch Service.

Utkarsh Agarwal is a Cloud Support Engineer in the Support Engineering team at Amazon Web Services. He specializes in Amazon OpenSearch Service. He provides guidance and technical assistance to customers, enabling them to build scalable, highly available, and secure solutions in the AWS Cloud. In his free time, he enjoys watching movies, TV series, and of course cricket. Lately, he is also attempting to master the art of cooking. The taste buds are excited, but the kitchen might disagree.

Processing large records with Amazon Kinesis Data Streams

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/

In today’s digital era, data is abundant and constantly flowing. Businesses across industries are seeking ways to harness this wealth of information to gain valuable insights and make real-time decisions. To meet this need, AWS offers Amazon Kinesis Data Streams, a powerful and scalable real-time data streaming service. With Kinesis Data Streams, you can effortlessly collect, process, and analyze streaming data in real time at any scale. This service seamlessly integrates into your data architecture, allowing you to tap into the full potential of your data for informed decision-making.

Data streaming technologies like Kinesis Data Streams are designed to efficiently process and manage continuous streams of data in real time at large scale. The individual pieces of data within these streams are often referred to as records. In scenarios like large file processing or performing image, audio, or video analytics, your record may exceed 1 MB. You may struggle to ingest such a large record with Kinesis Data Streams because, as of this writing, the service has a 1 MB upper limit for maximum data record size.

In this post, we show you some different options for handling large records within Kinesis Data Streams and the benefits and disadvantages of each approach. We provide some sample code for each option to help you get started with any of these approaches with your own workloads.

Understanding the default behavior of Kinesis Data Streams

You can send records to Kinesis Data Streams using the PutRecord or PutRecords API calls. These APIs include a mandatory field known as PartitionKey, where you must provide a specific value. This partition key is used by the service to map records with the same partition keys to the same shard to ensure ordering and locality for consumption. Locality means that you want the same consumer to process all records for a given partition key. This helps ensure that data with the same partition key stays together within the same shard, maintaining data order.

Each shard, which holds your data, can handle writing up to 1 MB per second. Let’s consider a scenario where you define a partition key and attempt to send a data record that exceeds 1 MB in size. Based on the explanation so far, the service will reject this request because the record size is over 1 MB. To help you understand better, we experimented by trying to send a record of 1.5 MB to a stream, and the outcome was the following exception message:

import json
import boto3
client = boto3.client('kinesis', region_name='ap-southeast-2')

def lambda_handler(event, context):
    try:
        response = client.put_record(
            StreamName='test',
            Data=b'Sample 1 MB....',
            PartitionKey='string'
            #StreamARN='string'
        )
    
    except Exception as e:
        print (e)

START RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb Version: $LATEST An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'data' failed to satisfy constraint: Member must have length less than or equal to 1048576 END RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb

Strategies for handling large records

Now that we understand the behavior of the PutRecord and PutRecords APIs, let’s discuss strategies you can use to overcome this situation. One thing to keep in mind is that there is no single best solution; in the following sections, we discuss some of the approaches that you can evaluate based on your use case:

  • Store large records in Amazon Simple Storage Service (Amazon S3) with a reference in Kinesis Data Streams
  • Split one large record into multiple records
  • Compress your large records

Let’s discuss these points one by one.

Store large records in Amazon S3 with a reference in Kinesis Data Streams

A useful approach for storing large records involves utilizing an alternative storage solution while employing a reference within Kinesis Data Streams. In this context, Amazon S3 stands out as an excellent choice due to its exceptional durability and cost-effectiveness. The procedure involves uploading the record as an object to an S3 bucket and subsequently writing a reference entry in Kinesis Data Streams. This entry incorporates an attribute that serves as a pointer, indicating the location of the object within Amazon S3.

With this approach, you can generate a pre-signed URL associated with the S3 object’s location. This link can be shared with the requester, offering them direct access to the object without the need for intermediary server-side data transfers.

The following diagram illustrates the architecture of this solution.

The following is the sample code to write data to Kinesis Data Streams using this approach:

import json
import boto3
import random

def lambda_handler(event, context):
    try:
        s3 = boto3.client('s3', region_name='ap-southeast-2')
        kds = boto3.client('kinesis', region_name='ap-southeast-2')
        expiration=3600
        pk=str(random.randint(100,100000000))
        bucket_name = 'MY_BUCKET'
        object_key = 'air/' + pk + '.txt'
        file_content = b'LARGE OBJECT'
        response = s3.put_object(Bucket=bucket_name, Key=object_key, Body=file_content)
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )
        
        kdata = {'message': presigned_url}
        response = kds.put_record(
            StreamName='test',
            Data=json.dumps(kdata),
            PartitionKey=pk
        )
        print (response)
    except Exception as e:
        print (e)

If you are using an AWS Lambda consumer to process this data, you can now decode the record to get the S3 pre-signed URL to efficiently retrieve the object from Amazon S3. Then you can implement your business logic to effectively process the data. The following is sample code for reference:

import json
import base64

def lambda_handler(event, context):
    item = None
    decoded_record_data = [base64.b64decode(record['kinesis']['data']).decode().replace('\n','') for record in event['Records']]
    deserialized_data = [json.loads(decoded_record) for decoded_record in decoded_record_data]
    
    
    for item in deserialized_data:
        LOB=(item['message'])
        #process LOB implementing your business logic

An inherent benefit of adopting this technique is the capability to store data in Amazon S3, accommodating an extensive range of sizes per individual object. This method helps you reduce the costs of using Kinesis Data Streams because it uses less storage space and requires less read and write throughput for item access. This optimization is achieved by storing just the URL within Kinesis Data Streams. However, it’s important to acknowledge that accessing the sizable object necessitates an additional call to Amazon S3, thereby introducing higher latency for clients as they manage the additional request.

Split one large record into multiple records

Splitting large records into smaller ones in Kinesis Data Streams brings advantages like faster processing, improved throughput, efficient resource use, and more straightforward error handling. Let’s say you have a large record that you want to split into smaller chunks before sending them to a Kinesis data stream. First, you need to set up a Kinesis producer. Suppose you have a large record as a string. You can split it into smaller chunks of a predefined size. For this example, let’s say you’re splitting the record into chunks of 100 characters each. After you split that, loop through the record chunks and send each chunk as a separate message to a Kinesis data stream. The following is the sample code:

import boto3
kinesis = boto3.client('kinesis', region_name='ap-southeast-2')  

def split_record(record, chunk_size):
    chunks = [record[i:i + chunk_size] for i in range(0, len(record), chunk_size)]
    return chunks

def send_to_kinesis(stream_name, record):
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=record,
        PartitionKey= '100'
    )
    return response

def main():
    stream_name = 'test'  
    large_record = 'Your large record'  # Replace with your actual record
    chunk_size = 100  

    record_chunks = split_record(large_record, chunk_size)

    for chunk in record_chunks:
        response = send_to_kinesis(stream_name, chunk)
        print(f"Record sent: {response['SequenceNumber']}")

if __name__ == "__main__":
    main()

Ensure that all chunks of a given message are directed to a single partition, thereby guaranteeing the preservation of their order. When producing the final chunk, include metadata in the record indicating that it concludes the message. This enables consumers to identify the last chunk and facilitates seamless message reconstruction. The drawback of this method is that it adds complexity to the client-side tasks of splitting and reassembling the record. Therefore, these functions need thorough testing to prevent any loss of data.
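
One possible way to carry that metadata is to wrap each chunk in a small JSON envelope, as in the following sketch; the envelope field names (message_id, sequence, is_final) are illustrative assumptions, not part of any Kinesis API, and the consumer-side buffer is an in-memory simplification.

import json
import uuid
import boto3

kinesis = boto3.client('kinesis', region_name='ap-southeast-2')

def send_chunked(stream_name, large_record, chunk_size=100):
    # All chunks share one message_id and partition key, so they land on the same shard in order
    message_id = str(uuid.uuid4())
    chunks = [large_record[i:i + chunk_size] for i in range(0, len(large_record), chunk_size)]
    for seq, chunk in enumerate(chunks):
        envelope = {
            'message_id': message_id,
            'sequence': seq,
            'is_final': seq == len(chunks) - 1,
            'payload': chunk
        }
        kinesis.put_record(StreamName=stream_name, Data=json.dumps(envelope), PartitionKey=message_id)

# Consumer side: buffer chunks per message_id and reassemble when the final chunk arrives
buffers = {}

def on_chunk(envelope):
    parts = buffers.setdefault(envelope['message_id'], {})
    parts[envelope['sequence']] = envelope['payload']
    if envelope['is_final']:
        full_record = ''.join(parts[i] for i in sorted(parts))
        del buffers[envelope['message_id']]
        return full_record  # process the reassembled record with your business logic
    return None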

Compress your large records

Applying data compression prior to transmitting it to Kinesis Data Streams has numerous advantages. This approach not only reduces the data’s size, enabling swifter travel and more efficient utilization of network resources, but also leads to cost savings in terms of storage expenses while optimizing overall resource consumption. Additionally, this practice simplifies storage and data retention. By using compression algorithms such as GZIP, Snappy, or LZ4, you can achieve substantial reduction in the size of large records. Compression brings the benefit of simplicity because it’s implemented seamlessly without requiring the caller to make changes to the item or use extra AWS services to support storage. However, compression introduces additional CPU overhead and latency on the producer side, and its impact on the compression ratio and efficiency can vary depending on the data type and format. Also, compression can enhance consumer throughput at the expense of some decompression overhead.
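
To illustrate this approach, the following sketch GZIP-compresses a record on the producer side and decompresses it in a Lambda consumer; the stream name and partition key are placeholders, and the same pattern applies to Snappy or LZ4 with their corresponding libraries.

import base64
import gzip
import json
import boto3

kinesis = boto3.client('kinesis', region_name='ap-southeast-2')

def put_compressed(stream_name, record_dict, partition_key):
    # Compress the serialized record with GZIP before sending it to the stream
    raw = json.dumps(record_dict).encode('utf-8')
    return kinesis.put_record(StreamName=stream_name, Data=gzip.compress(raw), PartitionKey=partition_key)

def lambda_handler(event, context):
    # Consumer side: Kinesis delivers the data base64-encoded; decode, then decompress
    for record in event['Records']:
        compressed = base64.b64decode(record['kinesis']['data'])
        payload = json.loads(gzip.decompress(compressed).decode('utf-8'))
        print(payload)  # process payload with your business logic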

Conclusion

For real-time data streaming use cases, it’s essential to carefully consider the handling of large records when using Kinesis Data Streams. In this post, we discussed the challenges associated with managing large records and explored strategies such as utilizing Amazon S3 references, record splitting, and compression. Each approach has its own set of benefits and drawbacks, so it’s crucial to evaluate the nature of your data and the tasks you need to perform. Select the most suitable approach based on your data’s characteristics and your processing task requirements.

We encourage you to try out the approaches discussed in this post and share your thoughts in the comments section.


About the author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Use SAML with Amazon Cognito to support a multi-tenant application with a single user pool

Post Syndicated from Neela Kulkarni original https://aws.amazon.com/blogs/security/use-saml-with-amazon-cognito-to-support-a-multi-tenant-application-with-a-single-user-pool/

Amazon Cognito is a customer identity and access management solution that scales to millions of users. With Cognito, you have four ways to secure multi-tenant applications: user pools, application clients, groups, or custom attributes. In an earlier blog post titled Role-based access control using Amazon Cognito and an external identity provider, you learned how to configure Cognito authentication and authorization with a single tenant. In this post, you will learn to configure Cognito with a single user pool for multiple tenants to securely access a business-to-business application by using SAML custom attributes. With custom-attribute–based multi-tenancy, you can store tenant identification data like tenantName as a custom attribute in a user’s profile and pass it to your application. You can then handle multi-tenancy logic in your application and backend services. With this approach, you can use a unified sign-up and sign-in experience for your users. To identify the user’s tenant, your application can use the tenantName custom attribute.

One Cognito user pool for multiple customers

Customers like the simplicity of using a single Cognito user pool for their multi-customer application. With this approach, your customers will use the same URL to access the application. You will set up each new customer by configuring SAML 2.0 integration with the customer’s external identity provider (IdP). Your customers can control access to your application by using an external identity store, such as Google Workspace, Okta, or Active Directory Federation Service (AD FS), in which they can create, manage, and revoke access for their users.

After SAML integration is configured, Cognito returns a JSON web token (JWT) to the frontend during the user authentication process. This JWT contains attributes your application can use for authorization and access control. The token contains claims about the identity of the authenticated user, such as name and email. You can use this identity information inside your application. You can also configure Cognito to add custom attributes to the JWT, such as tenantName.

In this post, we demonstrate the approach of keeping a mapping between a user’s email domain and tenant name in an Amazon DynamoDB table. The DynamoDB table will have an emailDomain field as a key and a corresponding tenantName field.
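
The following is a minimal sketch of the lookup that the tenant-matching Lambda function performs against this table; the table name and the attribute that stores the IdP identifier are assumptions for illustration.

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('TenantTable')  # assumed table name

def get_tenant(email):
    # Extract the domain portion of the email and look up the matching tenant record
    email_domain = email.split('@')[1].lower()
    response = table.get_item(Key={'emailDomain': email_domain})
    return response.get('Item')  # for example {'emailDomain': ..., 'tenantName': ..., 'idpIdentifier': ...}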

Cognito architecture

To illustrate how this works, we’ll start with a demo application that was introduced in the earlier blog post. The demo application is implemented by using Amazon Cognito, AWS Amplify, Amazon API Gateway, AWS Lambda, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and Amazon CloudFront to achieve a serverless architecture. This architecture is shown in Figure 1.

Figure 1: Demo application architecture

The workflow that happens when you access the web application for the first time using your browser is as follows (the numbered steps correspond to the numbered labels in the diagram):

  1. The client-side/frontend of the application prompts you to enter the email that you want to use to sign in to the application.
  2. The application invokes the Tenant Match API action through API Gateway, which, in turn, calls the Lambda function that takes the email address as an input and queries it against the DynamoDB table with the email domain. Figure 2 shows the data stored in DynamoDB, which includes the tenant name and IdP ID. You can add additional flexibility to this solution by adding web client IDs or custom redirect URLs. For the purpose of this example, we are using the same redirect URL for all tenants (the client application).
    Figure 2: DynamoDB tenant table

  3. If a matching record is found, the Lambda function returns the record to the AWS Amplify frontend application.
  4. The client application uses the IdP ID from the response and passes it to Cognito for federated login. Cognito then reroutes the login request to the corresponding IdP. The AWS Amplify frontend application then redirects the browser to the IdP.
  5. At the IdP sign-in page, you sign in with a valid user account (for example, [email protected] or [email protected]). After you sign in successfully, a SAML response is sent back from the IdP to Cognito.

    You can review the SAML content by using the instructions in How to view a SAML response in your browser for troubleshooting, as shown in Figure 3.

    Figure 3: SAML content

  6. Cognito handles the SAML response and maps the SAML attributes to a just-in-time user profile. The SAML groups attribute is mapped to a custom user pool attribute named custom:groups.
  7. To identify the tenant, additional attributes are populated in the JWT. After successful authentication, a PreTokenGeneration Lambda function is invoked, which reads the mapped custom:groups attribute value from SAML, parses it, and converts it to an array. After that, the function parses the email address and captures the domain name. It then queries the DynamoDB table for the tenantName by using the email domain name. Finally, the function sets the custom:domainName and custom:tenantName attributes in the JWT, as shown following (a sketch of such a function follows this list).
    "email": "[email protected]" ( Standard existing profile attribute )
    New attributes:
    "cognito:groups": [.                           
    "pet-app-users",
    "pet-app-admin"
    ],
    "custom:tenantName": "AnyCompany"
    "custom:domainName": "anycompany.com"

    This attribute conversion is optional and demonstrates how you can use a PreTokenGeneration Lambda invocation to customize your JWT token claims, mapping the IdP groups to the attributes your application recognizes. You can also use this invocation to make additional authorization decisions. For example, if a user is a member of multiple groups, you may choose to map only one of them.

  8. Amazon Cognito returns the JWT tokens to the AWS Amplify frontend application. The Amplify client library stores the tokens and handles refreshes. This token is used to make calls to protected APIs in Amazon API Gateway.
  9. API Gateway uses a Cognito user pools authorizer to validate the JWT’s signature and expiration. If this is successful, API Gateway passes the JWT to the application’s Lambda function (also referred to as the backend).
  10. The backend application code reads the cognito:groups claim from the JWT and decides if the action is allowed. If the user is a member of the right group, then the action is allowed; otherwise the action is denied.
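
The following is a simplified sketch of such a pre-token generation handler, assuming the V1 trigger event format, a table named TenantTable keyed on emailDomain, and a bracketed, comma-separated custom:groups value; the actual function in the sample repository may differ in details such as attribute parsing and error handling.

import boto3

dynamodb = boto3.resource('dynamodb')
tenant_table = dynamodb.Table('TenantTable')  # assumed table name

def lambda_handler(event, context):
    attrs = event['request']['userAttributes']

    # Parse the SAML-mapped custom:groups value (for example "[pet-app-users, pet-app-admin]") into a list
    raw_groups = attrs.get('custom:groups', '')
    groups = [g.strip() for g in raw_groups.strip('[]').split(',') if g.strip()]

    # Derive the email domain and look up the tenant name
    email = attrs['email']
    domain = email.split('@')[1].lower()
    item = tenant_table.get_item(Key={'emailDomain': domain}).get('Item', {})

    event['response']['claimsOverrideDetails'] = {
        'claimsToAddOrOverride': {
            'custom:domainName': domain,
            'custom:tenantName': item.get('tenantName', '')
        },
        'groupOverrideDetails': {
            'groupsToOverride': groups  # surfaced to the application as the cognito:groups claim
        }
    }
    return event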

Implement the solution

You can implement this example application by using an AWS CloudFormation template to provision your cloud application and AWS resources.

To deploy the demo application described in this post, you need the following prerequisites:

  1. An AWS account.
  2. Familiarity with navigating the AWS Management Console or AWS CLI.
  3. Familiarity with deploying CloudFormation templates.

To deploy the template

  • Choose the following Launch Stack button to launch a CloudFormation stack in your account.

    Note: The stack will launch in the N. Virginia (us-east-1) Region. To deploy this solution into other AWS Regions, download the solution’s CloudFormation template from GitHub, modify it, and deploy it to the selected Region.

The stack creates a Cognito user pool called ExternalIdPDemoPoolXXXX in the AWS Region that you have specified. The CloudFormation Outputs field contains a list of values that you will need for further configuration.

IdP configuration

The next step is to configure your IdP. Each IdP has its own procedure for configuration, but there are some common steps you need to follow.

To configure your IdP

  1. Provide the IdP with the values for the following two properties:
    • Single sign on URL / Assertion Consumer Service URL / ACS URL (for this example, https://<CognitoDomainURL>/saml2/idpresponse)
    • Audience URI / SP Entity ID / Entity ID: (For this example, urn:amazon:cognito:sp:<yourUserPoolID>)
  2. Configure the field mapping for the SAML response in the IdP. Map the first name, last name, email, and groups (as a multi-value attribute) into SAML response attributes with the names firstName, lastName, email, and groups, respectively.
    • Recommended: Filter the mapped groups to only those that are relevant to the application (for example, by a prefix filter). There is a 2,048-character limit on the custom attribute, so filtering helps avoid exceeding the character limit, and also helps avoid passing irrelevant information to the application.
  3. In each IdP, create two demo groups called pet-app-users and pet-app-admins, and create two demo users, for example, [email protected] and [email protected], and then assign one to each group, respectively.

To illustrate, we set up three different IdPs to represent three different tenants. Refer to your IdP’s documentation for instructions on how to configure each one.

You will need the metadata URL or file from each IdP, because you will use this to configure your user pool integration. For more information, see Integrating third-party SAML identity providers with Amazon Cognito user pools.

Cognito configuration

After your IdPs are configured and your CloudFormation stack is deployed, you can configure Cognito.

To configure Cognito

  1. Use your browser to navigate to the Cognito console, and for User pool name, select the Cognito user pool.
    Figure 4: Select the Cognito user pool

  2. On the Sign-in experience screen, on the Federated identity provider sign-in tab, choose Add identity provider.
  3. Choose SAML for the sign-in option, and then enter the values for your IdP. You can either upload the metadata XML file or provide the metadata endpoint URL. Add mapping for the attributes as shown in Figure 5.
    Figure 5: Attribute mappings for the IdP

    Upon completion you will see the new IdP displayed as shown in Figure 6.

    Figure 6: List of federated IdPs

  4. On the App integration tab, select the app client that was created by the CloudFormation template.
    Figure 7: Select the app client

  5. Under Hosted UI, choose Edit. Under Identity providers, select the Identity Providers that you want to set up for federated login, and save the change.
    Figure 8: Select identity providers

API Gateway

The example application uses a serverless backend. There are two API operations defined in this example, as shown in Figure 9. One operation gets tenant details and the other is the /pets API operation, which fetches information on pets based on user identity. The TenantMatch API operation will be run when you sign in with your email address. The operation passes your email address to the backend Lambda function.

Figure 9: Example APIs

Lambda functions

You will see three Lambda functions deployed in the example application, as shown in Figure 10.

Figure 10: Lambda functions

The first one is GetTenantInfo, which is used for the TenantMatch API operation. It reads the data from the TenantTable based on the email domain and passes the record back to the application. The second function is PreTokenGeneration, which reads the mapped custom:groups attribute value, parses it, converts it to an array, and then stores it in the cognito:groups claim. This function is invoked by the Cognito user pool after a successful sign-in. To customize the mapping, you can edit the Lambda function’s code in the index.js file and redeploy. The third Lambda function is added to support the Pets API operation.

DynamoDB tables

You will see three DynamoDB tables deployed in the example application, as shown in Figure 11.

Figure 11: DynamoDB tables

The TenantTable table holds the tenant details where you must add the mapping between the customer domain and the IdP ID set up in Cognito. This approach can be expanded to add more flexibility in case you want to add custom redirect URLs or Cognito app IDs for each tenant. You must create entries that correspond to the IdPs you have configured, as shown in Figure 12 (a sketch of creating such an entry follows the figure).

Figure 12: Tenant IdP mappings table
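
The following is a minimal sketch of adding one such entry with boto3; the table name and the name of the attribute that stores the Cognito IdP identifier are assumptions, so align them with the table and IdP names in your deployment.

import boto3

dynamodb = boto3.resource('dynamodb')
tenant_table = dynamodb.Table('TenantTable')  # assumed table name

tenant_table.put_item(Item={
    'emailDomain': 'anycompany.com',   # the customer's email domain
    'tenantName': 'AnyCompany',        # tenant name returned in the JWT
    'idpIdentifier': 'AnyCompanyIdP'   # assumed attribute name for the IdP configured in Cognito
})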

In addition to TenantTable, there is the ExternalIdPDemo-ItemsTable table, which holds the data related to the Pets application, based on user identity. There is also ExternalIdPDemo-UsersTable, which holds user details like the username, last forced sign-out time, and TTL required for the application to manage the user session.

You can now sign in to the example application through each IdP by navigating to the application URL found in the CloudFormation Outputs section, as shown in Figure 13.

Figure 13: Cognito sign-in screen

You will be redirected to the IdP, as shown in Figure 14.

Figure 14: Google Workspace sign-in screen

The AWS Amplify frontend application parses the JWT to identify the tenant name and provide authorization based on group membership, as shown in Figure 15.

Figure 15: Application home screen upon successful sign-in

If a different user logs in with a different role, the AWS Amplify frontend application provides authorization based on specific content of the JWT.

Conclusion

You can integrate your application with your customer’s IdP of choice for authentication and authorization and map information from the IdP to the application. By using Amazon Cognito, you can normalize the structure of the JWT token that is used for this process, so that you can add multiple IdPs, each for a different tenant, through a single Cognito user pool. You can do all this without changing application code. The native integration of Amazon API Gateway with the Cognito user pools authorizer streamlines your validation of the JWT integrity, and after the JWT has been validated, you can use it to make authorization decisions in your application’s backend. By following the example in this post, you can focus on what differentiates your application, and let AWS do the undifferentiated heavy lifting of identity management for your customer-facing applications.

For the code examples described in this post, see the amazon-cognito-example-for-multi-tenant code repository on GitHub. To learn more about using Cognito with external IdPs, see the Amazon Cognito documentation. You can also learn to build software as a service (SaaS) application architectures on AWS. If you have any questions about Cognito or any other AWS services, you may post them to AWS re:Post.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Security, Identity, & Compliance re:Post or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Ray Zaman

A Principal Solutions Architect with AWS, Ray has over 30 years of experience helping customers in finance, healthcare, insurance, manufacturing, media, petrochemical, pharmaceutical, public utility, retail, semiconductor, telecommunications, and waste management industries build technology solutions.

Neela Kulkarni

Neela is a Solutions Architect with Amazon Web Services. She primarily serves independent software vendors in the Northeast US, providing architectural guidance and best practice recommendations for new and existing workloads. Outside of work, she enjoys traveling, swimming, and spending time with her family.

Yuri Duchovny

Yuri is a New York–based Solutions Architect specializing in cloud security, identity, and compliance. He supports cloud transformations at large enterprises, helping them make optimal technology and organizational decisions. Prior to his AWS role, Yuri’s areas of focus included application and networking security, DoS, and fraud protection. Outside of work, he enjoys skiing, sailing, and traveling the world.

Abdul Qadir

Abdul is an AWS Solutions Architect based in New Jersey. He works with independent software vendors in the Northeast US and helps customers build well-architected solutions on the AWS Cloud platform.

Using AWS AppSync and AWS Lake Formation to access a secure data lake through a GraphQL API

Post Syndicated from Rana Dutt original https://aws.amazon.com/blogs/big-data/using-aws-appsync-and-aws-lake-formation-to-access-a-secure-data-lake-through-a-graphql-api/

Data lakes have been gaining popularity for storing vast amounts of data from diverse sources in a scalable and cost-effective way. As the number of data consumers grows, data lake administrators often need to implement fine-grained access controls for different user profiles. They might need to restrict access to certain tables or columns depending on the type of user making the request. Also, businesses sometimes want to make data available to external applications but aren’t sure how to do so securely. To address these challenges, organizations can turn to GraphQL and AWS Lake Formation.

GraphQL provides a powerful, secure, and flexible way to query and retrieve data. AWS AppSync is a service for creating GraphQL APIs that can query multiple databases, microservices, and APIs from one unified GraphQL endpoint.

Data lake administrators can use Lake Formation to govern access to data lakes. Lake Formation offers fine-grained access controls for managing user and group permissions at the table, column, and cell level, helping ensure data security and compliance. Additionally, Lake Formation integrates with other AWS services, such as Amazon Athena, making it ideal for querying data lakes through APIs.

In this post, we demonstrate how to build an application that can extract data from a data lake through a GraphQL API and deliver the results to different types of users based on their specific data access privileges. The example application described in this post was built by AWS Partner NETSOL Technologies.

Solution overview

Our solution uses Amazon Simple Storage Service (Amazon S3) to store the data, AWS Glue Data Catalog to house the schema of the data, and Lake Formation to provide governance over the AWS Glue Data Catalog objects by implementing role-based access. We also use Amazon EventBridge to capture events in our data lake and launch downstream processes. The solution architecture is shown in the following diagram.

Figure 1 – Solution architecture

The following is a step-by-step description of the solution:

  1. The data lake is created in an S3 bucket registered with Lake Formation. Whenever new data arrives, an EventBridge rule is invoked.
  2. The EventBridge rule runs an AWS Lambda function to start an AWS Glue crawler to discover new data and update any schema changes so that the latest data can be queried.
    Note: AWS Glue crawlers can also be launched directly from Amazon S3 events, as described in this blog post.
  3. AWS Amplify allows users to sign in using Amazon Cognito as an identity provider. Cognito authenticates the user’s credentials and returns access tokens.
  4. Authenticated users invoke an AWS AppSync GraphQL API through Amplify, fetching data from the data lake. A Lambda function is run to handle the request.
  5. The Lambda function retrieves the user details from Cognito and assumes the AWS Identity and Access Management (IAM) role associated with the requesting user’s Cognito user group.
  6. The Lambda function then runs an Athena query against the data lake tables and returns the results to AWS AppSync, which then returns the results to the user.

Prerequisites

To deploy this solution, you must first clone the solution repository to your local machine:

git clone git@github.com:aws-samples/aws-appsync-with-lake-formation.git
cd aws-appsync-with-lake-formation

Prepare Lake Formation permissions

Sign in to the Lake Formation console and add yourself as an administrator. If you’re signing in to Lake Formation for the first time, you can do this by selecting Add myself on the Welcome to Lake Formation screen and choosing Get started, as shown in Figure 2.

Figure 2 – Add yourself as the Lake Formation administrator

Otherwise, you can choose Administrative roles and tasks in the left navigation bar and choose Manage Administrators to add yourself. You should see your IAM username under Data lake administrators with Full access when done.

Select Data catalog settings in the left navigation bar and make sure the two IAM access control boxes are not selected, as shown in Figure 3. You want Lake Formation, not IAM, to control access to new databases.

Figure 3 – Lake Formation data catalog settings

Deploy the solution

To create the solution in your AWS environment, launch the following AWS CloudFormation stack: Launch CloudFormation Stack

The following resources will be launched through the CloudFormation template:

  • Amazon VPC and networking components (subnets, security groups, and NAT gateway)
  • IAM roles
  • Lake Formation encapsulating S3 bucket, AWS Glue crawler, and AWS Glue database
  • Lambda functions
  • Cognito user pool
  • AWS AppSync GraphQL API
  • EventBridge rules

After the required resources have been deployed from the CloudFormation stack, you must create two Lambda functions and upload the dataset to Amazon S3. Lake Formation will govern the data lake that is stored in the S3 bucket.

Create the Lambda functions

Whenever a new file is placed in the designated S3 bucket, an EventBridge rule is invoked, which launches a Lambda function to initiate the AWS Glue crawler. The crawler updates the AWS Glue Data Catalog to reflect any changes to the schema.
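The crawler Lambda function in the repository implements this hand-off. As a rough sketch of the idea (not the exact code from the repository; the environment variable name is an assumption), the function simply starts the crawler with the AWS SDK:

import os
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    # Name of the AWS Glue crawler created by the CloudFormation template
    # (passed in through an environment variable in this sketch).
    crawler_name = os.environ['CRAWLER_NAME']
    try:
        glue_client.start_crawler(Name=crawler_name)
        print(f'Started crawler {crawler_name}')
    except glue_client.exceptions.CrawlerRunningException:
        # The crawler is still processing a previous batch of files, so there is nothing to do.
        print(f'Crawler {crawler_name} is already running')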

When the application makes a query for data through the GraphQL API, a request handler Lambda function is invoked to process the query and return the results.

To create these two Lambda functions, proceed as follows.

  1. Sign in to the Lambda console.
  2. Select the crawler Lambda function named dl-dev-crawlerLambdaFunction.
  3. Find the crawler Lambda function file in your lambdas/crawler-lambda folder in the git repo that you cloned to your local machine.
  4. Copy and paste the code in that file to the Code section of the dl-dev-crawlerLambdaFunction in your Lambda console. Then choose Deploy to deploy the function.

Figure 4 – Copy and paste code into the Lambda function

  5. Repeat steps 2 through 4 for the request handler function named dl-dev-requestHandlerLambdaFunction using the code in lambdas/request-handler-lambda.

Create a layer for the request handler Lambda

You now must upload some additional library code needed by the request handler Lambda function.

  1. Select Layers in the left menu and choose Create layer.
  2. Enter a name such as appsync-lambda-layer.
  3. Download this package layer ZIP file to your local machine.
  4. Upload the ZIP file using the Upload button on the Create layer page.
  5. Choose Python 3.7 as the runtime for the layer.
  6. Choose Create.
  7. Select Functions on the left menu and select the dl-dev-requestHandler Lambda function.
  8. Scroll down to the Layers section and choose Add a layer.
  9. Select the Custom layers option and then select the layer you created above.
  10. Choose Add.

Upload the data to Amazon S3

Navigate to the root directory of the cloned git repository and run the following commands to upload the sample dataset. Replace the bucket_name placeholder with the S3 bucket provisioned using the CloudFormation template. You can get the bucket name from the CloudFormation console by going to the Outputs tab and finding the value of the key datalakes3bucketName, as shown in the image below.

Figure 5 – S3 bucket name shown in CloudFormation Outputs tab

Enter the following commands in your project folder in your local machine to upload the dataset to the S3 bucket.

cd dataset
aws s3 cp . s3://bucket_name/ --recursive

Now let’s take a look at the deployed artifacts.

Data lake

The S3 bucket holds sample data for two entities: companies and their respective owners. The bucket is registered with Lake Formation, as shown in Figure 6. This enables Lake Formation to create and manage data catalogs and manage permissions on the data.

Figure 6 – Lake Formation console showing data lake location

A database is created to hold the schema of data present in Amazon S3. An AWS Glue crawler is used to update any change in schema in the S3 bucket. This crawler is granted permission to CREATE, ALTER, and DROP tables in the database using Lake Formation.

Apply data lake access controls

Two IAM roles are created, dl-us-east-1-developer and dl-us-east-1-business-analyst, each assigned to a different Cognito user group. Each role is assigned different authorizations through Lake Formation. The Developer role gains access to every column in the data lake, while the Business Analyst role is only granted access to the non-personally identifiable information (PII) columns.

Figure 7 – Lake Formation console data lake permissions assigned to group roles
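These grants are provisioned for you by the CloudFormation template. For illustration only, an equivalent column-level grant for the business analyst role could be expressed through the Lake Formation API as in the following sketch; the database, table, and column names are assumptions rather than the exact names used by the stack:

import boto3

lakeformation = boto3.client('lakeformation')

# Grant SELECT on every column except the PII columns to the business analyst role.
lakeformation.grant_permissions(
    Principal={
        'DataLakePrincipalIdentifier': 'arn:aws:iam::111122223333:role/dl-us-east-1-business-analyst'
    },
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'dl_dev_database',   # assumed database name
            'Name': 'companies',                 # assumed table name
            'ColumnWildcard': {
                'ExcludedColumnNames': ['first_name', 'last_name']   # assumed PII column names
            }
        }
    },
    Permissions=['SELECT']
)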

GraphQL schema

The GraphQL API is viewable from the AWS AppSync console. The Companies type includes several attributes describing the owners of the companies.

Figure 8 – Schema for GraphQL API

The data source for the GraphQL API is a Lambda function, which handles the requests.

Figure 9 – AWS AppSync data source mapped to Lambda function

Handling the GraphQL API requests

The GraphQL API request handler Lambda function retrieves the Cognito user pool ID from the environment variables. Using the boto3 library, you create a Cognito client and use the get_group method to obtain the IAM role associated with the Cognito user group.

You use a helper function in the Lambda function to obtain the role.

def get_cognito_group_role(group_name):
    response = cognito_idp_client.get_group(
        GroupName=group_name,
        UserPoolId=cognito_user_pool_id
    )
    print(response)
    role_arn = response.get('Group').get('RoleArn')
    return role_arn

Using the AWS Security Token Service (AWS STS) through a boto3 client, you can assume the IAM role and obtain the temporary credentials you need to run the Athena query.

def get_temp_creds(role_arn):
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName='stsAssumeRoleAthenaQuery',
    )
    return (response['Credentials']['AccessKeyId'],
            response['Credentials']['SecretAccessKey'],
            response['Credentials']['SessionToken'])

We pass the temporary credentials as parameters when creating our Boto3 Amazon Athena client.

athena_client = boto3.client(
    'athena',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    aws_session_token=session_token
)

The client and query are passed into our Athena query helper function, which executes the query and returns a query ID. With the query ID, we can read the results from Amazon S3 and bundle them as a Python dictionary to be returned in the response.
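The query-execution helper itself isn't reproduced in this post; a minimal sketch of what it might look like follows, with the function name, default polling interval, and parameters chosen for illustration rather than copied from the repository.

import time

def run_athena_query(athena_client, query, database, output_location):
    # Start the query with the client that carries the assumed role's temporary credentials.
    query_id = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location}
    )['QueryExecutionId']

    # Poll until the query reaches a terminal state, then hand the ID back to the caller.
    while True:
        state = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(1)
    return query_id

The get_query_result helper shown next reads the completed query's output file from Amazon S3 and converts it into a list of records.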

def get_query_result(s3_client, output_location):
    bucket, object_key_path = get_bucket_and_path(output_location)
    response = s3_client.get_object(Bucket=bucket, Key=object_key_path)
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    result = []
    if status == 200:
        print(f"Successful S3 get_object response. Status - {status}")
        df = pandas.read_csv(response.get("Body"))
        df = df.fillna('')
        result = df.to_dict('records')
        print(result)
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
    return result

Enabling client-side access to the data lake

On the client side, AWS Amplify is configured with an Amazon Cognito user pool for authentication. We’ll navigate to the Amazon Cognito console to view the user pool and groups that were created.

Figure 10 – Amazon Cognito user pools

For our sample application we have two groups in our user pool:

  • dl-dev-businessAnalystUserGroup – Business analysts with limited permissions.
  • dl-dev-developerUserGroup – Developers with full permissions.

If you explore these groups, you’ll see an IAM role associated with each. This is the IAM role that is assigned to the user when they authenticate. Athena assumes this role when querying the data lake.

If you view the permissions for this IAM role, you’ll notice that it doesn’t include access controls below the table level. You need the additional layer of governance provided by Lake Formation to add fine-grained access control.

After the user is verified and authenticated by Cognito, Amplify uses access tokens to invoke the AWS AppSync GraphQL API and fetch the data. Based on the user’s group, a Lambda function assumes the corresponding Cognito user group role. Using the assumed role, an Athena query is run and the result returned to the user.

Create test users

Create two users, one for dev and one for business analyst, and add them to user groups.

  1. Navigate to Cognito and select the user pool, dl-dev-cognitoUserPool, that’s created.
  2. Choose Create user and provide the details to create a new business analyst user. The username can be biz-analyst. Leave the email address blank, and enter a password.
  3. Select the Users tab and select the user you just created.
  4. Add this user to the business analyst group by choosing the Add user to group button.
  5. Follow the same steps to create another user with the username developer and add the user to the developers group.

Test the solution

To test your solution, launch the React application on your local machine.

  1. In the cloned project directory, navigate to the react-app directory.
  2. Install the project dependencies.
npm install
  3. Install the Amplify CLI:
npm install -g @aws-amplify/cli
  4. Create a new file called .env by running the following commands. Then use a text editor to update the environment variable values in the file.
echo export REACT_APP_APPSYNC_URL=Your AppSync endpoint URL > .env
echo export REACT_APP_CLIENT_ID=Your Cognito app client ID >> .env
echo export REACT_APP_USER_POOL_ID=Your Cognito user pool ID >> .env

Use the Outputs tab of your CloudFormation console stack to get the required values from the keys as follows:

  • REACT_APP_APPSYNC_URL – appsyncApiEndpoint
  • REACT_APP_CLIENT_ID – cognitoUserPoolClientId
  • REACT_APP_USER_POOL_ID – cognitoUserPoolId
  5. Add the preceding variables to your environment.
source .env
  6. Generate the code needed to interact with the API using Amplify CodeGen. In the Outputs tab of your CloudFormation console, find your AWS AppSync API ID next to the appsyncApiId key.
amplify add codegen --apiId <appsyncApiId>

Accept all the default options for the above command by pressing Enter at each prompt.

  7. Start the application.
npm start

You can confirm that the application is running by visiting http://localhost:3000 and signing in as the developer user you created earlier.

Now that you have the application running, let’s take a look at how each role is served from the companies endpoint.

First, sign in as the developer user, whose role has access to all the fields, and make the API request to the companies endpoint. Note which fields you have access to.

Figure 11 – The results for the developer role

Now, sign in as the business analyst user and make the request to the same endpoint and compare the included fields.

Figure 12 – The results for the Business Analyst role

The First Name and Last Name columns of the companies list are excluded in the business analyst view even though you made the request to the same endpoint. This demonstrates the power of using one unified GraphQL endpoint together with multiple Cognito user group IAM roles mapped to Lake Formation permissions to manage role-based access to your data.

Cleaning up

After you’re done testing the solution, clean up the following resources to avoid incurring future charges:

  1. Empty the S3 buckets created by the CloudFormation template.
  2. Delete the CloudFormation stack to remove the S3 buckets and other resources.

Conclusion

In this post, we showed you how to securely serve data in a data lake to authenticated users of a React application based on their role-based access privileges. To accomplish this, you used GraphQL APIs in AWS AppSync, fine-grained access controls from Lake Formation, and Cognito for authenticating users by group and mapping them to IAM roles. You also used Athena to query the data.

For related reading on this topic, see Visualizing big data with AWS AppSync, Amazon Athena, and AWS Amplify and Design a data mesh architecture using AWS Lake Formation and AWS Glue.

Will you implement this approach for serving data from your data lake? Let us know in the comments!


About the Authors

Rana Dutt is a Principal Solutions Architect at Amazon Web Services. He has a background in architecting scalable software platforms for financial services, healthcare, and telecom companies, and is passionate about helping customers build on AWS.

Ranjith Rayaprolu is a Senior Solutions Architect at AWS working with customers in the Pacific Northwest. He helps customers design and operate Well-Architected solutions in AWS that address their business problems and accelerate the adoption of AWS services. He focuses on AWS security and networking technologies to develop solutions in the cloud across different industry verticals. Ranjith lives in the Seattle area and loves outdoor activities.

Justin Leto is a Sr. Solutions Architect at Amazon Web Services with specialization in databases, big data analytics, and machine learning. His passion is helping customers achieve better cloud adoption. In his spare time, he enjoys offshore sailing and playing jazz piano. He lives in New York City with his wife and baby daughter.

Simplify data transfer: Google BigQuery to Amazon S3 using Amazon AppFlow

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/simplify-data-transfer-google-bigquery-to-amazon-s3-using-amazon-appflow/

In today’s data-driven world, the ability to effortlessly move and analyze data across diverse platforms is essential. Amazon AppFlow, a fully managed data integration service, has been at the forefront of streamlining data transfer between AWS services, software as a service (SaaS) applications, and now Google BigQuery. In this blog post, you explore the new Google BigQuery connector in Amazon AppFlow and discover how it simplifies the process of transferring data from Google’s data warehouse to Amazon Simple Storage Service (Amazon S3), providing significant benefits for data professionals and organizations, including the democratization of multi-cloud data access.

Overview of Amazon AppFlow

Amazon AppFlow is a fully managed integration service that you can use to securely transfer data between SaaS applications such as Google BigQuery, Salesforce, SAP, Hubspot, and ServiceNow, and AWS services such as Amazon S3 and Amazon Redshift, in just a few clicks. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities such as filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats.

Introducing the Google BigQuery connector

The new Google BigQuery connector in Amazon AppFlow unveils possibilities for organizations seeking to use the analytical capability of Google’s data warehouse, and to effortlessly integrate, analyze, store, or further process data from BigQuery, transforming it into actionable insights.

Architecture

Let’s review the architecture to transfer data from Google BigQuery to Amazon S3 using Amazon AppFlow.

architecture

  1. Select a data source: In Amazon AppFlow, select Google BigQuery as your data source. Specify the tables or datasets you want to extract data from.
  2. Field mapping and transformation: Configure the data transfer using the intuitive visual interface of Amazon AppFlow. You can map data fields and apply transformations as needed to align the data with your requirements.
  3. Transfer frequency: Decide how frequently you want to transfer data—such as daily, weekly, or monthly—supporting flexibility and automation.
  4. Destination: Specify an S3 bucket as the destination for your data. Amazon AppFlow will efficiently move the data, making it accessible in your Amazon S3 storage.
  5. Consumption: Use Amazon Athena to analyze the data in Amazon S3.

Prerequisites

The dataset used in this solution is generated by Synthea, a synthetic patient population simulator and open-source project under the Apache License 2.0. Load this data into Google BigQuery or use your existing dataset.

Connect Amazon AppFlow to your Google BigQuery account

For this post, you use a Google account, OAuth client with appropriate permissions, and Google BigQuery data. To enable Google BigQuery access from Amazon AppFlow, you must set up a new OAuth client in advance. For instructions, see Google BigQuery connector for Amazon AppFlow.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

Create a new S3 bucket for Amazon AppFlow results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, appflow-bq-sample.
  3. Choose Create bucket.

Create a new S3 bucket for Amazon Athena results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, athena-results.
  3. Choose Create bucket.

User role (IAM role) for AWS Glue Data Catalog

To catalog the data that you transfer with your flow, you must have the appropriate user role in AWS Identity and Access Management (IAM). You provide this role to Amazon AppFlow to grant the permissions it needs to create an AWS Glue Data Catalog, tables, databases, and partitions.

For an example IAM policy that has the required permissions, see Identity-based policy examples for Amazon AppFlow.

Walkthrough of the design

Now, let’s walk through a practical use case to see how the Amazon AppFlow Google BigQuery to Amazon S3 connector works. For the use case, you will use Amazon AppFlow to archive historical data from Google BigQuery to Amazon S3 for long-term storage and analysis.

Set up Amazon AppFlow

Create a new Amazon AppFlow flow to transfer data from Google BigQuery to Amazon S3.

  1. On the Amazon AppFlow console, choose Create flow.
  2. Enter a name for your flow; for example, my-bq-flow.
  3. Add necessary Tags; for example, for Key enter env and for Value enter dev.

  4. Choose Next.
  5. For Source name, choose Google BigQuery.
  6. Choose Create new connection.
  7. Enter your OAuth Client ID and Client Secret, then name your connection; for example, bq-connection.

  8. In the pop-up window, choose to allow amazon.com access to the Google BigQuery API.

  9. For Choose Google BigQuery object, choose Table.
  10. For Choose Google BigQuery subobject, choose BigQueryProjectName.
  11. For Choose Google BigQuery subobject, choose DatabaseName.
  12. For Choose Google BigQuery subobject, choose TableName.
  13. For Destination name, choose Amazon S3.
  14. For Bucket details, choose the Amazon S3 bucket you created for storing Amazon AppFlow results in the prerequisites.
  15. Enter raw as a prefix.

  16. Next, provide AWS Glue Data Catalog settings to create a table for further analysis.
    1. Select the User role (IAM role) created in the prerequisites.
    2. Create a new database; for example, healthcare.
    3. Provide a table prefix setting; for example, bq.

  17. Select Run on demand.

  18. Choose Next.
  19. Select Manually map fields.
  20. Select the following six fields for Source field name from the table Allergies:
    1. Start
    2. Patient
    3. Code
    4. Description
    5. Type
    6. Category
  21. Choose Map fields directly.

  22. Choose Next.
  23. In the Add filters section, choose Next.
  24. Choose Create flow.

Run the flow

After creating your new flow, you can run it on demand.

  1. On the Amazon AppFlow console, choose my-bq-flow.
  2. Choose Run flow.

For this walkthrough, run the job on demand for ease of understanding. In practice, you can choose a scheduled job and periodically extract only newly added data.
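If you later want to trigger the same flow programmatically, for example from a scheduler or an orchestration pipeline, the AppFlow StartFlow API can be called with the AWS SDK. A minimal sketch follows, reusing the flow name created earlier:

import boto3

appflow = boto3.client('appflow')

# Start the on-demand flow created in the console; the response includes the execution details.
response = appflow.start_flow(flowName='my-bq-flow')
print(response)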

Query through Amazon Athena

When you select the optional AWS Glue Data Catalog settings, Data Catalog creates the catalog for the data, allowing Amazon Athena to perform queries.

If you’re prompted to configure a query results location, navigate to the Settings tab and choose Manage. Under Manage settings, choose the Athena results bucket created in prerequisites and choose Save.

  1. On the Amazon Athena console, select the Data Source as AWSDataCatalog.
  2. Next, select Database as healthcare.
  3. Now you can select the table created by the AWS Glue crawler and preview it.

  4. You can also run a custom query to find the top 10 allergies as shown in the following query.

Note: In the following query, replace the table name, in this case bq_appflow_mybqflow_1693588670_latest, with the name of the table generated in your AWS account.

SELECT type,
       category,
       "description",
       count(*) AS number_of_cases
FROM "healthcare"."bq_appflow_mybqflow_1693588670_latest"
GROUP BY type,
         category,
         "description"
ORDER BY number_of_cases DESC
LIMIT 10;

  5. Choose Run query.

This result shows the top 10 allergies by number of cases.

Clean up

To avoid incurring charges, clean up the resources in your AWS account by completing the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. From the list of flows, select the flow my-bq-flow, and delete it.
  3. Enter delete to delete the flow.
  4. Choose Connections in the navigation pane.
  5. Choose Google BigQuery from the list of connectors, select bq-connection, and delete it.
  6. Enter delete to delete the connector.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for the AWS Glue crawler and delete it.
  8. On the Amazon Athena console:
    1. Delete the tables created under the database healthcare using AWS Glue crawler.
    2. Drop the database healthcare.
  9. On the Amazon S3 console, search for the Amazon AppFlow results bucket you created, choose Empty to delete the objects, then delete the bucket.
  10. On the Amazon S3 console, search for the Amazon Athena results bucket you created, choose Empty to delete the objects, then delete the bucket.
  11. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The Google BigQuery connector in Amazon AppFlow streamlines the process of transferring data from Google’s data warehouse to Amazon S3. This integration simplifies analytics and machine learning, archiving, and long-term storage, providing significant benefits for data professionals and organizations seeking to harness the analytical capabilities of both platforms.

With Amazon AppFlow, the complexities of data integration are eliminated, enabling you to focus on deriving actionable insights from your data. Whether you’re archiving historical data, performing complex analytics, or preparing data for machine learning, this connector simplifies the process, making it accessible to a broader range of data professionals.

If you’re interested in seeing how data is transferred from Google BigQuery to Amazon S3 using Amazon AppFlow, take a look at the step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on Amazon AppFlow, visit Amazon AppFlow.


About the authors

Kartikay Khator is a Solutions Architect on the Global Life Sciences team at Amazon Web Services. He is passionate about helping customers on their cloud journey, with a focus on AWS analytics services. He is an avid runner and enjoys hiking.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Enhancing Resource Isolation in AWS CDK with the App Staging Synthesizer

Post Syndicated from Jehu Gray original https://aws.amazon.com/blogs/devops/enhancing-resource-isolation-in-aws-cdk-with-the-app-staging-synthesizer/

AWS Cloud Development Kit (CDK) has become a powerful tool for defining and provisioning AWS cloud resources. While CDK simplifies the process of infrastructure as code, managing resources across different projects and environments can still present challenges. In this blog post, we’ll explore a new experimental library, the App Staging Synthesizer, that enhances resource isolation and provides finer control over staging resources in CDK applications.

Background: The CDK Bootstrapping Model

Let’s consider a scenario where a company has two projects in the same account, Project A and Project B. Both projects are developed using the AWS CDK and deploy various AWS resources. However, the company wants to ensure that resources used in Project A are not discoverable or accessible to Project B.

Prior to the introduction of the App Staging Synthesizer library in CDK, the default bootstrapping process created shared staging resources, such as a single Amazon S3 bucket and Amazon ECR repository, which are used by all CDK applications deployed in the CDK environment. In AWS CDK, a combination of region and an account is considered to be an environment. The traditional CDK bootstrapping method offers simplicity and consistency by providing a standardized set of shared staging resources for all CDK applications in an environment, which can be cost-effective for multiple applications. This shared model makes it challenging to control access and visibility between the projects in the same account, particularly in scenarios where resource isolation is crucial between different projects.

In such scenarios, AWS recommends a best practice of separating projects that need critical isolation into different AWS accounts. However, it is recognized that there might be organizational or practical reasons preventing the immediate adoption of this recommendation. In such cases, mechanisms like the App Staging Synthesizer can provide a valuable workaround.

Introducing the App Staging Synthesizer:

Today, a growing trend among customers is the consolidation of their cloud accounts, driven by the desire to optimize costs, bolster security, and enhance compliance control. However, while consolidation offers several advantages, it can sometimes limit the flexibility to align ownership and decision making with individual accounts. This can lead to dependencies and conflicts in how workloads across accounts are secured and managed. The App Staging Synthesizer, an experimental library that provides a more flexible approach to resource management and staging in CDK applications, was designed to address these challenges. The AppStagingSynthesizer enhances resource isolation and cleanup control by creating separate staging resources for each application, reducing the risk of conflicts between resources and providing more granular management. It also enables better asset lifecycle management and customization of roles and resource handling, offering CDK developers a flexible and organized approach to resource deployment. Let’s delve into some of the advantages and key features of this library.

Advantages and Outcomes:

  1. Isolation and Access Control: The resources created for Project A are now completely isolated from Project B. Project B doesn’t have visibility or access to the staging resources of Project A, and vice versa. This ensures a higher level of data and resource security.
  2. Easier Resource Cleanup: When cleaning up or deleting resources, the Staging Stack specific to each project can be removed independently. This allows for a more streamlined and controlled cleanup process, mitigating the risk of inadvertently affecting other projects.
  3. Lifecycle Management: With separate ECR repositories for each CDK application, the company can apply lifecycle rules independently for retention and cost management. For example, they can configure each ECR repository to retain only the most recent 5 images, effectively cutting down on storage costs.
  4. Reduced Bootstrapping Complexity: As the only shared resources required are global Roles, the company now only needs to bootstrap every account in one Region instead of bootstrapping every Region. This simplifies the bootstrapping process, making it easier to manage with CloudFormation StackSets.

Key Features of the App Staging Synthesizer:

  • IStagingResources Interface: The App Staging Synthesizer introduces the IStagingResources interface, offering a framework to manage app-level bootstrap stacks. These stacks handle file assets and Docker assets for CDK applications.
  • DefaultStagingStack: Included in the library, the DefaultStagingStack is a pre-built implementation of the IStagingResources interface. It comes with default configurations for staging resources, making it easier to get started.
  • AppStagingSynthesizer: This is a new CDK synthesizer that orchestrates the creation of staging resources for each CDK application. It seamlessly integrates with the application deployment process.
  • Deployment Roles: In addition to creating staging resources, the CDK App Staging Synthesizer also manages deployment roles. These roles are crucial for secure and controlled resource deployment, ensuring that only authorized processes can modify or access the resources.

 Implementation:

Let’s explore practical examples of using the App Staging Synthesizer within a CDK application.

Prerequisite:

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Install AWS CDK version 2.73.0 or later
  • A basic understanding of CDK. Please go through cdkworkshop.com to get hands-on learning about CDK and related concepts.
  • NOTE: To utilize the AppStagingSynthesizer, you should have an existing CDK application or should be working on a CDK application.

Using Default Staging Resources:

When configuring your CDK application to use deployment identities with the old bootstrap stack, it’s important to note that the existing staging resources, including the global S3 bucket and ECR repository, will still be created as part of the bootstrapping process. However, they will remain unused by this specific application, thanks to the App Staging Synthesizer.
While we won’t delve into the removal of these unused resources in this blog post, it’s worth mentioning that for a more streamlined resource setup, you have the option to customize the bootstrap template to remove these resources if desired. This can help reduce clutter and ensure that only the necessary resources are retained within your CDK environment.

To get started, update your CDK App with the following code snippet:

const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.defaultResources({
    appId: 'my-app-id',
    // The following line is optional. By default, it is assumed you have bootstrapped in the same region(s) as the stack(s) you are deploying.
    deploymentIdentities: DeploymentIdentities.defaultBootstrapRoles({ bootstrapRegion: 'us-east-1' }),
  }),
});

This code snippet creates a DefaultStagingStack for a CDK App, allowing you to manage staging resources more effectively.

Customizing Roles:

You can customize roles for the synthesizer, which can be useful for several reasons such as:

  • Reuse of existing roles: In many AWS environments, organizations have existing IAM roles with specific permissions and policies that are aligned with their security and compliance requirements. Rather than creating new roles from scratch, you might want to leverage these existing roles to maintain consistency and adhere to established security practices.
  • Compatibility: In scenarios where you have pre-existing IAM roles that are being used across various AWS services or applications, customizing roles within the CDK App Staging Synthesizer allows you to seamlessly integrate CDK deployments into your existing IAM role management strategy.

Overall, customizing roles provides flexibility and control over resources used during CDK application deployments, enabling you to align CDK-based infrastructure with the organization’s policies. An example is:

const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.defaultResources({
    appId: 'my-app-id',
    deploymentIdentities: DeploymentIdentities.specifyRoles({
      cloudFormationExecutionRole: BootstrapRole.fromRoleArn('arn:aws:iam::123456789012:role/Execute'),
      deploymentRole: BootstrapRole.fromRoleArn('arn:aws:iam::123456789012:role/Deploy'),
    }),
  }),
});

This code snippet illustrates how you can specify custom roles for different stages of the deployment process.

Deploy Time S3 Assets:

Deploy-time S3 assets can be classified into two categories, each serving a distinct purpose:

  • Assets Used Only During Deployment: These assets are instrumental in handing off substantial data to other services for private copying during deployment. They play a vital role during initial deployment, and afterwards are retained solely for potential future rollbacks
  • Assets Accessed Throughout Application Lifespan: In contrast, some assets are accessed continuously throughout the runtime of your application. These could include script files utilized in CodeBuild projects, startup scripts for EC2 instances, or, in the case of CDK applications, ECR images that persist throughout the application’s life.

Marking Lambda Assets as Deploy-Time:

By default, Lambda assets are marked as deploy-time assets in the CDK App Staging Synthesizer. This means they fall into the first category mentioned above, serving as essential components during deployment. For instance, consider the following code snippet:

declare const stack: Stack;
new lambda.Function(stack, 'lambda', {
  code: lambda.AssetCode.fromAsset(path.join(__dirname, 'assets')), // Lambda code bundle marked as deploy-time
  handler: 'index.handler',
  runtime: lambda.Runtime.PYTHON_3_9,
});

In this example, the Lambda code bundle is automatically identified as a deploy-time asset. This distinction ensures that it’s cleaned up after the configurable rollback window.

Creating Custom Deploy-Time Assets:

CDK offers the flexibility needed to create custom deploy-time assets. This can be achieved by utilizing the Asset construct from the AWS CDK library:

import { Asset } from 'aws-cdk-lib/aws-s3-assets';

declare const stack: Stack;
const asset = new Asset(stack, 'deploy-time-asset', {
  deployTime: true, // Marking the asset as deploy-time
  path: path.join(__dirname, './deploy-time-asset'),
});

By setting deployTime to true, the asset is explicitly marked as deploy-time. This allows you to maintain control over the lifecycle of these assets, ensuring they are retained for as long as needed. However, it is important to note that deploy-time assets eventually become eligible for cleanup.

Configuring Asset Lifecycles:

By default, the CDK retains deploy-time assets for a period of 30 days. However, there is flexibility to adjust this duration according to custom requirements. This can be achieved by specifying deployTimeFileAssetLifetime. The value set here determines how long you can roll back to a previous application version without the need for rebuilding and republishing assets:

const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.defaultResources({
    appId: 'my-app-id',
    deployTimeFileAssetLifetime: Duration.days(100), // Adjusting the asset retention period to 100 days
  }),
});

By fine-tuning the lifecycle of deploy-time S3 assets, you gain more control over CDK deployments and ensure that CDK applications are equipped to handle rollbacks and updates with ease.

Optimizing ECR Repository Management with Lifecycle Rules:

The AWS CDK App Staging Synthesizer provides you with the capability to control the lifecycle of container images by leveraging lifecycle rules within ECR repositories. Let’s explore how this feature can help streamline your CDK workflows.

ECR repositories can accumulate numerous versions of Docker images over time. While retaining some historical versions is essential for rollback scenarios and reference, an unregulated growth of image versions can lead to increased storage costs and management complexity.

The AWS CDK App Staging Synthesizer offers a default configuration that stores a maximum of 3 revisions for a given Docker image asset. This ensures that you maintain access to previous image versions, facilitating seamless rollback operations. When more than 3 revisions of an asset exist in the ECR repository, the oldest one is purged.

Although by default, it’s set to 3, you can also adjust this value using the imageAssetVersionCount property:

const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.defaultResources({
    appId: 'my-app-id',
    imageAssetVersionCount: 10, // Customizing the image version count to retain up to 10 revisions
  }),
});

By increasing or decreasing the imageAssetVersionCount, you can strike a balance between storage efficiency and the need to access historical image versions. This ensures that ECR repositories are optimized to the CDK application’s requirements.

Streamlining Cleanup: Auto Delete Staging Assets on Stack Deletion

Efficiently managing resources throughout the lifecycle of your CDK applications is essential, and this includes handling the cleanup of staging assets when stacks are deleted. The AWS CDK App Staging Synthesizer simplifies this process by providing an auto-delete feature for staging resources. In this section, we’ll explore how this feature works and how you can customize it according to your needs.

The Default Cleanup Behavior:

By default, the AWS CDK App Staging Synthesizer is designed to facilitate the cleanup of staging resources automatically when a stack is deleted. This means that associated resources, such as S3 buckets and ECR repositories, are configured with a RemovalPolicy.DESTROY and have autoDeleteObjects (for S3 buckets) or autoDeleteImages (for ECR repositories) turned on. Under the hood, custom resources are created to ensure a seamless cleanup process.

Customizing Cleanup Behavior:

While automatic cleanup is convenient for many scenarios, there may be situations where you want to retain staging resources even after stack deletion. This can be useful when you intend to reuse these resources or when you have specific cleanup processes outside of the default behavior. To retain staging assets and disable the auto-delete feature, you can specify autoDeleteStagingAssets: as false when configuring the AWS CDK App Staging Synthesizer:

const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.defaultResources({
    appId: 'my-app-id',
    autoDeleteStagingAssets: false, // Disabling auto-delete of staging assets
  }),
});

By setting autoDeleteStagingAssets to false, you have full control over the cleanup of staging resources. This allows you to retain and manage these resources independently, giving you the flexibility to align CDK workflows with the organization’s specific practices.

Using an Existing Staging Stack:

While the AWS CDK App Staging Synthesizer offers powerful tools for managing staging resources, there may be scenarios where you already have a meticulously crafted staging stack in place. In such cases, you can seamlessly integrate the existing stack with the AppStagingSynthesizer using the customResources() method. Let’s explore how you can make the most of your pre-existing staging infrastructure.

The process is straightforward—supply your existing staging stack as a resource to the AppStagingSynthesizer using the customResources() method. It’s crucial to ensure that the custom stack adheres to the requirements of the IStagingResources interface for smooth integration.

Here’s an example:

// Create a new CDK App
const resourceApp = new App();

// Instantiate your custom staging stack (make sure it implements IStagingResources)
const resources = new CustomStagingStack(resourceApp, 'CustomStagingStack', {});

// Configure your CDK App to use the App Staging Synthesizer with your custom staging stack
const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.customResources({
    resources,
  }),
});

In this example, CustomStagingStack represents the pre-existing staging infrastructure. By providing it as a resource to the App Staging Synthesizer, you seamlessly integrate it into the CDK application’s deployment workflow.

Crafting Custom Staging Stacks for Environment Control:

For those seeking precise control over resource management in different environments, the AWS CDK App Staging Synthesizer offers a robust solution – custom staging stacks. This feature allows you to tailor resource configurations, permissions, and behaviors to meet the unique demands of each environment within the CDK application.

Subclassing DefaultStagingStack for a Quick Start:

If your customization requirements align with the available properties, you can start by subclassing DefaultStagingStack. This streamlined approach lets you inherit existing functionalities while tweaking specific behaviors as needed. Here’s how you can dive right in:

// Define custom staging stack options
interface CustomStagingStackOptions extends DefaultStagingStackOptions {}

// Subclass DefaultStagingStack to create the custom staging stack
class CustomStagingStack extends DefaultStagingStack {
  // Implement customizations here
}

Building Staging Resources from Scratch:

For more granular control, consider building the staging resources entirely from scratch. This approach allows you to define every aspect of the staging stack, from the ground up, by implementing the “IStagingResources” interface. Here’s an example:

// Define custom staging stack properties (if needed)
interface CustomStagingStackProps extends StackProps {}

// Create your custom staging stack that implements IStagingResources
class CustomStagingStack extends Stack implements IStagingResources {
  constructor(scope: Construct, id: string, props: CustomStagingStackProps) {
    super(scope, id, props);
  }

  // Implement methods to define your custom staging resources
  public addFile(asset: FileAssetSource): FileStagingLocation {
    return {
      bucketName: 'myBucket',
      assumeRoleArn: 'myArn',
      dependencyStack: this,
    };
  }

  public addDockerImage(asset: DockerImageAssetSource): ImageStagingLocation {
    return {
      repoName: 'myRepo',
      assumeRoleArn: 'myArn',
      dependencyStack: this,
    };
  }
}

Creating Custom Staging Resources:

Implementing custom staging resources also involves crafting a CustomFactory class to facilitate the creation of these resources in every environment where your CDK App is deployed. This approach offers a high level of customization while ensuring consistency across deployments. Here’s how it works:

// Define a custom factory for your staging resources
class CustomFactory implements IStagingResourcesFactory {
  public obtainStagingResources(stack: Stack, context: ObtainStagingResourcesContext) {
    const myApp = App.of(stack);

    // Create a custom staging stack instance for the current environment
    return new CustomStagingStack(myApp!, `CustomStagingStack-${context.environmentString}`, {});
  }
}

// Incorporate your custom staging resources into the application using the custom factory
const app = new App({
  defaultStackSynthesizer: AppStagingSynthesizer.customFactory({
    factory: new CustomFactory(),
    oncePerEnv: true, // by default
  }),
});

With this setup, you can create custom staging stacks for each environment, ensuring resource management tailored to your specific needs. Whether you choose to subclass DefaultStagingStack for a quick start or build resources from scratch, custom staging stacks empower you to achieve fine-grained control and consistency across CDK deployments.

Conclusion:

The App Staging Synthesizer introduces a powerful approach to managing staging resources in AWS CDK applications. With enhanced resource isolation and lifecycle control, it addresses the limitations of the default bootstrapping model. By integrating the App Staging Synthesizer into CDK applications, you can achieve better resource management, cleaner cleanup processes, and more control over cloud infrastructure.
Explore this experimental library and unleash the potential of fine-tuned resource management in CDK projects.

For more information and code examples, refer to the official documentation provided by AWS.

About the Authors:

Jehu Gray

Jehu Gray is an Enterprise Solutions Architect at Amazon Web Services where he helps customers design solutions that fit their needs. He enjoys exploring what’s possible with IaC.

Abiola Olanrewaju

Abiola Olanrewaju is an Enterprise Solutions Architect at Amazon Web Services where he helps customers design and implement scalable solutions that drive business outcomes. He has a keen interest in Data Analytics, Security and Automation.

Use AWS Secrets Manager to store and manage secrets in on-premises or multicloud workloads

Post Syndicated from Sreedar Radhakrishnan original https://aws.amazon.com/blogs/security/use-aws-secrets-manager-to-store-and-manage-secrets-in-on-premises-or-multicloud-workloads/

AWS Secrets Manager helps you manage, retrieve, and rotate database credentials, API keys, and other secrets throughout their lifecycles. You might already use Secrets Manager to store and manage secrets in your applications built on Amazon Web Services (AWS), but what about secrets for applications that are hosted in your on-premises data center, or hosted by another cloud service provider? You might even be in the process of moving applications out of your data center as part of a phased migration, where the application is partially in AWS, but other components still remain in your data center until the migration is complete. In this blog post, we’ll describe the potential benefits of using Secrets Manager for workloads outside AWS, outline some recommended practices for using Secrets Manager for hybrid workloads, and provide a basic sample application to highlight how to securely authenticate and retrieve secrets from Secrets Manager in a multicloud workload.

In order to make an API call to retrieve secrets from Secrets Manager, you need IAM credentials. While it is possible to use an AWS Identity and Access Management (IAM) user, AWS recommends using temporary, or short-lived, credentials wherever possible to reduce the scope of impact of an exposed credential. This means we will allow our hybrid application to assume an IAM role in this example. We’ll use IAM Roles Anywhere to provide a mechanism for our applications outside AWS to assume an IAM Role based on a trust configured with our Certificate Authority (CA).

IAM Roles Anywhere offers a solution for on-premises or multicloud applications to acquire temporary AWS credentials, helping to eliminate the necessity for creating and handling long-term AWS credentials. This removal of long-term credentials enhances security and streamlines the operational process by reducing the burden of managing and rotating the credentials.
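As a concrete illustration, once the IAM Roles Anywhere credential helper is configured as the credential_process for an AWS SDK profile on the external host, application code can retrieve secrets with the standard SDK and never handle long-term keys. The following minimal Python sketch assumes a profile named hybrid-app and a secret named app/db-credentials, both of which are illustrative names:

import json
import boto3

# The hybrid-app profile is assumed to obtain short-lived credentials through
# the IAM Roles Anywhere credential helper (credential_process), so no long-term
# AWS keys are stored on the host.
session = boto3.Session(profile_name='hybrid-app')
secretsmanager = session.client('secretsmanager', region_name='us-east-1')

response = secretsmanager.get_secret_value(SecretId='app/db-credentials')
secret = json.loads(response['SecretString'])
print('Retrieved credentials for database user:', secret.get('username'))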

In this post, we assume that you have a basic understanding of IAM. For more information on IAM roles, see the IAM documentation. We’ll start by examining some potential use cases at a high level, and then we’ll highlight recommended practices to securely fetch secrets from Secrets Manager from your on-premises or hybrid workload. Finally, we’ll walk you through a simple application example to demonstrate how to put these recommendations together in a workload.

Selected use cases for accessing secrets from outside AWS

Following are some example scenarios where it may be necessary to securely retrieve or manage secrets from outside AWS, such as from applications hosted in your data center or in another cloud provider.

Centralize secrets management for applications in your data center and in AWS

It’s beneficial to offer your application teams a single, centralized environment for managing secrets. This can simplify managing secrets because application teams are only required to understand and use a single set of APIs to create, retrieve, and rotate secrets. It also provides consistent visibility into the secrets used across your organization because Secrets Manager is integrated with AWS CloudTrail to log API calls to the service, including calls to retrieve or modify a secret value.
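That CloudTrail integration applies regardless of where the caller runs, so API activity from on-premises hosts shows up alongside activity from AWS-hosted applications. A quick way to spot-check recent Secrets Manager activity is sketched below:

import boto3

cloudtrail = boto3.client('cloudtrail')

# List recent Secrets Manager API calls, wherever the caller was running.
events = cloudtrail.lookup_events(
    LookupAttributes=[
        {'AttributeKey': 'EventSource', 'AttributeValue': 'secretsmanager.amazonaws.com'}
    ],
    MaxResults=10
)
for event in events['Events']:
    print(event['EventName'], event['EventTime'], event.get('Username'))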

In scenarios where your application is deployed either on-premises or in a multicloud environment, and your database resides in Amazon Relational Database Service (Amazon RDS), you have the opportunity to use both IAM Roles Anywhere and Secrets Manager to store and retrieve secrets by using short-term credentials. This approach allows central security teams to have confidence in the management of credentials and builder teams to have a well-defined pattern for credential management. Note that you can also choose to configure IAM database authentication with RDS, instead of storing database credentials in Secrets Manager, if this is supported by your database environment.

Hybrid or multicloud workloads

At AWS, we’ve generally seen that customers get the best experience, performance, and pricing when they choose a primary cloud provider. However, for a variety of reasons, some customers end up in a situation where they’re running IT operations in a multicloud environment. In these scenarios, you might have hybrid applications that run in multiple cloud environments, or you might have data stored in AWS that needs to be accessed from a different application or workload running in another cloud provider. You can use IAM Roles Anywhere to securely access or manage secrets in Secrets Manager for these use cases.

Phased application migrations to AWS

Consider a situation where you are migrating a monolithic application to AWS from your data center, but the migration is planned to take place in phases over a number of months. You might be migrating your compute into AWS well before your databases, or vice versa. In this scenario, you can use Secrets Manager to store your application secrets and access them from both on premises and in AWS. Because your secrets are accessible from both on premises and AWS through the same APIs, you won’t need to refactor your application to retrieve these secrets as the migration proceeds.

Recommended practices for retrieving secrets for hybrid and multicloud workloads

In this section, we’ll outline some recommended practices that will help you provide least-privilege access to your application secrets, wherever the access is coming from.

Client-side caching of secrets

Client-side caching of secrets stored in Secrets Manager can help you improve performance and decrease costs by reducing the number of API requests to Secrets Manager. After retrieving a secret from Secrets Manager, your application can get the secret value from its in-memory cache without making further API calls. The cached secret value is automatically refreshed after a configurable time interval, called the cache duration, to help ensure that the application is always using the latest secret value. AWS provides client-side caching libraries for .NET, Java, JDBC, Python, and Go to enable client-side caching. You can find more detailed information on client-side caching specific to Python libraries in this blog post.

Consider a hybrid application with an application server on premises, that needs to retrieve database credentials stored in Secrets Manager in order to query customer information from a database. Because the API calls to retrieve the secret are coming from outside AWS, they may incur increased latency simply based on the physical distance from the closest AWS data center. In this scenario, the performance gains from client-side caching become even more impactful.
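For Python applications, the aws-secretsmanager-caching library implements this pattern. The sketch below assumes an illustrative secret name and a one-hour refresh interval; tune the interval to your own rotation schedule:

import botocore.session
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig

client = botocore.session.get_session().create_client('secretsmanager', region_name='us-east-1')

# Cache secret values in memory and refresh them every hour.
cache = SecretCache(config=SecretCacheConfig(secret_refresh_interval=3600), client=client)

# Calls within the refresh interval are served from memory, which avoids
# repeated cross-network requests to Secrets Manager from the on-premises host.
db_credentials = cache.get_secret_string('app/db-credentials')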

Enforce least-privilege access to secrets through IAM policies

You can use a combination of IAM policy types to granularly restrict access to application secrets when you’re using IAM Roles Anywhere and Secrets Manager. You can use conditions in trust policies to control which systems can assume the role. In our example, this is based on the system’s certificate, meaning that you need to appropriately control access to these certificates. We use a policy condition to specify an IP address in our example, but you could also use a range of IP addresses. Other examples would be conditions that specify a time range for when resources can be accessed, conditions that allow or deny building resources in certain AWS Regions, and more. You can find example policies in the IAM documentation.

You should use identity policies to grant the IAM role being assumed only the Secrets Manager permissions it needs, following the principle of least privilege. You can find IAM policy examples for Secrets Manager use cases in the Secrets Manager documentation.

By combining different policy types, like identity policies and trust policies, you can limit the scope of systems that can assume a role, and control what those systems can do after assuming a role. For example, in the trust policy for the IAM role with access to the secret in Secrets Manager, you can allow or deny access based on the common name of the certificate that's being used to authenticate and retrieve temporary credentials in order to assume a role using IAM Roles Anywhere. You can then attach an identity policy to the role being assumed that provides only the API actions your application needs, such as the ability to retrieve a secret value but not to delete a secret. See this blog post for more information on when to use different policy types.
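As a sketch, an identity policy along the following lines grants read-only access to a single secret; the secret ARN is a placeholder that you would replace with your own.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "arn:aws:secretsmanager:region:account:secret:app/db-credentials-*"
    }
  ]
}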

Transform long-term secrets into short-term secrets

You may already be wondering, “why should I use short-lived credentials to access a long-term secret?” Frequently rotating your application secrets in Secrets Manager will reduce the impact radius of a compromised secret. Imagine that you rotate your application secret every day. If that secret is somehow publicly exposed, it will only be usable for a single day (or less). This can greatly reduce the risk of compromised credentials being used to get access to sensitive information. You can find more information about the value of using short-lived credentials in this AWS Well-Architected best practice.

Instead of using static database credentials that are rarely (or never) rotated, you can use Secrets Manager to automatically rotate secrets up to every four hours. This method better aligns the lifetime of your database secret with the lifetime of the short-lived credentials that are used to assume the IAM role by using IAM Roles Anywhere.
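For example, if a rotation function is already associated with the secret, you could tighten the rotation schedule with a call such as the following; the secret name and Lambda ARN are placeholders.

aws secretsmanager rotate-secret \
    --secret-id app/db-credentials \
    --rotation-lambda-arn arn:aws:lambda:region:account:function:MyRotationFunction \
    --rotation-rules '{"ScheduleExpression": "rate(4 hours)"}'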

Sample workload: How to retrieve a secret to query an Amazon RDS database from a workload running in another cloud provider

Now we’ll demonstrate examples of the recommended practices we outlined earlier, such as scoping permissions with IAM policies. We’ll also showcase a sample application that uses a virtual machine (VM) hosted in another cloud provider to access a secret in Secrets Manager.

The reference architecture in Figure 1 shows the basic sample application.

Figure 1: Application connecting to Secrets Manager by using IAM Roles Anywhere to retrieve RDS credentials

In the sample application, an application secret (for example, a database username and password) is being used to access an Amazon RDS database from an application server hosted in another cloud provider. The following process is used to connect to Secrets Manager in order to retrieve and use the secret:

  1. The application server makes a request to retrieve temporary credentials by using IAM Roles Anywhere.
  2. IAM validates the request against the relevant IAM policies and verifies that the certificate was issued by a CA configured as a trust anchor.
  3. If the request is valid, AWS Security Token Service (AWS STS) provides temporary credentials that the application can use to assume an IAM role.
  4. IAM Roles Anywhere returns temporary credentials to the application.
  5. The application assumes an IAM role with Secrets Manager permissions and makes a GetSecretValue API call to Secrets Manager.
  6. The application uses the returned database credentials from Secrets Manager to query the RDS database and retrieve the data it needs to process.

Configure IAM Roles Anywhere

Before you configure IAM Roles Anywhere, it’s essential to have an IAM role created with the required permission for Amazon RDS and Secrets Manager. If you’re following along on your own with these instructions, refer to this blog post and the IAM Roles Anywhere User Guide for the steps to configure IAM Roles Anywhere in your environment.

Obtain temporary security credentials

You have several options to obtain temporary security credentials using IAM Roles Anywhere:

  • Using the credential helper — The IAM Roles Anywhere credential helper is a tool that manages the process of signing the CreateSession API with the private key associated with an X.509 end-entity certificate and calls the endpoint to obtain temporary AWS credentials. It returns the credentials to the calling process in a standard JSON format. This approach is documented in the IAM Roles Anywhere User Guide.
  • Using the AWS SDKs — the AWS CLI and SDKs can consume the credentials produced by the credential helper through the credential_process setting in your AWS config file, as shown in the sketch after this list.
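The following is a sketch of calling the credential helper directly; the certificate and key paths and the ARNs are placeholders, and the role name matches the RDS_SM_Role used later in this post.

./aws_signing_helper credential-process \
    --certificate /path/to/certificate.pem \
    --private-key /path/to/private-key.pem \
    --trust-anchor-arn arn:aws:rolesanywhere:region:account:trust-anchor/TA_ID \
    --profile-arn arn:aws:rolesanywhere:region:account:profile/PROFILE_ID \
    --role-arn arn:aws:iam::account:role/RDS_SM_Role

To use the SDK route instead, you can reference the same command in a credential_process entry in a profile in your AWS config file, so that the CLI and SDKs obtain and refresh the temporary credentials for you.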

Use policy controls to appropriately scope access to secrets

In this section, we demonstrate the process of restricting access to temporary credentials by employing condition statements based on attributes extracted from the X.509 certificate. This additional step gives you granular control of the trust policy, so that you can effectively manage which resources can obtain credentials from IAM Roles Anywhere. For more information on establishing a robust data perimeter on AWS, refer to this blog post.

Prerequisites

  • IAM Roles Anywhere using AWS Private Certificate Authority or your own PKI as the trust anchor
  • IAM Roles Anywhere profile
  • An IAM role with Secrets Manager permissions

Restrict access to temporary credentials

You can restrict access to temporary credentials by using specific PKI conditions in your role’s trust policy, as follows:

  • Sessions issued by IAM Roles Anywhere have the source identity set to the common name (CN) of the subject field in the end-entity certificate that you use to authenticate to the target role.
  • IAM Roles Anywhere extracts values from the subject, issuer, and Subject Alternative Name (SAN) fields of the authenticating certificate and makes them available for policy evaluation through the sourceIdentity and principal tags.
  • To examine the contents of a certificate, use the following command:

    openssl x509 -text -noout -in certificate.pem

  • To establish a trust relationship for IAM Roles Anywhere, use the following steps:
    1. In the navigation pane of the IAM console, choose Roles.
    2. The console displays the roles for your account. Choose the name of the role that you want to modify, and then choose the Trust relationships tab on the details page.
    3. Choose Edit trust relationship.

Example: Restrict access to a role based on the common name of the certificate

The following example shows a trust policy that adds a condition based on the Subject Common Name (CN) of the certificate.

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "rolesanywhere.amazonaws.com"
        },
        "Action": [
          "sts:AssumeRole",
          "sts:TagSession",
          "sts:SetSourceIdentity"
        ],
        "Condition": {
          "StringEquals": {
            "aws:PrincipalTag/x509Subject/CN": "workload-a.iamcr-test"
          },
          "ArnEquals": {
            "aws:SourceArn": [
              "arn:aws:rolesanywhere:region:account:trust-anchor/TA_ID"
            ]
          }
        }
      }
    ]
  }

If you try to access the temporary credentials using a certificate with a different CN, you will receive the error “Error when retrieving credentials from custom-process: 2023/07/0X 23:46:43 AccessDeniedException: Unable to assume role for arn:aws:iam::64687XXX207:role/RDS_SM_Role”.

Example: Restrict access to a role based on the issuer common name

The following example shows a trust policy that adds a condition based on the Issuer CN of the certificate.

 {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "rolesanywhere.amazonaws.com"
        },
        "Action": [
          "sts:AssumeRole",
          "sts:TagSession",
          "sts:SetSourceIdentity"
        ],
        "Condition": {
          "StringEquals": {
            "aws:PrincipalTag/x509Issuer/CN": "iamcr.test"
          },
          "ArnEquals": {
            "aws:SourceArn": [
              "arn:aws:rolesanywhere:region:account:trust-anchor/TA_ID"
            ]
          }
        }
      }
    ]
  }

Example: Restrict access to a role based on the subject alternative name (SAN)

The following example shows a trust policy that adds a condition based on the SAN fields of the certificate.

 {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "rolesanywhere.amazonaws.com"
      },
      "Action": [
        "sts:AssumeRole",
        "sts:TagSession",
        "sts:SetSourceIdentity"
      ],
      "Condition": {
        "StringEquals": {
          "aws:PrincipalTag/x509SAN/DNS": "workload-a.iamcr.test"
        },
        "ArnEquals": {
          "aws:SourceArn": [
            "arn:aws:rolesanywhere:region:account:trust-anchor/TA_ID"
          ]
        }
      }
    }
  ]
}

Session policies

Define session policies to further scope down the sessions delivered by IAM Roles Anywhere. Here, for demonstration purposes, we add an inline policy that allows requests only from a specified IP address, using the following steps.

  1. Navigate to the Roles Anywhere console.
  2. Under Profiles, choose Create a profile.
  3. On the Create a profile page, enter a name for the profile.
  4. For Roles, select the role that you created in the previous step, and select the Inline policy.

The following example shows how to allow only the requests from a specific IP address. You will need to replace <X.X.X.X/32> in the policy example with your own IP address.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Deny",
      "Action": "*",
      "Resource": "*",
      "Condition": {
        "NotIpAddress": {
          "aws:SourceIp": [
            "<X.X.X.X/32>"
          ]
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": "*",
      "Resource": "*"
    }
  ]
}

Retrieve secrets securely from a workload running in another cloud environment

In this section, we’ll demonstrate the process of connecting virtual machines (VMs) running in another cloud provider to an Amazon RDS MySQL database, where the database credentials are securely stored in Secrets Manager.

Create a database and manage Amazon RDS master database credentials in Secrets Manager

In this section, you will create a database instance and use Secrets Manager to manage the master database credentials.

To create an Amazon RDS database and manage master database credentials in Secrets Manager

  1. Open the Amazon RDS console and choose Create database.
  2. Select your preferred database creation method. For this post, we chose Standard create.
  3. Under Engine options, for Engine type, choose your preferred database engine. In this post, we use MySQL.
  4. Under Settings, for Credentials Settings, select Manage master credentials in AWS Secrets Manager.
    Figure 2: Manage master credentials in Secrets Manager

  5. You have the option to encrypt the managed master database credentials. In this example, we will use the default AWS KMS key.
  6. (Optional) Choose other settings to meet your requirements. For more information, see Settings for DB instances.
  7. Choose Create database, and wait a few minutes for the database to be created.

Retrieve and use temporary credentials to access a secret in Secrets Manager

The next step is to use IAM Roles Anywhere to obtain temporary credentials for an IAM role. These temporary credentials are essential for accessing AWS resources securely. Earlier, we described the options available to you to retrieve temporary credentials by using IAM Roles Anywhere. In this section, we will assume you're using the credential helper to retrieve temporary credentials and make an API call to Secrets Manager.

After you retrieve temporary credentials and assume an IAM role with permissions to access the secret in Secrets Manager, you can run a simple script on the VM to get the database username and password from Secrets Manager and update the database. The steps are summarized here:

  • Use the credential helper to assume your IAM role with permissions to access the secret in Secrets Manager. You can find instructions to obtain temporary credentials in the IAM Roles Anywhere User Guide.
  • Retrieve secrets from Secrets Manager. Using the obtained temporary credentials, you can create a boto3 session object and initialize a secrets_client from boto3.client('secretsmanager'). The secrets_client is responsible for interacting with the Secrets Manager service. You will retrieve the secret value from Secrets Manager, which contains the necessary credentials (for example, database username and password) for accessing an RDS database.
  • Establish a connection to the RDS database. The retrieved secret value is parsed, extracting the database connection information. You can then establish a connection to the RDS database using the extracted details, such as username and password.
  • Perform database operations. Once the database connection is established, the script performs the operation to update a record in the database.

The following is an example Python script to retrieve credentials from Secrets Manager and connect to the RDS for database operations.

import mysql.connector
import boto3
import json

# Create a Secrets Manager client (credentials are resolved from the
# temporary credentials obtained through IAM Roles Anywhere)
client = boto3.client('secretsmanager')

# Retrieve and parse the database credentials stored in Secrets Manager
response = client.get_secret_value(
    SecretId='rds!db-fXXb-11ce-4f05-9XX2-d42XXcd'
)
secretDict = json.loads(response['SecretString'])

# Connect to the RDS MySQL database using the retrieved credentials
mydb = mysql.connector.connect(
    host="rdsmysql.cpl0ov.us-east-1.rds.amazonaws.com",
    user=secretDict['username'],
    password=secretDict['password'],
    database="rds_mysql"
)
mycursor = mydb.cursor()

# Insert a record and commit the transaction
sql = "INSERT INTO employees (id, name) VALUES (%s, %s)"
val = (12, "AWPS")
mycursor.execute(sql, val)
mydb.commit()
print(mycursor.rowcount, "record inserted.")

And that’s it! You’ve retrieved temporary credentials using IAM Roles Anywhere, assumed a role with permissions to access the database username and password in Secrets Manager, and then retrieved and used the database credentials to update a database from your application running on another cloud provider. This is a simple example application for the purpose of the blog post, but the same concepts will apply in real-world use cases.

Conclusion

In this post, we’ve demonstrated how you can securely store, retrieve, and manage application secrets and database credentials for your hybrid and multicloud workloads using Secrets Manager. We also outlined some recommended practices for least-privilege access to your secrets when accessing Secrets Manager from outside AWS by using IAM Roles Anywhere. Lastly, we demonstrated a simple example of using IAM Roles Anywhere to assume a role, then retrieve and use database credentials from Secrets Manager in a multicloud workload. To get started managing secrets, open the Secrets Manager console. To learn more about Secrets Manager, refer to the Secrets Manager documentation.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Sreedar Radhakrishnan

Sreedar is a Senior Solutions Architect at AWS, where he helps enterprise customers to design and build secure, scalable, and sustainable solutions on AWS. In his spare time, Sreedar enjoys playing badminton and spending time with his family.

Zach Miller

Zach is a Senior Security Specialist Solutions Architect at AWS. His background is in data protection and security architecture, focused on a variety of security domains, including cryptography, secrets management, and data classification. Today, he is focused on helping enterprise AWS customers adopt and operationalize AWS security services to increase security effectiveness and reduce risk.

Akshay Aggarwal

Akshay is a Senior Technical Product Manager on the AWS Secrets Manager team. As part of AWS Cryptography, Akshay drives technologies and defines best practices that help improve customer’s experience of building secure, reliable workloads in the AWS Cloud. Akshay is passionate about building technologies that are easy to use, secure, and scalable.

Automate legacy ETL conversion to AWS Glue using Cognizant Data and Intelligence Toolkit (CDIT) – ETL Conversion Tool

Post Syndicated from Deepak Singh original https://aws.amazon.com/blogs/big-data/automate-legacy-etl-conversion-to-aws-glue-using-cognizant-data-and-intelligence-toolkit-cdit-etl-conversion-tool/

This blog post is co-written with Govind Mohan and Kausik Dhar from Cognizant. 

Migrating on-premises data warehouses to the cloud is no longer viewed as an option but a necessity for companies to save cost and take advantage of what the latest technology has to offer. Although we have seen a lot of focus toward migrating data from legacy data warehouses to the cloud and multiple tools to support this initiative, data is only part of the journey. Successful migration of legacy extract, transform, and load (ETL) processes that acquire, enrich, and transform the data plays a key role in the success of any end-to-end data warehouse migration to the cloud.

The traditional approach of manually rewriting a large number of ETL processes to cloud-native technologies like AWS Glue is time consuming and can be prone to human error. Cognizant Data & Intelligence Toolkit (CDIT) – ETL Conversion Tool automates this process, bringing in more predictability and accuracy, eliminating the risk associated with manual conversion, and providing faster time to market for customers.

Cognizant is an AWS Premier Tier Services Partner with several AWS Competencies. With its industry-based, consultative approach, Cognizant helps clients envision, build, and run more innovative and efficient businesses.

In this post, we describe how Cognizant’s Data & Intelligence Toolkit (CDIT)- ETL Conversion Tool can help you automatically convert legacy ETL code to AWS Glue quickly and effectively. We also describe the main steps involved, the supported features, and their benefits.

Solution overview

Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool automates conversion of ETL pipelines and orchestration code from legacy tools to AWS Glue and AWS Step Functions and eliminates the manual processes involved in a customer’s ETL cloud migration journey.

It comes with an intuitive user interface (UI). You can use the tool by selecting the source and target ETL tool for conversion and then uploading an XML file of the ETL mapping to be converted as input.

The tool also supports continuous monitoring of the overall progress, and alerting mechanisms are in place in the event of any failures, errors, or operational issues.

Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool internally uses many native AWS services, such as Amazon Simple Storage Service (Amazon S3) and Amazon Relational Database Service (Amazon RDS) for storage and metadata management; Amazon Elastic Compute Cloud (Amazon EC2) and AWS Lambda for processing; Amazon CloudWatch, AWS Key Management Service (AWS KMS), and AWS IAM Identity Center (successor to AWS Single Sign-On) for monitoring and security; and AWS CloudFormation for infrastructure management. The following diagram illustrates this architecture.

How to use CDIT: ETL Conversion Tool for ETL migration.

Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool supports the following legacy ETL tools as source and supports generating corresponding AWS Glue ETL scripts in both Python and Scala:

  • Informatica
  • DataStage
  • SSIS
  • Talend

Let’s look at the migration steps in more detail.

Assess the legacy ETL process

Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool enables you to assess in bulk the potential automation percentage and complexity of a set of ETL jobs and workflows that are in scope for migration to AWS Glue. The assessment option helps you understand what kind of saving can be achieved using Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool, the complexity of the ETL mappings, and the extent of manual conversion needed, if any. You can upload a single ETL mapping or a folder containing multiple ETL mappings as input for assessment and generate an assessment report, as shown in the following figure.

Convert the ETL code to AWS Glue

To convert legacy ETL code, you upload the XML file of the ETL mapping as input to the tool. User inputs are stored in the internal metadata repository of the tool and Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool parses these XML input files and breaks them down to a patented canonical model, which is then forward engineered into the target AWS Glue scripts in Python or Scala. The following screenshot shows an example of the Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool GUI and Output Console pane.

If any part of the input ETL job couldn’t be converted completely to the equivalent AWS Glue script, it’s tagged between comment lines in the output so that it can be manually fixed.

Convert the workflow to Step Functions

The next logical step after converting the legacy ETL jobs is to orchestrate the run of these jobs in the logical order. Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool lets you automate the conversion of on-premises ETL workflows by converting them to corresponding Step Functions workflows. The following figure illustrates a sample input Informatica workflow.

Workflow conversion follows a similar pattern to that of the ETL mapping. XML files for ETL workflows are uploaded as input, and Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool generates the equivalent Step Functions JSON file based on the input XML file data.

Benefits of using Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool

The following are the key benefits of using Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool to automate legacy ETL conversion:

  • Cost reduction – You can reduce the overall migration effort by as much as 80% by automating the conversion of ETL and workflows to AWS Glue and Step Functions
  • Better planning and implementation – You can assess the ETL scope and determine automation percentage, complexity, and unsupported patterns before the start of the project, resulting in accurate estimation and timelines
  • Completeness – Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool offers one solution with support for multiple legacy ETL tools like Informatica, DataStage, Talend, and more.
  • Improved customer experience – You can achieve migration goals seamlessly without errors caused by manual conversion and with high automation percentage

Case study: Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool proposed implementation

A large US-based insurance and annuities company wanted to migrate their legacy ETL process in Informatica to AWS Glue as part of their cloud migration strategy.

As part of this engagement, Cognizant helped the customer successfully migrate their Informatica based data acquisition and integration ETL jobs and workflows to AWS. A proof of concept (PoC) using Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool was completed first to showcase and validate automation capabilities.

Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool was used to automate the conversion of over 300 Informatica mappings and workflows to equivalent AWS Glue jobs and Step Functions workflows, respectively. As a result, the customer was able to migrate all legacy ETL code to AWS as planned and retire the legacy application.

The following are key highlights from this engagement:

  • Migration of over 300 legacy Informatica ETL jobs to AWS Glue
  • Automated conversion of over 6,000 transformations from legacy ETL to AWS Glue
  • 85% automation achieved using CDIT: ETL Conversion Tool
  • The customer saved licensing fees and retired their legacy application as planned

Conclusion

In this post, we discussed how migrating legacy ETL processes to the cloud is critical to the success of a cloud migration journey. Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool enables you to perform an assessment of the existing ETL process to derive complexity and automation percentage for better estimation and planning. We also discussed the ETL technologies supported by Cognizant Data & Intelligence Toolkit (CDIT): ETL Conversion Tool and how ETL jobs can be converted to corresponding AWS Glue scripts. Lastly, we demonstrated how to use existing ETL workflows to automatically generate corresponding Step Functions orchestration jobs.

To learn more, please reach out to Cognizant.


About the Authors

Deepak Singh is a Senior Solutions Architect at Amazon Web Services with 20+ years of experience in Data & AI. He enjoys working with AWS partners and customers on building scalable analytical solutions for their business outcomes. When not at work, he loves spending time with family or exploring new technologies in the analytics and AI space.

Piyush Patra is a Partner Solutions Architect at Amazon Web Services where he supports partners with their Analytics journeys and is the global lead for strategic Data Estate Modernization and Migration partner programs.

Govind Mohan is an Associate Director with Cognizant with over 18 years of experience in the data and analytics space. He has helped design and implement multiple large-scale data migration, application lift and shift, and legacy modernization projects, and works closely with customers to accelerate their cloud modernization journey using the Cognizant Data and Intelligence Toolkit (CDIT) platform.

Kausik Dhar is a technology leader with more than 23 years of IT experience, primarily focused on Data & Analytics, Data Modernization, Application Development, Delivery Management, and Solution Architecture. He has played a pivotal role in guiding clients through designing and executing large-scale data and process migrations, in addition to spearheading successful cloud implementations. Kausik possesses expertise in formulating migration strategies for complex programs and in constructing data lake/lakehouse architectures using a wide array of tools and technologies.

Query big data with resilience using Trino in Amazon EMR with Amazon EC2 Spot Instances for less cost

Post Syndicated from Ashwini Kumar original https://aws.amazon.com/blogs/big-data/query-big-data-with-resilience-using-trino-in-amazon-emr-with-amazon-ec2-spot-instances-for-less-cost/

Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. Amazon EMR provides a managed Hadoop framework that makes it straightforward, fast, and cost-effective to process vast amounts of data using EC2 instances. Amazon EMR with Spot Instances allows you to reduce costs for running your big data workloads on AWS. Amazon EC2 can interrupt Spot Instances with a 2-minute notification whenever Amazon EC2 needs to reclaim capacity for On-Demand customers. Spot Instances are best suited for running stateless and fault-tolerant big data applications such as Apache Spark with Amazon EMR, which are resilient against Spot node interruptions.

Trino (formerly PrestoSQL) is an open-source, highly parallel, distributed SQL query engine to run interactive queries as well as batch processing on petabytes of data. It can perform in-place, federated queries on data stored in a multitude of data sources, including relational databases (MySQL, PostgreSQL, and others), distributed data stores (Cassandra, MongoDB, Elasticsearch, and others), and Amazon Simple Storage Service (Amazon S3), without the need for complex and expensive processes of copying the data to a single location.

Before Project Tardigrade, Trino queries failed whenever any of the nodes in Trino clusters failed, and there was no automatic retry mechanism with iterative querying capability. Also, failed queries had to be restarted from scratch. Due to this limitation, the cost of failures of long-running extract, transform, and load (ETL) and batch queries on Trino was high in terms of completion time, compute wastage, and spend. Spot Instances were not appropriate for long-running queries with Trino clusters and only suited for short-lived Trino queries.

In October 2022, Amazon EMR announced a new capability in the Trino engine to detect 2-minute Spot interruption notifications and determine if the existing queries can complete within 2 minutes on those nodes. If the queries can’t finish, Trino will fail them quickly and retry the queries on different nodes. Also, Trino doesn’t schedule new queries on these Spot nodes, which are about to be reclaimed. In November 2022, Amazon EMR added support for Project Tardigrade’s fault-tolerant option in the Trino engine with Amazon EMR 6.8 and above. Enabling this feature mitigates Trino task failures caused by worker node failures due to Spot interruptions or On-Demand node stops. Trino now retries failed tasks using intermediate exchange data checkpointed on Amazon S3 or HDFS.

These new enhancements in Trino with Amazon EMR provide improved resiliency for running ETL and batch workloads on Spot Instances with reduced costs. This post showcases the resilience of Amazon EMR with Trino using fault-tolerant configuration to run long-running queries on Spot Instances to save costs. We simulate Spot interruptions on Trino worker nodes by using AWS Fault Injection Simulator (AWS FIS).

Trino architecture overview

Trino runs a query by breaking up the run into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. This pipelined execution model runs multiple stages in parallel and streams data from one stage to another as the data becomes available. This parallel architecture reduces end-to-end latency and makes Trino a fast tool for ad hoc data exploration and ETL jobs over very large datasets. The following diagram illustrates this architecture.

In a Trino cluster, the coordinator is the server responsible for parsing statements, planning queries, and managing workers. The coordinator is also the node to which a client connects and submits statements to run. Every Trino cluster must have at least one coordinator. The coordinator creates a logical model of a query involving a series of stages, which is then translated into a series of connected tasks running on Trino workers. In Amazon EMR, the Trino coordinator runs on the EMR primary node and workers run on core and task nodes.

Faster insights with lower costs with EC2 Spot

You can save significant costs for your ETL and batch workloads running on EMR Trino clusters with a blend of Spot and On-Demand Instances. You can also reduce time-to-insight through faster query runs at lower cost by running more worker nodes on Spot Instances, taking advantage of Trino's parallel architecture.

For example, a long-running query on EMR Trino that takes an hour can be finished faster by provisioning more worker nodes on Spot Instances, as shown in the following figure.

Fault-tolerant Trino configuration in Amazon EMR

Fault-tolerant execution in Trino is disabled by default; you can enable it by setting a retry policy in the Amazon EMR configuration. Trino supports two types of retry policies:

  • QUERY – The QUERY retry policy instructs Trino to retry the whole query automatically when an error occurs on a worker node. This policy is only suitable for short-running queries because the whole query is retried from scratch.
  • TASK – The TASK retry policy instructs Trino to retry individual query tasks in the event of failure. This policy is recommended for long-running ETL and batch queries.

With fault-tolerant execution enabled, intermediate exchange data is spooled on an exchange manager so that another worker node can reuse it in the event of a node failure to complete the query run. The exchange manager uses a storage location on Amazon S3 or Hadoop Distributed File System (HDFS) to store and manage spooled data that spills beyond the in-memory buffer size of the worker nodes. By default, Amazon EMR release 6.9.0 and later uses HDFS as the exchange manager.

Solution overview

In this post, we create an EMR cluster with following architecture.

We provision the following resources using Amazon EMR and AWS FIS:

  • An EMR 6.9.0 cluster with the following configuration:
    • Apache Hadoop, Hue, and Trino applications
    • EMR instance fleets with the following:
      • One primary node (On-Demand) as the Trino coordinator
      • Two core nodes (On-Demand) as the Trino workers and exchange manager
      • Four task nodes (Spot Instances) as Trino workers
    • Trino’s fault-tolerant configuration with following:
      • TPCDS connector
      • The TASK retry policy
      • Exchange manager directory on HDFS
      • Optional recommended settings for query performance optimization
  • An FIS experiment template to target Spot worker nodes in the Trino cluster with interruptions to demonstrate fault-tolerance of EMR Trino with Spot Instances

We use the new Amazon EMR console to create an EMR 6.9.0 cluster. For more information about the new console, refer to Summary of differences.

Create an EMR 6.9.0 cluster

Complete the following steps to create your EMR cluster:

  1. On the Amazon EMR console, create an EMR 6.9.0 cluster named emr-trino-cluster with Hadoop, Hue, and Trino applications using the Custom application bundle.

We need Hue’s web-based interface for submitting SQL queries to the Trino engine and HDFS on core nodes to store intermediate exchange data for Trino’s fault-tolerant runs.

Using multiple Spot capacity pools (each instance type in each Availability Zone is a separate pool) is a best practice to increase your chances of getting large-scale Spot capacity and minimize the impact of a specific instance type being reclaimed in EMR clusters. The Amazon EMR console allows you to configure up to 5 instance types for your core fleet and 15 instance types for your task fleet with the Spot allocation strategy, which allows up to 30 instance types for each fleet from the AWS Command Line Interface (AWS CLI) or Amazon EMR API.

  2. Configure the primary and core fleets with On-Demand Instances (m5.xlarge), and the task fleet with Spot Instances using multiple instance types.

When you use the Amazon EMR console, the number of vCPUs of the EC2 instance type are used as the count towards the total target capacity of a core or task fleet by default. For example, an m5.xlarge instance type with 4 vCPUs is considered as 4 units of capacity by default.

  3. On the Actions menu under Core or Task fleet, choose Edit weighted capacity.

  4. Because each instance type with 4 vCPUs (xlarge size) counts as 4 units of capacity, let's set the cluster size to 8 core units (2 nodes) with On-Demand and 16 task units (4 nodes) with Spot.

Unlike core and task fleets, the primary fleet is always one instance, so no sizing configuration is needed or available for the primary node on the Amazon EMR console.

  5. Select Price-capacity optimized as your Spot allocation strategy, which launches the lowest-priced Spot Instances from your most available pools.

  6. Configure Trino's fault-tolerant settings in the Software settings section:
[
  {
    "Classification": "trino-connector-tpcds",
    "Properties": {
      "connector.name": "tpcds"
    }
  },
  {
    "Classification": "trino-config",
    "Properties": {
      "exchange.compression-enabled": "true",
      "query.low-memory-killer.delay": "0s",
      "query.remote-task.max-error-duration": "1m",
      "retry-policy": "TASK"
    }
  },
  {
    "Classification": "trino-exchange-manager",
    "Properties": {
      "exchange.base-directories": "/exchange",
      "exchange.use-local-hdfs": "true"
    }
  }
]

Alternatively, you can create a JSON config file with the configuration, store it in an S3 bucket, and select the file path from its S3 location by selecting Load JSON from Amazon S3.
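If you prefer to script the cluster creation, the following AWS CLI sketch shows how the same application set and configuration could be supplied; the key pair, subnet, and the two local JSON files (trino-config.json containing the classifications above and instance-fleets.json describing the fleets) are assumptions you would adapt to your environment.

aws emr create-cluster \
    --name emr-trino-cluster \
    --release-label emr-6.9.0 \
    --applications Name=Hadoop Name=Hue Name=Trino \
    --configurations file://trino-config.json \
    --instance-fleets file://instance-fleets.json \
    --ec2-attributes KeyName=my-key-pair,SubnetIds=subnet-XXXXXXXX \
    --use-default-roles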

Let’s understand some optional settings for query performance optimization that we have configured:

  • “exchange.compression-enabled”:”true” – This is recommended to enable compression to reduce the amount of data spooled on exchange manager.
  • “query.low-memory-killer.delay”: “0s” – This will reduce the low memory killer delay to allow the Trino engine to unblock nodes running short on memory faster.
  • “query.remote-task.max-error-duration”: “1m” – By default, Trino waits for up to 5 minutes for the task to recover before considering it lost and rescheduling it. This timeout can be reduced for faster retrying of the failed tasks.

For more details of Trino’s fault-tolerant configuration parameters, refer to Fault-tolerant execution.

  7. Let's also add a tag key called Name with the value MyTrinoCluster to launch EC2 instances with this tag name.

We’ll use this tag to target Spot Instances in the cluster with AWS FIS.

The EMR cluster will take few minutes to be ready in the Waiting state.

Configure an FIS experiment template to target Spot Instances with interruptions in the EMR Trino cluster

We now use the AWS FIS console to simulate interruptions of Spot Instances in the EMR Trino cluster and showcase the fault-tolerance of the Trino engine. Complete the following steps:

  1. On the AWS FIS console, create an experiment template.

  2. Under Actions, choose Add action.
  3. Create an AWS FIS action with Action type as aws:ec2:send-spot-instance-interruptions and Duration Before Interruption as 2 minutes.
  4. Choose Save.

This means FIS will interrupt targeted Spot Instances after 2 minutes of running the experiment.

  5. Under Targets, choose Edit to target all Spot Instances running in the EMR cluster.
  6. For Resource tags, use Name=MyTrinoCluster.
  7. For Resource filters, use State.Name=running.
  8. For Selection mode, choose ALL.
  9. Choose Save.

  10. Create a new AWS Identity and Access Management (IAM) role automatically to provide permissions to AWS FIS.

  11. Choose Create experiment template.

Launch Hue and Trino web interfaces

When your EMR cluster is in the Waiting state, connect to the Hue web interface for Trino queries and the Trino web interface for monitoring. Alternatively, you can submit your Trino queries using trino-cli after connecting via SSH to your EMR cluster’s primary node. In this post, we will use the Hue web interface for running queries on the EMR Trino engine.
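As a sketch of the trino-cli alternative, a query could be run on the primary node along these lines; the tpcds catalog and sf100 schema match the connector configured earlier, and the table name is only an example.

trino-cli --server localhost:8889 --catalog tpcds --schema sf100 \
    --execute "select count(*) from item"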

  1. To connect to Hue interface on the primary node from your local computer, navigate to the EMR cluster’s Properties, Network and security, and EC2 security groups (firewall) section.
  2. Edit the primary node security group’s inbound rule to add your IP address and port (port 22).
  3. Retrieve your EMR cluster’s primary node public DNS from your EMR cluster’s Summary tab.

Refer to View web interfaces hosted on Amazon EMR clusters for details on connecting to web interfaces in the primary node from your local computer. You can set up an SSH tunnel with dynamic port forwarding between your local computer and the EMR primary node. Then you can configure proxy settings for your internet browser by using an add-on such as FoxyProxy for Firefox or SwitchyOmega for Chrome to manage your SOCKS proxy settings.
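For example, a dynamic port forwarding tunnel might look like the following; the key pair file, local port 8157, and primary node DNS are placeholders.

ssh -i ~/my-key-pair.pem -N -D 8157 hadoop@<youremrcluster-primary-node-public-dns>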

  4. Connect to Hue by copying the URL (http://<youremrcluster-primary-node-public-dns>:8888/) in your web browser.
  5. Create an account with your choice of user name and password.

After you log in to your account, you can see the query editor on Hue’s web interface.

By default, Amazon EMR configures the Trino web interface on the Trino coordinator (EMR primary node) to use port 8889.

  6. To connect to the Trino web interface, copy the URL (http://<youremrcluster-primary-node-public-dns>:8889/) in your web browser, where you can monitor the Trino cluster and query performance.

In the following screenshot, we can see six active Trino workers (two core and four task nodes of EMR cluster) and no running queries.

  7. Let's run the Trino query select * from system.runtime.nodes from the Hue query editor to see the coordinator and worker nodes' status and details.

We can see all cluster nodes are in the active state.

Test fault tolerance on Spot interruptions

To test the fault tolerance on Spot interruptions, complete the following steps:

  1. Run the following Trino query using Hue’s query editor:
with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from tpcds.sf100.inventory
,tpcds.sf100.item
,tpcds.sf100.warehouse
,tpcds.sf100.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov ,inv2.d_moy,inv2.mean, inv2.cov

When you go to the Trino web interface, you can see the query running on six active worker nodes (two core On-Demand and four task nodes on Spot Instances).

  2. On the AWS FIS console, choose Experiment templates in the navigation pane.
  3. Select the experiment template EMR_Trino_Interrupter and choose Start experiment.

After a few seconds, the experiment will be in the Completed state and it will trigger stopping all four Spot Instances (four Trino workers) after 2 minutes.

After some time, we can observe in the Trino web UI that we have lost four Trino workers (task nodes running on Spot Instances) but the query is still running with the two remaining On-Demand worker nodes (core nodes). Without the fault-tolerant configuration in EMR Trino, the whole query would fail with even a single worker node failure.

  4. Run the select * from system.runtime.nodes query again in Hue to check the Trino cluster nodes status.

We can see four Spot worker nodes with the status shutting_down.

Trino starts shutting down the four Spot worker nodes as soon as they receive the 2-minute Spot interruption notification sent by the AWS FIS experiment. It will start retrying any failed tasks of these four Spot workers on the remaining active workers (two core nodes) of the cluster. The Trino engine will also not schedule tasks of any new queries on Spot worker nodes in the shutting_down state.

The Trino query will keep running on the remaining two worker nodes and succeed despite the interruption of the four Spot worker nodes. Soon after the Spot nodes stop, Amazon EMR will replenish the stopped capacity (four task nodes) by launching four replacement Spot nodes.

Achieve faster query performance for lower cost with more Trino workers on Spot

Now let’s increase Trino workers capacity from 6 to 10 nodes by manually resizing EMR task nodes on Spot Instances (from 4 to 8 nodes).
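One way to resize the task fleet is with the AWS CLI, as in the following sketch; the cluster ID and instance fleet ID are placeholders, and 32 Spot units corresponds to 8 xlarge task nodes at 4 units each, per the weighted capacity set earlier.

aws emr modify-instance-fleet \
    --cluster-id j-XXXXXXXXXXXXX \
    --instance-fleet InstanceFleetId=if-XXXXXXXXXXXXX,TargetSpotCapacity=32,TargetOnDemandCapacity=0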

We run the same query on a larger cluster with 10 Trino workers. Let’s compare the query completion time (wall time in the Trino Web UI) with the earlier smaller cluster with six workers. We can see 32% faster query performance (1.57 minutes vs. 2.33 minutes).

You can run more Trino workers on Spot Instances to run queries faster to meet your SLAs or process a larger number of queries. With Spot Instances available at discounts up to 90% off On-Demand prices, your cluster costs will not increase significantly vs. running the whole compute capacity on On-Demand Instances.

Clean up

To avoid ongoing charges for resources, navigate to the Amazon EMR console and delete the cluster emr-trino-cluster.

Conclusion

In this post, we showed how you can configure and launch EMR clusters with the Trino engine using its fault-tolerant configuration. With the fault tolerant feature, Trino worker nodes can be run as EMR task nodes on Spot Instances with resilience. You can configure a well-diversified task fleet with multiple instance types using the price-capacity optimized allocation strategy. This will make Amazon EMR request and launch task nodes from the most available, lower-priced Spot capacity pools to minimize costs, interruptions, and capacity challenges. We also demonstrated the resilience of EMR Trino against Spot interruptions using an AWS FIS Spot interruption experiment. EMR Trino continues to run queries by retrying failed tasks on remaining available worker nodes in the event of any Spot node interruption. With fault-tolerant EMR Trino and Spot Instances, you can run big data queries with resilience, while saving costs. For your SLA-driven workloads, you can also add more compute on Spot to adhere to or exceed your SLAs for faster query performance with lower costs compared to On-Demand Instances.


About the Authors

Ashwini Kumar is a Senior Specialist Solutions Architect at AWS based in Delhi, India. Ashwini has more than 18 years of industry experience in systems integration, architecture, and software design, with more recent experience in cloud architecture, DevOps, containers, and big data engineering. He helps customers optimize their cloud spend, minimize compute waste, and improve performance at scale on AWS. He focuses on architectural best practices for various workloads with services including EC2 Spot, AWS Graviton, EC2 Auto Scaling, Amazon EKS, Amazon ECS, and AWS Fargate.

Dipayan Sarkar is a Specialist Solutions Architect for Analytics at AWS, where he helps customers modernize their data platform using AWS Analytics services. He works with customers to design and build analytics solutions, enabling businesses to make data-driven decisions.

Migrate an existing data lake to a transactional data lake using Apache Iceberg

Post Syndicated from Rajdip Chaudhuri original https://aws.amazon.com/blogs/big-data/migrate-an-existing-data-lake-to-a-transactional-data-lake-using-apache-iceberg/

A data lake is a centralized repository that you can use to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data and then run different types of analytics for better business insights. Over the years, data lakes on Amazon Simple Storage Service (Amazon S3) have become the default repository for enterprise data and are a common choice for a large set of users who query data for a variety of analytics and machine learning use cases. Amazon S3 allows you to access diverse data sets, build business intelligence dashboards, and accelerate the consumption of data by adopting a modern data architecture or data mesh pattern on Amazon Web Services (AWS).

Analytics use cases on data lakes are always evolving. Oftentimes, you want to continuously ingest data from various sources into a data lake and query the data concurrently through multiple analytics tools with transactional capabilities. But traditionally, data lakes built on Amazon S3 are immutable and don’t provide the transactional capabilities needed to support changing use cases. With changing use cases, customers are looking for ways to not only move new or incremental data to data lakes as transactions, but also to convert existing data based on Apache Parquet to a transactional format. Open table formats, such as Apache Iceberg, provide a solution to this issue. Apache Iceberg enables transactions on data lakes and can simplify data storage, management, ingestion, and processing.

In this post, we show you how you can convert existing data in an Amazon S3 data lake in Apache Parquet format to Apache Iceberg format to support transactions on the data using Jupyter Notebook based interactive sessions over AWS Glue 4.0.

Existing Parquet to Iceberg migration

There are two broad methods to migrate the existing data in a data lake in Apache Parquet format to Apache Iceberg format to convert the data lake to a transactional table format.

In-place data upgrade

In an in-place data migration strategy, existing datasets are upgraded to Apache Iceberg format without first reprocessing or restating existing data. This means the data files in the data lake aren’t modified during the migration and all Apache Iceberg metadata files (manifests, manifest files, and table metadata files) are generated outside the purview of the data. In this method, the metadata are recreated in an isolated environment and colocated with the existing data files. This can be a much less expensive operation compared to rewriting all the data files. The existing data file format must be Apache Parquet, Apache ORC, or Apache Avro. An in-place migration can be performed in either of two ways:

  1. Using add_files: This procedure adds existing data files to an existing Iceberg table with a new snapshot that includes the files. Unlike migrate or snapshot, add_files can import files from a specific partition or partitions and doesn’t create a new Iceberg table. This procedure doesn’t analyze the schema of the files to determine if they match the schema of the Iceberg table. Upon completion, the Iceberg table treats these files as if they are part of the set of files owned by Apache Iceberg.
  2. Using migrate: This procedure replaces a table with an Apache Iceberg table loaded with the source’s data files. The table’s schema, partitioning, properties, and location are copied from the source table. Supported formats are Avro, Parquet, and ORC. By default, the original table is retained with the name table_BACKUP_. However, to leave the original table intact during the process, you must use snapshot to create a new temporary table that has the same source data files and schema.

In this post, we show you how you can use the Iceberg add_files procedure for an in-place data upgrade. Note that the migrate procedure isn’t supported in AWS Glue Data Catalog.

The following diagram shows a high-level representation.

CTAS migration of data

The create table as select (CTAS) migration approach is a technique where all the metadata information for Iceberg is generated along with restating all the data files. This method shadows the source dataset in batches. When the shadow is caught up, you can swap the shadowed dataset with the source.

The following diagram showcases the high-level flow.

Prerequisites

To follow along with the walkthrough, you must have the following:

You can check the data size using the following code in the AWS CLI or AWS CloudShell:

# Run this command to check the data size

aws s3 ls --summarize --human-readable --recursive s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023

As of writing this post, there are 107 objects with total size of 70 MB for year 2023 in the Amazon S3 path.

Note that to implement the solution, you must complete a few preparatory steps.

Deploy resources using AWS CloudFormation

Complete the following steps to create the S3 bucket and the AWS IAM role and policy for the solution:

  1. Sign in to your AWS account and then choose Launch Stack to launch the CloudFormation template.

  2. For Stack name, enter a name.
  3. Leave the parameters at the default values. Note that if the default values are changed, then you must make corresponding changes throughout the following steps.
  4. Choose Next to create your stack.

This AWS CloudFormation template deploys the following resources:

  • An S3 bucket named demo-blog-post-XXXXXXXX (XXXXXXXX represents the AWS account ID used).
  • Two folders named parquet and iceberg under the bucket.
  • An IAM role and a policy named demoblogpostrole and demoblogpostscoped respectively.
  • An AWS Glue database named ghcn_db.
  • An AWS Glue Crawler named demopostcrawlerparquet.

After the AWS CloudFormation template is successfully deployed:

  1. Copy the data in the created S3 bucket using following command in AWS CLI or AWS CloudShell. Replace XXXXXXXX appropriately in the target S3 bucket name.
    Note: In the example, we copy data only for the year 2023. However, you can work with the entire dataset, following the same instructions.

    aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2023/ s3://demo-blog-post-XXXXXXXX/parquet/year=2023

  2. Open the AWS Management Console and go to the AWS Glue console.
  3. On the navigation pane, select Crawlers.
  4. Run the crawler named demopostcrawlerparquet.
  5. After the AWS Glue crawler demopostcrawlerparquet is successfully run, the metadata information of the Apache Parquet data will be cataloged under the ghcn_db AWS Glue database with the table name source_parquet. Notice that the table is partitioned over year and element columns (as in the S3 bucket).

  6. Use the following query to verify the data from the Amazon Athena console. If you're using Amazon Athena for the first time in your AWS Account, set up a query result location in Amazon S3.
    SELECT * FROM ghcn_db.source_parquet limit 10;

Launch an AWS Glue Studio notebook for processing

For this post, we use an AWS Glue Studio notebook. Follow the steps in Getting started with notebooks in AWS Glue Studio to set up the notebook environment. Launch the notebooks hosted under this link and unzip them on a local workstation.

  1. Open AWS Glue Studio.
  2. Choose ETL Jobs.
  3. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select required ipynb file and choose Open, then choose Create.
  4. On the Notebook setup page, for Job name, enter a logical name.
  5. For IAM role, select demoblogpostrole. Choose Create job. After a minute, the Jupyter notebook editor appears. Clear all the default cells.

The preceding steps launch an AWS Glue Studio notebook environment. Make sure you Save the notebook every time it’s used.

In-place data upgrade

In this section, we show you how you can use the add_files procedure to achieve an in-place data upgrade. This section uses the ipynb file named demo-in-place-upgrade-addfiles.ipynb. To use the add_files procedure, complete the following steps:

  1. On the Notebook setup page, for Job name, enter demo-in-place-upgrade for the notebook session as explained in Launch Glue notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under Execute add_files procedure section performs the metadata creation in the mentioned path.
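For reference, the add_files call in that cell is conceptually similar to the following Spark SQL; glue_catalog is an assumed Iceberg catalog name, the target table is assumed to already exist with the source schema, and the table names follow this post's naming.

CALL glue_catalog.system.add_files(
    table => 'ghcn_db.target_iceberg_add_files',
    source_table => 'ghcn_db.source_parquet'
)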

Review the data file paths for the new Iceberg table. To show an Iceberg table's current data files, .files can be used to get details such as file_path, partition, and others. The data files referenced by the new table point to the original source path under Amazon S3.

Note the metadata file location after transformation. It points to the new folder named iceberg under Amazon S3. You can confirm this by using .snapshots to check the Iceberg table's snapshot file locations, and by browsing the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_add_files/metadata/. Also notice that two versions of the manifest list are created after the add_files procedure has run: the first corresponds to the empty table with only the data schema, and the second adds the files.
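The following Spark SQL queries against Iceberg's metadata tables are one way to inspect this; the glue_catalog qualifier is an assumption and may differ in your environment.

-- List the current data files and their partitions
SELECT file_path, partition
FROM glue_catalog.ghcn_db.target_iceberg_add_files.files;

-- List snapshots and the location of their manifest lists
SELECT snapshot_id, manifest_list
FROM glue_catalog.ghcn_db.target_iceberg_add_files.snapshots;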

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, you can use the add_files procedure to convert existing data files in Apache Parquet format in a data lake to Apache Iceberg format by adding the metadata files and without recreating the table from scratch. Following are some pros and cons of this method.

Pros

  • Avoids full table scans to read the data as there is no restatement. This can save time.
  • If there are any errors while writing the metadata, only the metadata needs to be rewritten, not the entire dataset.
  • Lineage of the existing jobs is maintained because the existing catalog still exists.

Cons

  • If data is processed (inserts, updates, and deletes) in the dataset during the metadata writing process, the process must be run again to include the new data.
  • There must be write downtime to avoid having to run the process a second time.
  • If a data restatement is required, this workflow will not work as source data files aren’t modified.

CTAS migration of data

This section uses the ipynb file named demo-ctas-upgrade.ipynb. Complete the following:

  1. On the Notebook setup page, for Job name, enter demo-ctas-upgrade for the notebook session, as explained under Launch an AWS Glue Studio notebook for processing.
  2. Run the cells under the section Glue session configurations. Provide the S3 bucket name from the prerequisites for the bucket_name variable by replacing XXXXXXXX.
  3. Run the subsequent cells in the notebook.

Notice that the cell under the Create Iceberg table from Parquet section performs the shadow upgrade to Iceberg format. Note that Iceberg requires sorting the data according to table partitions before writing to the Iceberg table. Further details can be found in Writing Distribution Modes.
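
That cell follows the standard Spark SQL CTAS pattern for Iceberg. The following is a minimal sketch under the same assumed catalog and table names as before; the partition columns year and element come from the source table, and the ORDER BY addresses the sorting requirement mentioned above.

# Minimal CTAS sketch (assumed names): restates the Parquet data into a new
# Iceberg table, sorted by the partition columns before writing.
spark.sql("""
    CREATE TABLE glue_catalog.ghcn_db.target_iceberg_ctas
    USING iceberg
    PARTITIONED BY (year, element)
    AS SELECT * FROM ghcn_db.source_parquet ORDER BY year, element
""")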

Notice the data and metadata file paths for the new Iceberg table. They point to the new path under Amazon S3. Also, check under the Amazon S3 URI s3://demo-blog-post-XXXXXXXX/iceberg/ghcn_db.db/target_iceberg_ctas/ used for this post.

The table is cataloged in AWS Glue under the database ghcn_db with the table type as ICEBERG.

Compare the count of records using Amazon Athena between the source and target table. They are the same.

In summary, the CTAS method creates a new table by generating all the metadata files along with restating the actual data. Following are some pros and cons of this method:

Pros

  • It allows you to audit and validate the data during the process because data is restated.
  • If there are any runtime issues during the migration process, rollback and recovery can be easily performed by deleting the Apache Iceberg table.
  • You can test different configurations when migrating a source. You can create a new table for each configuration and evaluate the impact.
  • Shadow data is renamed to a different directory in the source (so it doesn’t collide with old Apache Parquet data).

Cons

  • Storage of the dataset is doubled during the process as both the original Apache Parquet and new Apache Iceberg tables are present during the migration and testing phase. This needs to be considered during cost estimation.
  • The migration can take much longer (depending on the volume of the data) because both data and metadata are written.
  • It's difficult to keep tables in sync if there are changes to the source table during the process.

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the datasets, CloudFormation stack, S3 bucket, AWS Glue job, AWS Glue database, and AWS Glue table.

Conclusion

In this post, you learned strategies for migrating existing Apache Parquet formatted data in Amazon S3 to Apache Iceberg in order to convert your data lake to a transactional data lake, using interactive sessions in AWS Glue 4.0 to complete the process. If you have an evolving use case where an existing data lake needs to be converted to a transactional data lake based on the Apache Iceberg table format, follow the guidance in this post.

The path you choose for this upgrade, an in-place upgrade or CTAS migration, or a combination of both, will depend on careful analysis of the data architecture and data integration pipeline. Both pathways have pros and cons, as discussed. At a high level, this upgrade process should go through multiple well-defined phases to identify the patterns of data integration and use cases. Choosing the correct strategy will depend on your requirements—such as performance, cost, data freshness, acceptable downtime during migration, and so on.


About the author

Rajdip Chaudhuri is a Senior Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer and movies.

Network connectivity patterns for Amazon OpenSearch Serverless

Post Syndicated from Salman Ahmed original https://aws.amazon.com/blogs/big-data/network-connectivity-patterns-for-amazon-opensearch-serverless/

Amazon OpenSearch Serverless is an on-demand, auto-scaling configuration for Amazon OpenSearch Service. OpenSearch Serverless enables a broad set of use cases, such as real-time application monitoring, log analytics, and website search. OpenSearch Serverless lets you use OpenSearch without having to worry about scaling and managing an OpenSearch cluster. A collection can be accessed over the public internet or from your VPC. As you start accessing OpenSearch Serverless from different VPCs and accounts or from on premises, your connectivity patterns may change. In this post, we cover connectivity patterns and Domain Name System (DNS) resolution for your OpenSearch Serverless collection—whether accessed over the internet, within the VPC, within AWS, or from your on-premises location.

Foundational concepts

The following foundational concepts will help you better understand OpenSearch Serverless and DNS resolution.

Network access policy

The network access policy for OpenSearch Serverless determines whether the collection can be accessed over the internet or only through OpenSearch Serverless managed VPC endpoints. A single policy can be attached to multiple collections.
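
Network policies are JSON documents that you can also manage through the API. The following is a hedged boto3 sketch of a network policy that restricts a collection to a managed VPC endpoint; the collection name, policy name, and endpoint ID are placeholders.

import json
import boto3

aoss = boto3.client("opensearchserverless")

# Placeholder names and IDs: allow access to the collection and its dashboards
# only through a specific OpenSearch Serverless managed VPC endpoint.
policy = [
    {
        "Rules": [
            {"ResourceType": "collection", "Resource": ["collection/my-collection"]},
            {"ResourceType": "dashboard", "Resource": ["collection/my-collection"]},
        ],
        "AllowFromPublic": False,
        "SourceVPCEs": ["vpce-0123456789abcdef0"],
    }
]

aoss.create_security_policy(
    name="my-collection-network-policy",
    type="network",
    policy=json.dumps(policy),
)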

OpenSearch Serverless VPC endpoint

To access OpenSearch Serverless collections and dashboards privately from a VPC without using an internet gateway, you can create a VPC interface endpoint. When you create a VPC endpoint, it creates an elastic network interface (ENI) in each subnet that you enable for the VPC endpoint. These are requester-managed ENIs that serve as the entry point for traffic destined for the OpenSearch Serverless collection. When you create an OpenSearch Serverless VPC endpoint, the private DNS name option is enabled by default. This means that OpenSearch Serverless also creates an Amazon Route 53 private hosted zone and associates that with the VPC where the endpoint is created. This private hosted zone has a wildcard DNS record *.<region>.aoss.amazonaws.com pointing to the private DNS of the endpoint.

You create an OpenSearch Serverless VPC endpoint via the OpenSearch Serverless console or the OpenSearch Serverless API. You can't create an OpenSearch Serverless VPC endpoint from the Amazon Virtual Private Cloud (Amazon VPC) console, although once created, you can also see it on the VPC console.
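
For example, the following is a hedged boto3 sketch of creating the endpoint through the OpenSearch Serverless API; the VPC, subnet, and security group IDs are placeholders.

import boto3

aoss = boto3.client("opensearchserverless")

# Placeholder IDs: create an OpenSearch Serverless managed VPC endpoint with an ENI
# in two subnets for high availability.
response = aoss.create_vpc_endpoint(
    name="my-aoss-endpoint",
    vpcId="vpc-0123456789abcdef0",
    subnetIds=["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"],
    securityGroupIds=["sg-0123456789abcdef0"],
)
print(response)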

Amazon Route 53 Resolver

Let's understand what Amazon Route 53 Resolver does when an Amazon Elastic Compute Cloud (Amazon EC2) instance queries a DNS name. DNS queries originating from the VPC go to the Route 53 Resolver at the VPC+2 IP address. When a DNS query reaches the resolver, it checks whether any Route 53 forwarding rules match. If a rule matches, the resolver forwards the query to the DNS server provided by that rule. If the query remains unresolved, Route 53 Resolver proceeds to check the private hosted zones associated with the originating VPC. If it still remains unresolved, it checks the VPC DNS, which helps resolve EC2 DNS names. Lastly, if the query still isn't resolved, Route 53 Resolver checks the public DNS. The following diagram illustrates this order of operations.

Route 53 DNS Overview

Route 53 Resolver inbound endpoints

Workloads utilizing resources both in a VPC and on premises need to resolve DNS records hosted on-premises and resources hosted in the VPC. With Route 53 Resolver inbound endpoints, you can resolve DNS queries to your VPC from your on-premises network or another VPC.

In the following sections, we provide an overview of connectivity patterns and DNS resolution.

Access an OpenSearch Serverless collection from Amazon EC2 (via internet gateway)

The following figure demonstrates the connectivity pattern to access an OpenSearch Serverless collection over the internet. The collection has an access type set to public, which allows authorized users to connect to the collection over the internet. An EC2 instance within the VPC can establish a connection to the collection via the internet gateway, and users outside the VPC can also access this collection over the internet.

Access an OpenSearch Serverless collection from Amazon EC2 (via internet gateway)

The workflow has the following steps, as indicated in the preceding diagram:

A. The EC2 instance performs a DNS lookup to Route 53 Resolver at a VPC+2 IP address. Route 53 Resolver returns the public IP addresses of the OpenSearch Serverless collection.

B. The EC2 instance sends a data request via an internet gateway to the OpenSearch Serverless collection using this public IP address.

C. An external client resolves to the public IP addresses of the OpenSearch Serverless collection and reaches it via the internet.

Now let's run a dig command for the collection or dashboard URL from the EC2 instance; we observe that it resolves to public IP addresses.

The following command uses an OpenSearch Serverless collection:

sh-5.2$ dig +short <collection-id>.<region>.aoss.amazonaws.com
192.0.2.10
198.51.100.204
192.0.2.45
198.51.100.55
192.0.2.100
203.0.113.66

The following command uses an OpenSearch dashboard:

sh-5.2$ dig +short dashboards.<region>.aoss.amazonaws.com
192.0.2.10
198.51.100.204
192.0.2.45
198.51.100.55
192.0.2.100
203.0.113.66

Now that you have implemented an OpenSearch Serverless collection with a network access policy as public, you can make the same collection accessible privately within the VPC. To achieve this, complete the following steps:

  1. Modify the network access policy of the collection and change the access type to VPC.
  2. Select the option Create VPC endpoints.

Access type for OpenSearch Serverless

  3. Choose the VPC and at least two subnets where you would like to have a VPC endpoint ENI for high availability.
  4. Choose Confirm to create the VPC endpoint.

Create VPC endpoints for Amazon OpenSearch Serverless

  5. Lastly, select the VPC endpoint and update the policy.

Access Type VPC Endpoint for Amazon OpenSearch Serverless

With the creation of the VPC endpoint, a Route 53 private hosted zone is also created within your account and associated with your VPC. In this setup, a CNAME record *.us-east-1.aoss.amazonaws.com is created to direct to the Regional AWS PrivateLink endpoint, as depicted in the following screenshot.

Route 53 Private Hosted Zone

Due to the private hosted zone associated with the VPC, Route 53 Resolver gives preference to the private hosted zone to resolve any DNS query originating from the VPC. DNS requests for the OpenSearch Serverless collection originating from the EC2 instance get resolved using this associated private hosted zone and resolve to the private IP addresses of the VPC endpoint, which allows Amazon EC2 to connect to the serverless collection via VPC endpoints vs. the internet gateway. We expand on this in the following section.

Access an OpenSearch Serverless collection from Amazon EC2 (via interface VPC endpoints)

The following figure demonstrates the connectivity pattern to access an OpenSearch Serverless collection privately from the VPC. The collection has an access type set to VPC endpoint, restricting access solely from the resources within the VPC via the VPC endpoint and preventing external users from connecting. With the creation of the VPC endpoint, a private hosted zone is also associated with this VPC. An EC2 instance within the VPC can establish a connection with the collection using the VPC endpoint, but resources outside of the VPC don’t have access to this collection because of the network access policy.

Access an OpenSearch Serverless collection from Amazon EC2 (via interface VPC endpoints)

The workflow consists of the following steps:

A. The EC2 instance performs a DNS lookup to Route 53 Resolver at a VPC+2 IP address. Route 53 Resolver returns the private IP addresses of the VPC endpoint because there is a private hosted zone associated with the VPC containing a CNAME record.

B. The EC2 instance sends a data request via the VPC interface endpoint to the OpenSearch Serverless collection.

C. An external client resolves to the public IP addresses of the OpenSearch Serverless collection but is unable to reach it because the network policy restricts to the VPC.

Now let’s perform a dig command for the collection or dashboard URL from the EC2 instance, and we observe that it’s resolving to the private IP addresses belonging to the VPC endpoints.

Use the following code for an OpenSearch Serverless collection:

sh-5.2$ dig +short <collection-id>.<region>.aoss.amazonaws.com
10.0.1.98
10.0.2.83

Use the following code for an OpenSearch dashboard:

sh-5.2$ dig +short dashboards.<region>.aoss.amazonaws.com
10.0.1.98
10.0.2.83

Access an OpenSearch Serverless collection from many VPCs (via interface VPC endpoints) with a VPC endpoint in each VPC

The following figure demonstrates the connectivity pattern to use the same VPC endpoint to connect to multiple OpenSearch Serverless collections. In this scenario, a VPC endpoint is created in each VPC to enable EC2 instances within the VPCs to utilize the VPC endpoint as the connectivity path to OpenSearch Serverless. A private hosted zone is auto generated for each endpoint and associated with the corresponding VPC. Network policies of OpenSearch Serverless collections are updated to allow both VPC Endpoint-1 and VPC Endpoint-2, which allows the EC2 instance in VPC-1 to access both collections via VPC Endpoint-1 and EC2 instances in VPC-2 to access both collections via VPC Endpoint-2.

Access an OpenSearch Serverless collection from many VPCs (via interface VPC endpoints) with a VPC endpoint in each VPC

The workflow consists of the following steps:

A. The EC2 instance performs a DNS lookup to Route 53 Resolver at a VPC+2 IP address. Route 53 Resolver returns the private IP addresses of the VPC endpoint (the EC2 instance in VPC-1 gets the IP address of VPC Endpoint-1 and the EC2 instance in VPC-2 gets the IP address of VPC Endpoint-2), because there is a private hosted zone associated with each of the VPCs containing a CNAME record.

B. The EC2 instance sends a data request via the VPC interface endpoint to the OpenSearch Serverless collection.

Access an OpenSearch Serverless collection from many VPCs (via interface VPC endpoints) with a VPC endpoint in a centralized VPC

In the previous connectivity pattern, we had one endpoint in each VPC through which VPC resources accessed OpenSearch Serverless collections. Many organizations would like to maintain control of these endpoints and keep them in a centralized VPC.

The following figure demonstrates the connectivity pattern to use a centralized VPC endpoint to connect to multiple OpenSearch Serverless collections from multiple VPCs. In this scenario, a VPC interface endpoint is created in a centralized VPC. A private hosted zone is auto generated for this VPC endpoint and associated with the centralized VPC, and then manually associated with VPC-1 and VPC-2. The DNS query for OpenSearch Serverless collections from VPC-1 and VPC-2 gets resolved to the centralized VPC endpoint due to the private hosted zone association. Network policies for both collections allow access from the centralized VPC endpoint only. All three VPCs (centralized, VPC-1, and VPC-2) are connected via AWS Transit Gateway and route tables have routes to direct traffic between VPC-1 and VPC-2 to the centralized VPC and vice versa.
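
The manual association of the endpoint's private hosted zone with the spoke VPCs can be scripted. The following is a hedged boto3 sketch; the hosted zone ID, Region, and VPC IDs are placeholders.

import boto3

route53 = boto3.client("route53")

# Placeholder IDs: associate the private hosted zone created alongside the
# centralized VPC endpoint with VPC-1 and VPC-2 so their DNS queries resolve
# to the centralized endpoint.
for vpc_id in ["vpc-11111111111111111", "vpc-22222222222222222"]:
    route53.associate_vpc_with_hosted_zone(
        HostedZoneId="Z0123456789ABCDEFGHIJ",
        VPC={"VPCRegion": "us-east-1", "VPCId": vpc_id},
    )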

Access an OpenSearch Serverless collection from many VPCs (via interface VPC endpoints) with a VPC endpoint in a centralized VPC

The workflow consists of the following steps:

A. The EC2 instance performs a DNS lookup to Route 53 Resolver at a VPC+2 IP address. Route 53 Resolver returns the private IP addresses of the centralized VPC endpoint, because there is a private hosted zone associated with each VPC containing a CNAME record.

B. The EC2 instance sends a data request to the Transit Gateway ENI in its own VPC. The Transit Gateway route table is checked and the data request is forwarded to the Transit Gateway ENI in the centralized VPC. The Transit Gateway ENI in the centralized VPC sends it to the OpenSearch Serverless collection via the VPC interface endpoint.

Access an OpenSearch Serverless collection from on premises (via AWS Site-to-Site VPN or AWS Direct Connect)

The following figure demonstrates the connectivity pattern for accessing OpenSearch Serverless collections from on premises. You can use either AWS Direct Connect or AWS Site-to-Site VPN to establish connectivity between on-premises and AWS resources. In the following example, Direct Connect is used for the connectivity between AWS and on premises. An OpenSearch Serverless VPC endpoint is created in the VPC, and a private hosted zone is automatically generated and associated with this VPC. The network policy of the OpenSearch Serverless collection is updated to allow connectivity only from the VPC endpoint.

To access these OpenSearch Serverless collections privately from the on-premises environment, resources need to resolve the OpenSearch Serverless collection DNS to the OpenSearch Serverless VPC endpoint IP address. By default, OpenSearch Serverless DNS resolves to the public IP addresses and attempts to access OpenSearch Serverless via the internet. To ensure that OpenSearch Serverless is accessed via the VPC endpoint from on premises, you need to ensure that DNS queries are resolved to a VPC endpoint’s private IP address. Resources inside the VPC use Route 53 Resolver, available at a VPC+2 IP address, to resolve these queries to the VPC endpoint. Route 53 Resolver checks the associated private hosted zone to resolve the query to the VPC endpoint. However, the VPC+2 IP address is not accessible from on premises. To address this, you can utilize the Route 53 Resolver inbound endpoint.

To achieve this, you can create an inbound endpoint in your VPC by following the steps outlined in Configuring inbound forwarding, and then update the on-premises DNS server to forward all the DNS requests for *.<region>.aoss.amazonaws.com to the IP address of the Route 53 Resolver inbound endpoint. When the on-premises client obtains the IP address of the VPC endpoint, it can use Direct Connect or Site-to-Site VPN to establish a private connection to the OpenSearch Serverless collection.
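
The following is a hedged boto3 sketch of creating the inbound endpoint; the subnet and security group IDs are placeholders.

import boto3

r53resolver = boto3.client("route53resolver")

# Placeholder IDs: create an inbound endpoint with IP addresses in two subnets,
# reachable from on premises over Direct Connect or Site-to-Site VPN.
response = r53resolver.create_resolver_endpoint(
    CreatorRequestId="aoss-inbound-endpoint-001",
    Name="aoss-inbound",
    Direction="INBOUND",
    SecurityGroupIds=["sg-0123456789abcdef0"],
    IpAddresses=[
        {"SubnetId": "subnet-0123456789abcdef0"},
        {"SubnetId": "subnet-0fedcba9876543210"},
    ],
)
print(response["ResolverEndpoint"]["Id"])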

Access an OpenSearch Serverless collection from on premises (via AWS Site-to-Site VPN or AWS Direct Connect)

The workflow contains the following steps:

A. The on-premises client sends a DNS lookup to the on-premises DNS resolver. The on-premises DNS resolver forwards this request to the Route 53 Resolver inbound endpoint. The Route 53 Resolver inbound endpoint sends a DNS lookup to Route 53 Resolver at a VPC+2 IP address. Route 53 Resolver returns the private IP addresses of the VPC endpoint, because there is a private hosted zone associated with this VPC containing a CNAME record.

B. The on-premises client sends a data request to the OpenSearch Serverless collection, which routes via Direct Connect or Site-to-site VPN to the VPC interface endpoint and finally to the OpenSearch Serverless collection.

Conclusion

In this post, we showed you various connectivity patterns for OpenSearch Serverless. We covered the use of hybrid DNS and using a Route 53 Resolver inbound endpoint to allow connectivity from on premises for OpenSearch Serverless. You can choose from various centralization models for reaching multiple OpenSearch Serverless collections within the AWS Cloud or from on-premises locations. Get started today by connecting to OpenSearch Serverless from the various network patterns we discussed.


About the authors

Salman AhmedSalman Ahmed is a Sr. Technical Account Manager in AWS Enterprise Support. He enjoys working with Enterprise Support customers to help them with design, implementation and supporting cloud infrastructure. He also has a passion for networking services and with 12+ years of experience, he leverages that to help customers with adoption of AWS Transit Gateway, AWS Direct Connect and various other AWS networking services.

Ankush GoyalAnkush Goyal is Enterprise Support Lead in AWS Enterprise Support who helps enterprise support customers streamline their cloud operations on AWS. He is a results-driven IT professional with over 18 years of experience.

Rohit AswaniRohit Aswani is a Senior Specialist Solutions Architect focussed on Networking at AWS, where he helps customers build and design scalable, highly-available, secure, resilient and cost effective networks. He holds a MS in Telecommunication Systems Management from Northeastern University, specializing in Computer Networking. In his spare time, Rohit enjoys hiking, traveling and exploring new coffee places.

Use the new SQL commands MERGE and QUALIFY to implement and validate change data capture in Amazon Redshift

Post Syndicated from Yanzhu Ji original https://aws.amazon.com/blogs/big-data/use-the-new-sql-commands-merge-and-qualify-to-implement-and-validate-change-data-capture-in-amazon-redshift/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytics workloads.

Amazon Redshift has added many features to enhance analytical processing like ROLLUP, CUBE and GROUPING SETS, which were demonstrated in the post Simplify Online Analytical Processing (OLAP) queries in Amazon Redshift using new SQL constructs such as ROLLUP, CUBE, and GROUPING SETS. Amazon Redshift has recently added many SQL commands and expressions. In this post, we talk about two new SQL features, the MERGE command and QUALIFY clause, which simplify data ingestion and data filtering.

One familiar task in most downstream applications is change data capture (CDC) and applying it to its target tables. This task requires examining the source data to determine if it is an update or an insert to existing target data. Without the MERGE command, you needed to test the new dataset against the existing dataset using a business key. When that didn’t match, you inserted new rows in the existing dataset; otherwise, you updated existing dataset rows with new dataset values.

The MERGE command conditionally merges rows from a source table into a target table. Traditionally, this could only be achieved by using multiple insert, update, or delete statements separately. When using multiple statements to update or insert data, there is a risk of inconsistencies between the different operations. The MERGE operation reduces this risk by ensuring that all operations are performed together in a single transaction.

The QUALIFY clause filters the results of a previously computed window function according to user‑specified search conditions. You can use the clause to apply filtering conditions to the result of a window function without using a subquery. This is similar to the HAVING clause, which applies a condition to further filter rows from a WHERE clause. The difference between QUALIFY and HAVING is that filtered results from the QUALIFY clause could be based on the result of running window functions on the data. You can use both the QUALIFY and HAVING clauses in one query.

In this post, we demonstrate how to use the MERGE command to implement CDC and how to use QUALIFY to simplify validation of those changes.

Solution overview

In this use case, we have a data warehouse, in which we have a customer dimension table that needs to always get the latest data from the source system. This data must also reflect the initial creation time and last update time for auditing and tracking purposes.

A simple way to solve this is to overwrite the customer dimension fully every day; however, that won't achieve the update tracking, which is an audit mandate, and it might not be feasible to do for bigger tables.

You can load sample data from Amazon S3 by following the instruction here. Using the existing customer table under sample_data_dev.tpcds, we create a customer dimension table and a source table that will contain both updates for existing customers and inserts for new customers. We use the MERGE command to merge the source table data with the target table (customer dimension). We also show how to use the QUALIFY clause to simplify validating the changes in the target table.

To follow along with the steps in this post, we recommend downloading the accompanying notebook, which contains all the scripts to run for this post. To learn about authoring and running notebooks, refer to Authoring and running notebooks.

Prerequisites

You should have the following prerequisites:

Create and populate the dimension table

We use the existing customer table under sample_data_dev.tpcds to create a customer_dimension table. Complete the following steps:

  1. Create a table using a few selected fields, including the business key, and add a couple of maintenance fields for insert and update timestamps:
    -- create the customer dimension table
    DROP TABLE IF EXISTS customer_dim CASCADE;
    CREATE TABLE customer_dim ( 
    customer_dim_id     bigint GENERATED BY DEFAULT AS IDENTITY(1, 1), 
    c_customer_sk integer NOT NULL ENCODE az64 distkey,
    c_first_name character(20) ENCODE lzo,
    c_last_name character(30) ENCODE lzo,
    c_current_addr_sk integer ENCODE az64,
    c_birth_country character varying(20) ENCODE lzo,
    c_email_address character(50) ENCODE lzo,
    record_insert_ts    timestamp WITHOUT time ZONE DEFAULT current_timestamp ,
    record_upd_ts       timestamp WITHOUT time ZONE DEFAULT NULL
    )
    SORTKEY (c_customer_sk);

  2. Populate the dimension table:
    -- populate dimension 
    insert into customer_dim 
           (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
    select  c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address
    from "sample_data_dev"."tpcds"."customer";

  3. Validate the row count and the contents of the table:
    -- check customers count and look at sample data
    select count(1) from customer_dim; 
    select * from customer_dim limit 10;

Simulate customer table changes

Use the following code to simulate changes made to the table:

-- create a source table with some updates and some inserts
-- Update- Email has changed for 100 customers 
drop table if exists src_customer;
create table src_customer distkey(c_customer_sk) as 
select c_customer_sk , c_first_name , c_last_name, c_current_addr_sk, c_birth_country, 'x' || c_email_address as c_email_address, getdate() as effective_dt
from   customer_dim 
where  c_email_address is not null
limit 100;


-- also let's add three completely new customers
insert into src_customer values 
(15000001, 'Customer#15','000001', 10001 ,'USA'    , 'Customer#[email protected]', getdate() ),
(15000002, 'Customer#15','000002', 10002 ,'MEXICO' , 'Customer#[email protected]', getdate() ),
(15000003, 'Customer#15','000003', 10003 ,'CANADA' , 'Customer#[email protected]', getdate() );

-- check source count
select count(1) from src_customer;

Merge the source table into the target table

Now you have a source table with some changes you need to merge with the customer dimension table.

Before the MERGE command, this type of task needed two separate UPDATE and INSERT commands to implement:

-- merge changes to dim customer
BEGIN TRANSACTION;
-- update current records
UPDATE customer_dim
SET    c_first_name      = src.c_first_name      ,
       c_last_name       = src.c_last_name       , 
       c_current_addr_sk = src.c_current_addr_sk , 
       c_birth_country   = src.c_birth_country   , 
       c_email_address   = src.c_email_address   ,
       record_upd_ts     = current_timestamp
from   src_customer AS src
where  customer_dim.c_customer_sk = src.c_customer_sk ;
-- Insert new records
INSERT INTO customer_dim (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
select src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address 
from   src_customer AS src
where  src.c_customer_sk NOT IN (select c_customer_sk from customer_dim);
-- end merge operation
COMMIT TRANSACTION;

The MERGE command uses a more straightforward syntax, in which we use the key comparison result to decide if we perform an update DML operation (when matched) or an insert DML operation (when not matched):

MERGE INTO customer_dim using src_customer AS src ON customer_dim.c_customer_sk = src.c_customer_sk
WHEN MATCHED THEN UPDATE 
SET c_first_name      = src.c_first_name      ,
    c_last_name       = src.c_last_name       , 
    c_current_addr_sk = src.c_current_addr_sk , 
    c_birth_country   = src.c_birth_country   , 
    c_email_address   = src.c_email_address   ,
    record_upd_ts     = current_timestamp
WHEN NOT MATCHED THEN INSERT (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
                      VALUES (src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address );

Validate the data changes in the target table

Now we need to validate the data has made it correctly to the target table. We can first check the updated data using the update timestamp. Because this was our first update, we can examine all rows where the update timestamp is not null:

-- Check the changes
-- to get updates
select * 
from customer_dim
where record_upd_ts is not null

Use QUALIFY to simplify validation of the data changes

We need to examine the data inserted in this table most recently. One way to do that is to rank the data by its insert timestamp and get those with the first rank. This requires using the window function rank() and also requires a subquery to get the results.

Before the availability of QUALIFY, we needed to build that using a subquery like the following:

select customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts 
from 
( select rank() OVER (ORDER BY DATE_TRUNC('second',record_insert_ts) desc) AS rnk, 
         customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts 
  from customer_dim
  where record_upd_ts is null)
where rnk = 1;

The QUALIFY clause eliminates the need for the subquery, as in the following code snippet:

-- to get the newly inserted rows we can make use of Qualify feature
select * 
from customer_dim
where record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC('second',record_insert_ts) desc) = 1 

Validate all data changes

We can union the results of both queries to get all the inserts and update changes:

-- To get all changes
select *
from (
select 'Updates' as operations, cd.* 
from   customer_dim as cd
where  cd.record_upd_ts is not null
union 
select 'Inserts' as operations, cd.* 
from customer_dim cd
where cd.record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC('second',cd.record_insert_ts) desc) = 1 
) order by 1

Clean up

To clean up the resources used in the post, delete the Redshift provisioned cluster or Redshift Serverless workgroup and namespace you created for this post (this will also drop all the objects created).

If you used an existing Redshift provisioned cluster or Redshift Serverless workgroup and namespace, use the following code to drop these objects:

DROP TABLE IF EXISTS customer_dim CASCADE;
DROP TABLE IF EXISTS src_customer CASCADE;

Conclusion

When using multiple statements to update or insert data, there is a risk of inconsistencies between the different operations. The MERGE operation reduces this risk by ensuring that all operations are performed together in a single transaction. For Amazon Redshift customers who are migrating from other data warehouse systems or who regularly need to ingest fast-changing data into their Redshift warehouse, the MERGE command is a straightforward way to conditionally insert, update, and delete data from target tables based on existing and new source data.

In most analytic queries that use window functions, you may need to use those window functions in your WHERE clause as well. However, this is not permitted, and to do so, you have to build a subquery that contains the required window function and then use the results in the parent query in the WHERE clause. Using the QUALIFY clause eliminates the need for a subquery and therefore simplifies the SQL statement, making it easier to write and read.

We encourage you to start using those new features and give us your feedback. For more details, refer to MERGE and QUALIFY clause.


About the authors

Yanzhu Ji is a Product Manager in the Amazon Redshift team. She has experience in product vision and strategy in industry-leading data products and platforms. She has outstanding skill in building substantial software products using web development, system design, database, and distributed programming techniques. In her personal life, Yanzhu likes painting, photography, and playing tennis.

Ahmed Shehata is a Senior Analytics Specialist Solutions Architect at AWS based in Toronto. He has more than two decades of experience helping customers modernize their data platforms. Ahmed is passionate about helping customers build efficient, performant, and scalable analytic solutions.

Ranjan Burman is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 16 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with cloud solutions.

How to implement cryptographic modules to secure private keys used with IAM Roles Anywhere

Post Syndicated from Edouard Kachelmann original https://aws.amazon.com/blogs/security/how-to-implement-cryptographic-modules-to-secure-private-keys-used-with-iam-roles-anywhere/

AWS Identity and Access Management (IAM) Roles Anywhere enables workloads that run outside of Amazon Web Services (AWS), such as servers, containers, and applications, to use X.509 digital certificates to obtain temporary AWS credentials and access AWS resources, the same way that you use IAM roles for workloads on AWS. Now, IAM Roles Anywhere allows you to use PKCS #11–compatible cryptographic modules to help you securely store private keys associated with your end-entity X.509 certificates.

Cryptographic modules allow you to generate non-exportable asymmetric keys in the module hardware. The cryptographic module exposes high-level functions, such as encrypt, decrypt, and sign, through an interface such as PKCS #11. Using a cryptographic module with IAM Roles Anywhere helps to ensure that the private keys associated with your end-identity X.509 certificates remain in the module and cannot be accessed or copied to the system.

In this post, I will show how you can use PKCS #11–compatible cryptographic modules, such as YubiKey 5 Series and Thales ID smart cards, with your on-premises servers to securely store private keys. I’ll also show how to use those private keys and certificates to obtain temporary credentials for the AWS Command Line Interface (AWS CLI) and AWS SDKs.

Cryptographic modules use cases

IAM Roles Anywhere reduces the need to manage long-term AWS credentials for workloads running outside of AWS, to help improve your security posture. Now IAM Roles Anywhere has added support for compatible PKCS #11 cryptographic modules to the credential helper tool so that organizations that are currently using these (such as defense, government, or large enterprises) can benefit from storing their private keys on their security devices. This mitigates the risk of storing the private keys as files on servers where they can be accessed or copied by unauthorized users.

Note: If your organization does not implement PKCS #11–compatible modules, IAM Roles Anywhere credential helper supports OS certificate stores (Keychain Access for macOS and Cryptography API: Next Generation (CNG) for Windows) to help protect your certificates and private keys.

Solution overview

This authentication flow is shown in Figure 1 and is described in the following sections.

Figure 1: Authentication flow using crypto modules with IAM Roles Anywhere

How it works

As a prerequisite, you must first create a trust anchor and profile within IAM Roles Anywhere. The trust anchor will establish trust between your public key infrastructure (PKI) and IAM Roles Anywhere, and the profile allows you to specify which roles IAM Roles Anywhere assumes and what your workloads can do with the temporary credentials. You establish trust between IAM Roles Anywhere and your certificate authority (CA) by creating a trust anchor. A trust anchor is a reference to either AWS Private Certificate Authority (AWS Private CA) or an external CA certificate. For this walkthrough, you will use the AWS Private CA.
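
If you prefer to script this prerequisite instead of using the console, the following is a hedged boto3 sketch; the AWS Private CA ARN, role ARN, and resource names are placeholders.

import boto3

rolesanywhere = boto3.client("rolesanywhere")

# Placeholder CA ARN: trust the AWS Private CA that signs the end-entity certificates.
trust_anchor = rolesanywhere.create_trust_anchor(
    name="demo-trust-anchor",
    source={
        "sourceType": "AWS_ACM_PCA",
        "sourceData": {"acmPcaArn": "arn:aws:acm-pca:<region>:<accountID>:certificate-authority/<ca-id>"},
    },
    enabled=True,
)

# Placeholder role ARN: the IAM role that authenticated workloads are allowed to assume.
profile = rolesanywhere.create_profile(
    name="demo-profile",
    roleArns=["arn:aws:iam::<accountID>:role/<assumedRole>"],
    enabled=True,
)

print(trust_anchor["trustAnchor"]["trustAnchorArn"])
print(profile["profile"]["profileArn"])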

The one-time initialization process (step “0 – Module initialization” in Figure 1) works as follows:

  1. You first generate the non-exportable private key within the secure container of the cryptographic module.
  2. You then create the X.509 certificate that will bind an identity to a public key:
    1. Create a certificate signing request (CSR).
    2. Submit the CSR to the AWS Private CA.
    3. Obtain the certificate signed by the CA in order to establish trust.
  3. The certificate is then imported into the cryptographic module for mobility purposes, to make it available and simple to locate when the module is connected to the server.

After initialization is done, the module is connected to the server, which can then interact with the AWS CLI and AWS SDK without long-term credentials stored on a disk.

To obtain temporary security credentials from IAM Roles Anywhere:

  1. The server will use the credential helper tool that IAM Roles Anywhere provides. The credential helper works with the credential_process feature of the AWS CLI to provide credentials that can be used by the CLI and the language SDKs. The helper manages the process of creating a signature with the private key.
  2. The credential helper tool calls the IAM Roles Anywhere endpoint to obtain temporary credentials, which are issued in a standard JSON format to IAM Roles Anywhere clients via the CreateSession API action.
  3. The server uses the temporary credentials for programmatic access to AWS services.

Alternatively, you can use the update or serve commands instead of credential-process. The update command will be used as a long-running process that will renew the temporary credentials 5 minutes before the expiration time and replace them in the AWS credentials file. The serve command will be used to vend temporary credentials through an endpoint running on the local host using the same URIs and request headers as IMDSv2 (Instance Metadata Service Version 2).

Supported modules

The credential helper tool for IAM Roles Anywhere supports most devices that are compatible with PKCS #11. The PKCS #11 standard specifies an API for devices that hold cryptographic information and perform cryptographic functions such as signature and encryption.

I will showcase how to use a YubiKey 5 Series device, which is a multi-protocol security key that supports Personal Identity Verification (PIV) through PKCS #11. I am using the YubiKey 5 Series for the purpose of demonstration because it is commonly accessible (you can purchase it at the Yubico store or Amazon.com) and is used by some of the world's largest companies to provide one-time password (OTP), Fast IDentity Online (FIDO), and PIV smart card interfaces for multi-factor authentication. For a production server, we recommend using server-specific PKCS #11–compatible hardware security modules (HSMs) such as the YubiHSM 2, Luna PCIe HSM, or Trusted Platform Modules (TPMs) available on your servers.

Note: The implementation might differ with other modules, because some of these come with their own proprietary tools and drivers.

Implement the solution: Module initialization

You need to have the following prerequisites in order to initialize the module:

Following are the high-level steps for initializing the YubiKey device and generating the certificate that is signed by AWS Private Certificate Authority (AWS Private CA). Note that you could also use your own public key infrastructure (PKI) and register it with IAM Roles Anywhere.

To initialize the module and generate a certificate

  1. Verify that the YubiKey PIV interface is enabled, because some organizations might disable interfaces that are not being used. To do so, run the YubiKey Manager CLI, as follows:
    ykman info

    The output should look like the following, with the PIV interface enabled for USB.

    Figure 2: YubiKey Manager CLI showing that the PIV interface is enabled

  2. Use the YubiKey Manager CLI to generate a new RSA2048 private key on the security module in slot 9a and store the associated public key in a file. Different slots are available on the YubiKey, and we use slot 9a, which is intended for PIV authentication. Use the following command to generate an asymmetric key pair. The private key is generated on the YubiKey, and the generated public key is saved as a file. Enter the YubiKey management key to proceed:
    ykman --device 123456 piv keys generate 9a pub-yubi.key

  3. Create a certificate request (CSR) based on the public key and specify the subject that will identify your server. Enter the user PIN code when prompted.
    ykman --device 123456 piv certificates request 9a --subject 'CN=server1-demo,O=Example,L=Boston,ST=MA,C=US' pub-yubi.key csr.pem

  4. Submit the certificate request to AWS Private CA to obtain the certificate signed by the CA.
    aws acm-pca issue-certificate \
    --certificate-authority-arn arn:aws:acm-pca:<region>:<accountID>:certificate-authority/<ca-id> \
    --csr fileb://csr.pem \
    --signing-algorithm "SHA256WITHRSA" \
    --validity Value=365,Type="DAYS"

  5. Copy the certificate Amazon Resource Name (ARN), which should look as follows:
    {
    "CertificateArn": "arn:aws:acm-pca:<region>:<accountID>:certificate-authority/<ca-id>/certificate/<certificate-id>"
    }

  6. Export the new certificate from AWS Private CA in a certificate.pem file.
    aws acm-pca get-certificate \
    --certificate-arn arn:aws:acm-pca:<region>:<accountID>:certificate-authority/<ca-id>/certificate/<certificate-id> \
    --certificate-authority-arn arn:aws:acm-pca: <region>:<accountID>:certificate-authority/<ca-id> \
    --query Certificate \
    --output text > certificate.pem

  7. Import the certificate file into the module by using the YubiKey Manager CLI or through the YubiKey Manager UI. Enter the YubiKey management key to proceed.
    ykman --device 123456 piv certificates import 9a certificate.pem

The security module is now initialized and can be plugged into the server.

Configuration to use the security module for programmatic access

The following steps will demonstrate how to configure the server to interact with the AWS CLI and AWS SDKs by using the private key stored on the YubiKey or PKCS #11–compatible device.

To use the YubiKey module with credential helper

  1. Download the credential helper tool for IAM Roles Anywhere for your operating system.
  2. Install the p11-kit package. Most providers (including opensc) will ship with a p11-kit “module” file that makes them discoverable. Users shouldn’t need to specify the PKCS #11 “provider” library when using the credential helper, because we use p11-kit by default.

    If your device library is not supported by p11-kit, you can install that library separately.

  3. Verify the content of the YubiKey by using the following command:
    ykman --device 123456 piv info

    The output should look like the following.

    Figure 3: YubiKey Manager CLI output for the PIV information

    This command provides the general status of the PIV application and content in the different slots such as the certificates installed.

  4. Use the credential helper command with the security module. The command will require at least:
    • The ARN of the trust anchor
    • The ARN of the target role to assume
    • The ARN of the profile to pull policies from
    • The certificate and/or key identifiers in the form of a PKCS #11 URI

You can use the certificate flag to search which slot on the security module contains the private key associated with the user certificate.

To specify an object stored in a cryptographic module, you should use the PKCS #11 URI that is defined in RFC7512. The attributes in the identifier string are a set of search criteria used to filter a set of objects. See a recommended method of locating objects in PKCS #11.

In the following example, we search for an object of type certificate, with the object label as “Certificate for Digital Signature”, in slot 1. The pin-value attribute allows you to directly use the pin to log into the cryptographic device.

pkcs11:type=cert;object=Certificate%20for%20Digital%20Signature;id=%01?pin-value=123456

From the folder where you have installed the credential helper tool, use the following command. Because we only have one certificate on the device, we can limit the filter to the certificate type in our PKCS #11 URI.

./aws_signing_helper credential-process
--profile-arn arn:aws:rolesanywhere:<region>:<accountID>:profile/<profileID>
--role-arn arn:aws:iam::<accountID>:role/<assumedRole> 
--trust-anchor-arn arn:aws:rolesanywhere:<region>:<accountID>:trust-anchor/<trustanchorID>
--certificate pkcs11:type=cert?pin-value=<PIN>

If everything is configured correctly, the credential helper tool will return a JSON that contains the credentials, as follows. The PIN code will be requested if you haven’t specified it in the command.

Please enter your user PIN:
{
    "Version": 1,
    "AccessKeyId": <String>,
    "SecretAccessKey": <String>,
    "SessionToken": <String>,
    "Expiration": <Timestamp>
}

To use temporary security credentials with AWS SDKs and the AWS CLI, you can configure the credential helper tool as a credential process. For more information, see Source credentials with an external process. The following example shows a config file (usually in ~/.aws/config) that sets the helper tool as the credential process.

[profile server1-demo]
credential_process = ./aws_signing_helper credential-process --profile-arn <arn-for-iam-roles-anywhere-profile> --role-arn <arn-for-iam-role-to-assume> --trust-anchor-arn <arn-for-roles-anywhere-trust-anchor> --certificate pkcs11:type=cert?pin-value=<PIN> 

You can provide the PIN as part of the credential command with the option pin-value=<PIN> so that the user input is not required.

If you prefer not to store your PIN in the config file, you can remove the attribute pin-value. In that case, you will be prompted to enter the PIN for every CLI command.

You can use the serve and update commands of the credential helper mentioned in the solution overview to manage credential rotation for unattended workloads. After the successful use of the PIN, the credential helper will store it in memory for the duration of the process and not ask for it anymore.
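
The AWS SDKs resolve the same shared config profile, so applications don't need any code changes beyond selecting the profile. The following is a minimal boto3 sketch that assumes the server1-demo profile shown earlier.

import boto3

# The SDK invokes the credential helper defined by credential_process for this profile
# and receives temporary credentials backed by the private key on the security module.
session = boto3.Session(profile_name="server1-demo")
print(session.client("sts").get_caller_identity()["Arn"])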

Auditability and fine-grained access

You can audit the activity of servers that are assuming roles through IAM Roles Anywhere. IAM Roles Anywhere is integrated with AWS CloudTrail, a service that provides a record of actions taken by a user, role, or an AWS service in IAM Roles Anywhere.
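
You can query this event history programmatically as well as in the console. The following is a hedged boto3 sketch that filters events by the IAM Roles Anywhere event source.

import boto3

cloudtrail = boto3.client("cloudtrail")

# Look up recent IAM Roles Anywhere API activity, including CreateSession calls.
events = cloudtrail.lookup_events(
    LookupAttributes=[
        {"AttributeKey": "EventSource", "AttributeValue": "rolesanywhere.amazonaws.com"}
    ],
    MaxResults=50,
)
for event in events["Events"]:
    print(event["EventTime"], event["EventName"], event.get("Username"))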

To view IAM Roles Anywhere activity in CloudTrail

  1. In the AWS CloudTrail console, in the left navigation menu, choose Event history.
  2. For Lookup attributes, filter by Event source and enter rolesanywhere.amazonaws.com in the textbox. You will find all the API calls that relate to IAM Roles Anywhere, including the CreateSession API call that returns temporary security credentials for workloads that have been authenticated with IAM Roles Anywhere to access AWS resources.
    Figure 4: CloudTrail Events filtered on the “IAM Roles Anywhere” event source

  3. When you review the CreateSession event record details, you can find the assumed role ID in the form of <PrincipalID>:<serverCertificateSerial>, as in the following example:
    Figure 5: Details of the CreateSession event in the CloudTrail console showing which role is being assumed

  4. If you want to identify API calls made by a server, for Lookup attributes, filter by User name, and enter the serverCertificateSerial value from the previous step in the textbox.
    Figure 6: CloudTrail console events filtered by the username associated to our certificate on the security module

    The API calls to AWS services made with the temporary credentials acquired through IAM Roles Anywhere will contain the identity of the server that made the call in the SourceIdentity field. For example, the EC2 DescribeInstances API call provides the following details:

    Figure 7: The event record in the CloudTrail console for the EC2 describe instances call, with details on the assumed role and certificate CN.

Additionally, you can include conditions in the identity policy for the IAM role to apply fine-grained access control. This allows you to specify which server in the group of servers can perform the action.

To apply access control per server within the same IAM Roles Anywhere profile

  1. In the IAM Roles Anywhere console, select the profile used by the group of servers, then select one of the roles that is being assumed.
  2. Apply the following policy, which will allow only the server with CN=server1-demo to list all buckets by using the condition on aws:SourceIdentity.
    {
      "Version":"2012-10-17",
      "Statement":[
        {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "s3:ListBuckets",
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "aws:SourceIdentity": "CN=server1-demo"
                    }
                }
            }
      ]
    }

Conclusion

In this blog post, I’ve demonstrated how you can use the YubiKey 5 Series (or any PKCS #11 cryptographic module) to securely store the private keys for the X.509 certificates used with IAM Roles Anywhere. I’ve also highlighted how you can use AWS CloudTrail to audit API actions performed by the roles assumed by the servers.

To learn more about IAM Roles Anywhere, see the IAM Roles Anywhere and Credential Helper tool documentation. For configuration with Thales IDPrime smart card, review the credential helper for IAM Roles Anywhere GitHub page.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Identity and Access Management re:Post or contact AWS Support.


Author

Edouard Kachelmann

Edouard is an Enterprise Senior Solutions Architect at Amazon Web Services. Based in Boston, he is a passionate technology enthusiast who enjoys working with customers and helping them build innovative solutions to deliver measurable business outcomes. Prior to his work at AWS, Edouard worked for the French National Cybersecurity Agency, sharing his security expertise and assisting government departments and operators of vital importance. In his free time, Edouard likes to explore new places to eat, try new French recipes, and play with his kids.

Externalize Amazon MSK Connect configurations with Terraform

Post Syndicated from Ramc Venkatasamy original https://aws.amazon.com/blogs/big-data/externalize-amazon-msk-connect-configurations-with-terraform/

Managing configurations for Amazon MSK Connect, a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK), can become challenging, especially as the number of topics and configurations grows. In this post, we address this complexity by using Terraform to optimize the configuration of the Kafka topic to Amazon S3 Sink connector. By adopting this strategic approach, you can establish a robust and automated mechanism for handling MSK Connect configurations, eliminating the need for manual intervention or connector restarts. This efficient solution will save time, reduce errors, and provide better control over your Kafka data streaming processes. Let’s explore how Terraform can simplify and enhance the management of MSK Connect configurations for seamless integration with your infrastructure.

Solution overview

At a well-known AWS customer, the management of their constantly growing MSK Connect S3 Sink connector topics has become a significant challenge. The challenges lie in the overhead of managing configurations, as well as dealing with patching and upgrades. Manually handling Kubernetes (K8s) configs and restarting connectors can be cumbersome and error-prone, making it difficult to keep track of changes and updates. At the time of writing this post, MSK Connect does not offer native mechanisms to easily externalize the Kafka topic to S3 Sink configuration.

To address these challenges, we introduce Terraform, an infrastructure as code (IaC) tool. Terraform’s declarative approach and extensive ecosystem make it an ideal choice for managing MSK Connect configurations.

By externalizing Kafka topic to S3 configurations, organizations can achieve the following:

  • Scalability – Effortlessly manage a growing number of topics, ensuring the system can handle increasing data volumes without difficulty
  • Flexibility – Seamlessly integrate MSK Connect configurations with other infrastructure components and services, enabling adaptability to changing business needs
  • Automation – Automate the deployment and management of MSK Connect configurations, reducing manual intervention and streamlining operational tasks
  • Centralized management – Achieve improved governance with centralized management, version control, auditing, and change tracking, ensuring better control and visibility over the configurations

In the following sections, we provide a detailed guide on establishing Terraform for MSK Connect configuration management, defining and decentralizing Topic configurations, and deploying and updating configurations using Terraform.

Prerequisites

Before proceeding with the solution, ensure you have the following resources and access:

  • You need access to an AWS account with sufficient permissions to create and manage resources, including AWS Identity and Access Management (IAM) roles and MSK clusters.
  • To simplify the setup, use the provided AWS CloudFormation template. This template will create the necessary MSK cluster and required resources for this post.
  • For this post, we are using the latest Terraform version (1.5.6).

By ensuring you have these prerequisites in place, you will be ready to follow the instructions and streamline your MSK Connect configurations with Terraform. Let’s get started!

Setup

Setting up Terraform for MSK Connect configuration management includes the following:

  • Installation of Terraform and setting up the environment
  • Setting up the necessary authentication and permissions

Defining and decentralizing topic configurations using Terraform includes the following:

  • Understanding the structure of Terraform configuration files
  • Determining the required variables and resources
  • Utilizing Terraform’s modules and interpolation for flexibility

The decision to externalize the configuration was primarily driven by the customer’s business requirement. They anticipated the need to add topics periodically and wanted to avoid having to bring down the connector and write new code each time a topic was added. Given the limitations of MSK Connect (as of this writing), it’s important to note that MSK Connect can handle up to 300 workers. For this proof of concept (POC), we opted for a configuration with 100 topics directed to a single Amazon Simple Storage Service (Amazon S3) bucket. To stay within the 300-worker limit, we set the MCU count to 1 and configured auto scaling with a maximum of 2 workers.

To make the configuration more flexible, we specify the variables that can be utilized in the code (variables.tf):

variable "aws_region" {
description = "The AWS region to deploy resources in."
type = string
}

variable "s3_bucket_name" {
description = "s3_bucket_name."
type = string
}

variable "topics" {
description = "topics"
type = string
}

variable "msk_connect_name" {
description = "Name of the MSK Connect instance."
type = string
}

variable "msk_connect_description" {
description = "Description of the MSK Connect instance."
type = string
}

# Rest of the variables...

To set up the MSK Connect connector for the S3 sink, we need to provide various configurations. Let’s examine the connector_configuration block from the main.tf file in more detail:

connector_configuration = {
  "connector.class"                = "io.confluent.connect.s3.S3SinkConnector"
  "s3.region"                      = "us-east-1"
  "flush.size"                     = "5"
  "schema.compatibility"           = "NONE"
  "tasks.max"                      = "1"
  "topics"                         = var.topics
  "format.class"                   = "io.confluent.connect.s3.format.json.JsonFormat"
  "partitioner.class"              = "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  "value.converter.schemas.enable" = "false"
  "value.converter"                = "org.apache.kafka.connect.json.JsonConverter"
  "storage.class"                  = "io.confluent.connect.s3.storage.S3Storage"
  "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
  "s3.bucket.name"                 = var.s3_bucket_name
  "topics.dir"                     = "cxdl-data/KairosTelemetry"
}

The kafka_cluster block in the code snippet defines the Kafka cluster details, including the bootstrap servers and VPC settings. You can reference the variables to specify the appropriate values:

kafka_cluster {
  apache_kafka_cluster {
    bootstrap_servers = var.bootstrap_servers

    vpc {
      security_groups = [var.security_groups]
      subnets         = [var.aws_subnet_example1_id, var.aws_subnet_example2_id, var.aws_subnet_example3_id]
    }
  }
}

To secure the connection between Kafka and the connector, the code snippet includes configurations for authentication and encryption:

  • The kafka_cluster_client_authentication block sets the authentication type to IAM, enabling the use of IAM for authentication
  • The kafka_cluster_encryption_in_transit block enables TLS encryption for data transfer between Kafka and the connector
  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

You can externalize the variables and provide dynamic values using a var.tfvars file. Let’s assume the content of the var.tfvars file is as follows:

aws_region = "us-east-1"
msk_connect_name = "confluentinc-MSK-connect-s3-2"
msk_connect_description = "My MSK Connect instance"
s3_bucket_name = "msk-lab-xxxxxxxxxxxx-target-bucket"
topics = "salesdb.salesdb.CUSTOMER,salesdb.salesdb.CUSTOMER_SITE,salesdb.salesdb.PRODUCT,salesdb.salesdb.PRODUCT_CATEGORY,salesdb.salesdb.SALES_ORDER,salesdb.salesdb.SALES_ORDER_ALL,salesdb.salesdb.SALES_ORDER_DETAIL,salesdb.salesdb.SALES_ORDER_DETAIL_DS,salesdb.salesdb.SUPPLIER"
bootstrap_servers = "b-2.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-3.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098,b-1.mskclustermskconnectl.4xwlfx.c11.kafka.us-east-1.amazonaws.com:9098“
aws_subnet_example1_id = "subnet-016ef7bb5f5db5759"
aws_subnet_example2_id = "subnet-0114c390d379134fa"
aws_subnet_example3_id = "subnet-0f6352ad89a1454f2"
security_groups = "sg-07eb8f8e4559334e7"
aws_mskconnect_custom_plugin_example_arn = "arn:aws:kafkaconnect:us-east-1:xxxxxxxxxxxx:custom-plugin/confluentinc-kafka-connect-s3-10-0-3/e9aeb52e-d172-4dba-9de5-f5cf73f1cb9e-2"
aws_mskconnect_custom_plugin_example_latest_revision = "1"
aws_iam_role_example_arn = "arn:aws:iam::xxxxxxxxxxxx:role/msk-connect-lab-S3ConnectorIAMRole-3LBTU7YAV9CM"

Deploy and update configurations using Terraform

Once you’ve defined your MSK Connect infrastructure using Terraform, applying these configurations is a straightforward process for creating or updating your infrastructure. This becomes particularly convenient when a new topic needs to be added. Thanks to the externalized configuration, incorporating this change is now a seamless task. The steps are as follows:

  1. Download and install Terraform from the official website (https://www.terraform.io/downloads.html) for your operating system.
  2. Confirm the installation by running the terraform version command on your command line interface.
  3. Ensure that you have configured your AWS credentials using the AWS Command Line Interface (AWS CLI) or by setting environment variables. You can use the aws configure command to configure your credentials if you’re using the AWS CLI.
  4. Place the main.tf, variables.tf, and var.tfvars files in the same Terraform directory.
  5. Open a command line interface, navigate to the directory containing the Terraform files, and run the command terraform init to initialize Terraform and download the required providers.
  6. Run the command terraform plan -var-file="var.tfvars" to review the run plan.

This command shows the changes that Terraform will make to the infrastructure based on the provided variables. This step is optional but is often used as a preview of the changes Terraform will make.

  7. If the plan looks correct, run the command terraform apply -var-file="var.tfvars" to apply the configuration.

Terraform will prompt you for confirmation before proceeding, and then create the MSK Connect connector in your AWS account.

  8. After the terraform apply command is complete, verify the infrastructure has been created or updated on the console, or programmatically as shown in the sketch after these steps.
  9. For any changes or updates, modify your Terraform files (main.tf, variables.tf, var.tfvars) as needed, and then rerun the terraform plan and terraform apply commands.
  10. When you no longer need the infrastructure, you can use terraform destroy -var-file="var.tfvars" to remove all resources created by your Terraform files.

Be careful with this command because it will delete all the resources defined in your Terraform files.
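
If you want to verify the connector programmatically rather than on the console, the following minimal Python sketch uses the AWS SDK for Python (Boto3) to list MSK Connect connectors and print their state. The name prefix matches the msk_connect_name value from var.tfvars; adjust the Region and prefix to your environment. This is a convenience check only and is not part of the Terraform workflow.

import boto3

# List MSK Connect connectors whose name starts with the value we deployed.
client = boto3.client("kafkaconnect", region_name="us-east-1")
response = client.list_connectors(connectorNamePrefix="confluentinc-MSK-connect-s3-2")

for connector in response.get("connectors", []):
    # A healthy deployment eventually reports the RUNNING state.
    print(connector["connectorName"], connector["connectorState"], connector["connectorArn"])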

Conclusion

In this post, we addressed the challenges faced by a customer in managing MSK Connect configurations and described a Terraform-based solution. By externalizing Kafka topic to Amazon S3 configurations, you can streamline your configuration management processes, achieve scalability, enhance flexibility, automate deployments, and centralize management. We encourage you to use Terraform to optimize your MSK Connect configurations and explore further possibilities in managing your streaming data pipelines efficiently.

To get started with externalizing MSK Connect configurations using Terraform, refer to the provided implementation steps and the Getting Started with Terraform guide, MSK Connect documentation, Terraform documentation, and example GitHub repository.

Using Terraform to externalize the Kafka topic to Amazon S3 Sink configuration in MSK Connect offers a powerful solution for managing and scaling your streaming data pipelines. By automating the deployment, updating, and central management of configurations, you can ensure efficiency, flexibility, and scalability in your data processing workflows.


About the Author

RamC Venkatasamy is a Solutions Architect based in Bloomington, Illinois. He helps AWS Strategic customers transform their businesses in the cloud, with a fervent enthusiasm for serverless, event-driven architecture, and generative AI.

How Chime Financial uses AWS to build a serverless stream analytics platform and defeat fraudsters

Post Syndicated from Khandu Shinde original https://aws.amazon.com/blogs/big-data/how-chime-financial-uses-aws-to-build-a-serverless-stream-analytics-platform-and-defeat-fraudsters/

This is a guest post by Khandu Shinde, Staff Software Engineer, and Edward Paget, Senior Software Engineer at Chime Financial.

Chime is a financial technology company founded on the premise that basic banking services should be helpful, easy, and free. Chime partners with national banks to design member-first financial products. This creates a more competitive market with better, lower-cost options for everyday Americans who aren’t being served well by traditional banks. We help drive innovation, inclusion, and access across the industry.

Chime has a responsibility to protect our members against unauthorized transactions on their accounts. Chime’s Risk Analysis team constantly monitors trends in our data to find patterns that indicate fraudulent transactions.

This post discusses how Chime utilizes AWS Glue, Amazon Kinesis, Amazon DynamoDB, and Amazon SageMaker to build an online, serverless fraud detection solution — the Chime Streaming 2.0 system.

Problem statement

In order to keep up with the rapid movement of fraudsters, our decision platform must continuously monitor user events and respond in real time. However, our legacy data warehouse-based solution was not equipped for this challenge. It was designed to manage complex queries and business intelligence (BI) use cases at a large scale, but with a minimum data freshness of 10 minutes, this architecture inherently didn’t align with the near real-time fraud detection use case.

To make high-quality decisions, we need to collect user event data from various sources and update risk profiles in real time. We also need to be able to add new fields and metrics to the risk profiles as our team identifies new attacks, without needing engineering intervention or complex deployments.

We decided to explore streaming analytics solutions where we can capture, transform, and store event streams at scale, and serve rule-based fraud detection models and machine learning (ML) models with milliseconds latency.

Solution overview

The following diagram illustrates the design of the Chime Streaming 2.0 system.

The design included the following key components:

  1. We have Amazon Kinesis Data Streams as our streaming data service to capture and store event streams at scale. Our stream pipelines capture various event types, including user enrollment events, user login events, card swipe events, peer-to-peer payments, and application screen actions.
  2. Amazon DynamoDB is another data source for our Streaming 2.0 system. It acts as the application backend and stores data such as blocked devices list and device-user mapping. We mainly use it as lookup tables in our pipeline.
  3. AWS Glue jobs form the backbone of our Streaming 2.0 system. The simple AWS Glue icon in the diagram represents thousands of AWS Glue jobs performing different transformations. To achieve the 5-15 second end-to-end data freshness service level agreement (SLA) for the Streaming 2.0 pipeline, we use streaming ETL jobs in AWS Glue to consume data from Kinesis Data Streams and apply near-real-time transformations (a minimal sketch of such a job follows this list). We choose AWS Glue mainly due to its serverless nature, which simplifies infrastructure management with automatic provisioning and worker management, and the ability to perform complex data transformations at scale.
  4. The AWS Glue streaming jobs generate derived fields and risk profiles that get stored in Amazon DynamoDB. We use Amazon DynamoDB as our online feature store due to its millisecond performance and scalability.
  5. Our applications call Amazon SageMaker Inference endpoints for fraud detection. The Amazon DynamoDB online feature store supports real-time inference with single-digit millisecond query latency.
  6. We use Amazon Simple Storage Service (Amazon S3) as our offline feature store. It contains historical user activities and other derived ML features.
  7. Our data scientist team can access the dataset and perform ML model training and batch inferencing using Amazon SageMaker.
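
To make the pipeline concrete, the following is a minimal PySpark sketch of an AWS Glue streaming job of the kind described in steps 3 and 4: it consumes events from Kinesis Data Streams in short micro-batches and writes derived features to a DynamoDB table. The stream ARN, table name, column names, and window size are hypothetical placeholders, not Chime’s actual configuration.

import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())

# Read the event stream from Kinesis Data Streams (the stream ARN is a placeholder).
events = glue_context.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/user-events",
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true",
    },
    transformation_ctx="events",
)

def process_batch(batch_df, batch_id):
    # Hypothetical transformation: derive a simple per-user feature for each micro-batch.
    if batch_df.count() == 0:
        return
    features = batch_df.groupBy("user_id").count().withColumnRenamed("count", "event_count")
    glue_context.write_dynamic_frame_from_options(
        frame=DynamicFrame.fromDF(features, glue_context, "features"),
        connection_type="dynamodb",
        connection_options={"dynamodb.output.tableName": "risk_profile_features"},
    )

# Short micro-batches help meet a seconds-level data freshness SLA.
glue_context.forEachBatch(
    frame=events,
    batch_function=process_batch,
    options={
        "windowSize": "10 seconds",
        "checkpointLocation": "s3://example-bucket/streaming-checkpoints/" + args["JOB_NAME"] + "/",
    },
)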

AWS Glue pipeline implementation deep dive

There are two key design principles for our AWS Glue pipeline and the Streaming 2.0 project:

  • We want to democratize our data platform and make the data pipeline accessible to all Chime developers.
  • We want to implement cloud financial backend services and achieve cost efficiency.

To achieve data democratization, we needed to enable different personas in the organization to use the platform and define transformation jobs quickly, without worrying about the actual implementation details of the pipelines. The data infrastructure team built an abstraction layer on top of Spark and integrated services. This layer contained API wrappers over integrated services, job tags, scheduling configurations and debug tooling, hiding Spark and other lower-level complexities from end users. As a result, end users were able to define jobs with declarative YAML configurations and define transformation logic with SQL. This simplified the onboarding process and accelerated the implementation phase.
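
The following minimal Python sketch illustrates the general idea of such an abstraction layer: an end user supplies a declarative YAML job definition with a SQL transformation, and a thin wrapper executes it with Spark SQL. The YAML schema, names, and wrapper function are illustrative assumptions for this post, not Chime’s internal implementation.

import yaml
from pyspark.sql import SparkSession

# Hypothetical declarative job definition an end user might write instead of Spark code.
JOB_YAML = """
job_name: card_swipe_features
sql: |
  SELECT user_id, COUNT(*) AS swipe_count
  FROM card_swipe_events
  GROUP BY user_id
destination_table: risk.card_swipe_features
tags:
  team: risk-analysis
"""

def run_declarative_job(spark, job_yaml):
    """Parse a YAML job definition and execute its SQL transformation."""
    job = yaml.safe_load(job_yaml)
    result = spark.sql(job["sql"])
    # A real abstraction layer would also apply cost-allocation tags, scheduling,
    # and integration-specific write options here; this sketch only persists the result.
    result.write.mode("overwrite").saveAsTable(job["destination_table"])

if __name__ == "__main__":
    spark = SparkSession.builder.appName("declarative-job-runner").getOrCreate()
    run_declarative_job(spark, JOB_YAML)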

To achieve cost efficiency, our team built a cost attribution dashboard based on AWS cost allocation tags. We enforced tagging with the above abstraction layer and had clear cost attribution for all AWS Glue jobs down to the team level. This enabled us to track down less optimized jobs and work with job owners to implement best practices with impact-based priority. One common misconfiguration we found was sizing of AWS Glue jobs. With data democratization, many users lacked the knowledge to right-size their AWS Glue jobs. The AWS team introduced AWS Glue auto scaling to us as a solution. With AWS Glue Auto Scaling, we no longer needed to plan AWS Glue Spark cluster capacity in advance. We could just set the maximum number of workers and run the jobs. AWS Glue monitors the Spark application execution, and allocates more worker nodes to the cluster in near-real time after Spark requests more executors based on our workload requirements. We noticed a 30–45% cost saving across our AWS Glue Jobs once we turned on Auto Scaling.

Conclusion

In this post, we showed you how Chime’s Streaming 2.0 system allows us to ingest events and make them available to the decision platform just seconds after they are emitted from other services. This enables us to write better risk policies, provide fresher data for our machine learning models, and protect our members from unauthorized transactions on their accounts.

Over 500 developers in Chime are using this streaming pipeline and we ingest more than 1 million events per second. We follow the sizing and scaling process from the AWS Glue streaming ETL jobs best practices blog and land on a 1:1 mapping between Kinesis Shard and vCPU core. The end-to-end latency is less than 15 seconds, and it improves the model score calculation speed by 1200% compared to legacy implementation. This system has proven to be reliable, performant, and cost-effective at scale.

We hope this post will inspire your organization to build a real-time analytics platform using serverless technologies to accelerate your business goals.


About the Authors

Khandu Shinde is a Staff Engineer focused on Big Data Platforms and Solutions for Chime. He helps to make the platform scalable for Chime’s business needs with architectural direction and vision. He’s based in San Francisco where he plays cricket and watches movies.

Edward Paget is a Software Engineer working on building Chime’s capabilities to mitigate risk to ensure our members’ financial peace of mind. He enjoys being at the intersection of big data and programming language theory. He’s based in Chicago where he spends his time running along the lake shore.

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 2

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints-part-2/

This post is a continuation of a two-part series. In the first part, we delved into Apache Flink’s internal mechanisms for checkpointing, in-flight data buffering, and handling backpressure. We covered these concepts in order to understand how buffer debloating and unaligned checkpoints allow us to enhance performance for specific conditions in Apache Flink applications.

In Part 1, we introduced and examined how to use buffer debloating to improve in-flight data processing. In this post, we focus on unaligned checkpoints. This feature has been available since Apache Flink 1.11 and has received many improvements since then. Unaligned checkpoints help, under specific conditions, to reduce checkpointing time for applications suffering temporary backpressure, and can now be enabled in Amazon Managed Service for Apache Flink applications running Apache Flink 1.15.2 through a support ticket.

Even though this feature might improve checkpoint performance, if your application is constantly failing because of checkpoints timing out, or is suffering from constant backpressure, you may need a deeper analysis and redesign of your application.

Aligned checkpoints

As discussed in Part 1, Apache Flink checkpointing allows applications to record state in case of failure. We’ve already discussed how checkpoints, when triggered by the job manager, signal all source operators to snapshot their state and then broadcast a special record called a checkpoint barrier to all outgoing streams. This process achieves exactly-once consistency for state in a distributed streaming application through the alignment of these barriers.

Let’s walk through the process of aligned checkpoints in a standard Apache Flink application. Remember that Apache Flink distributes the workload horizontally: each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks based on its parallelism.

Barrier alignment

The alignment of checkpoint barriers is crucial for achieving exactly-once consistency in Apache Flink applications during checkpoint runs. To recap, when a job manager triggers a checkpoint, all sub-tasks of source operators receive a signal to initiate the checkpoint process. Each sub-task independently snapshots its state to the state backend and broadcasts a special record known as a checkpoint barrier to all outgoing streams.

When an application operates with a parallelism higher than 1, multiple instances of each task—referred to as sub-tasks—enable parallel message consumption and processing. A sub-task can receive distinct partitions of the same stream from different upstream sub-tasks, such as after a stream repartitioning with keyBy or rebalance operations. To maintain exactly-once consistency, all sub-tasks must wait for the arrival of all checkpoint barriers before taking a snapshot of the state. The following diagram illustrates the checkpoint barriers flow.

Checkpoint Barriers flow in the Buffer Queues

This phase is called checkpoint alignment. During alignment, the sub-task stops processing records from the partitions from which it has already received barriers, as shown in the following figure.

The first Barrier reaches the sub-task: Checkpointing Alignment begins

However, it continues to process partitions that are behind the barrier.

Processing continues only for partitions behind the barrier

When barriers from all upstream partitions have arrived, the sub-task takes a snapshot of its state.

Barrier alignment complete: snapshot state

Then it broadcasts the barrier downstream.

Emit Barriers downstream, and continue processing

The time a sub-task spends waiting for all barriers to arrive is measured by the checkpoint Alignment Duration metric, which can be observed in the Apache Flink UI.

If the application experiences backpressure, an increase in this metric could lead to longer checkpoint durations and even checkpoint failures due to timeouts. This is where unaligned checkpoints become a viable option to potentially enhance checkpointing performance.

Unaligned checkpoints

Unaligned checkpoints address situations where backpressure is not just a temporary spike, but results in timeouts for aligned checkpoints, due to barrier queuing within the stream. As discussed in Part 1, checkpoint barriers can’t overtake regular records. Therefore, significant backpressure can slow down the movement of barriers across the application, potentially causing checkpoint timeouts.

The objective of unaligned checkpoints is to enable barrier overtaking, allowing barriers to move swiftly from source to sink even when the data flow is slower than anticipated.

Building on what we saw in Part 1 concerning checkpoints and what aligned checkpoints are, let’s explore how unaligned checkpoints modify the checkpointing mechanism.

Upon emission, each source’s checkpoint barrier is injected into the stream flowing across sub-tasks. It travels from the source output network buffer queue into the input network buffer queue of the subsequent operator.

Upon the arrival of the first barrier in the input network buffer queue, the operator initially waits for barrier alignment. If the specified alignment timeout expires because not all barriers have reached the end of the input network buffer queue, the operator switches to unaligned checkpoint mode.

The alignment timeout can be set programmatically by env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30)), but modifying the default is not recommended in Apache Flink 1.15.
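
For reference, the following is a minimal PyFlink sketch of how unaligned checkpoints are enabled on a self-managed Apache Flink job; it mirrors the Java API quoted above. In Amazon Managed Service for Apache Flink, checkpointing is managed by the service and unaligned checkpoints are enabled through a support ticket, so treat this only as an illustration of the underlying Flink API, with illustrative values.

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60 seconds (illustrative value).
env.enable_checkpointing(60000)

checkpoint_config = env.get_checkpoint_config()
# Fail a checkpoint that does not complete within 10 minutes.
checkpoint_config.set_checkpoint_timeout(600000)
# Allow barriers to overtake in-flight records when alignment takes too long.
checkpoint_config.enable_unaligned_checkpoints()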

Checkpoint barriers flow in the buffer queues

The operator waits until all checkpoint barriers are present in the input network buffer queue before triggering the checkpoint. Unlike aligned checkpoints, the operator doesn’t need to wait for all barriers to reach the queue’s end, allowing the operator to have in-flight data from the buffer that hasn’t been processed before checkpoint initiation.

All barriers are in the input queues

After all barriers have arrived in the input network buffer queue, the operator advances the barrier to the end of the output network buffer queue. This enhances checkpointing speed because the barrier can smoothly traverse the application from source to sink, independent of the application’s end-to-end latency.

Barriers can overtake in-flight messages

After forwarding the barrier to the output network buffer queue, the operator initiates the snapshot of in-flight data between the barriers in the input and output network buffer queues, along with the snapshot of the state.

Although processing is momentarily paused during this process, the actual writing to the remote persistent state storage occurs asynchronously, preventing potential bottlenecks.

Snapshot state and in-flight messages

The local snapshot, encompassing in-flight messages and state, is saved asynchronously in the remote persistent state store, while the barrier continues its journey through the application.

Processing continues

When to use unaligned checkpoints

Remember, barrier alignment only occurs between partitions coming from different sub-tasks of the same operator. Therefore, if an operator is experiencing temporary backpressure, enabling unaligned checkpoints may be beneficial. This way, the application doesn’t have to wait for all barriers to reach the operator before performing the snapshot of state or moving the barrier forward.

Temporary backpressure could arise from the following:

  • A surge in data ingestion
  • Backfilling or catching up with historical data
  • Increased message processing time due to delayed external systems

Another scenario where unaligned checkpoints prove advantageous is when working with exactly-once sinks. When you use the two-phase commit sink function for exactly-once sinks, unaligned checkpoints can expedite checkpoint runs, thereby reducing end-to-end latency.

When not to use unaligned checkpoints

Unaligned checkpoints won’t reduce the time required for savepoints (called snapshots in the Amazon Managed Service for Apache Flink implementation) because savepoints exclusively utilize aligned checkpoints. Furthermore, because Apache Flink doesn’t permit concurrent unaligned checkpoints, savepoints won’t occur simultaneously with unaligned checkpoints, potentially elongating savepoint durations.

Unaligned checkpoints won’t fix any underlying issue in your application design. If your application is suffering from persistent backpressure or constant checkpointing timeouts, this might indicate data skewness or underprovisioning, which may require improving and tuning the application.

Using unaligned checkpoints with buffer debloating

One alternative for reducing the risks associated with an increased state size is to combine unaligned checkpoints with buffer debloating. This approach results in having less in-flight data to snapshot and store in the state, along with less data to be used for recovery in case of failure. This synergy facilitates enhanced performance and efficient checkpoint runs, leading to smaller checkpointing sizes and faster recovery times. When testing the use of unaligned checkpoints, we recommend doing so with buffer debloating to prevent the state size from increasing.

Limitations

Unaligned checkpoints are subject to the following limitations:

  • They provide no benefit for operators with a parallelism of 1.
  • They only improve performance for operators where barrier alignment would have occurred. This alignment happens only if records are coming from different sub-tasks of the same operator, for example, through repartitioning or keyBy operations.
  • Operators receiving input from multiple sources or participating in joins might not experience improvements, because the operator would be receiving data from different operators in those cases.
  • Although checkpoint barriers can overtake records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.
  • As we have seen, savepoints always use aligned checkpoints. If the savepoints of your applications are slow due to barrier alignment, unaligned checkpoints will not help.
  • Additional limitations affect watermarks, message ordering, and broadcast state in recovery. For more details, refer to Limitations.

Considerations

Considerations for implementing unaligned checkpoints:

  • Unaligned checkpoints introduce additional I/O to checkpoint storage
  • Checkpoints encompass not only operator state but also in-flight data within network buffer queues, leading to increased state size

Recommendations

We offer the following recommendations:

  • Consider enabling unaligned checkpoints only if both of the following conditions are true:
    • Checkpoints are timing out.
    • The average checkpoint Async Duration of any operator is more than 50% of the total checkpoint duration for the operator (sum of Sync Duration + Async Duration).
  • Consider enabling buffer debloating first, and evaluate whether it solves the problem of checkpoints timing out.
  • If buffer debloating doesn’t help, consider enabling unaligned checkpoints along with buffer debloating. Buffer debloating mitigates the drawbacks of unaligned checkpoints, reducing the amount of in-flight data.
  • If unaligned checkpoints and buffer debloating together don’t improve checkpoint alignment duration, consider testing unaligned checkpoints alone.

Decision flow

Finally, but most importantly, always test unaligned checkpoints in a non-production environment first, running some comparative performance testing with a realistic workload, and verify that unaligned checkpoints actually reduce checkpoint duration.

Conclusion

This two-part series explored advanced strategies for optimizing checkpointing within your Amazon Managed Service for Apache Flink applications. By harnessing the potential of buffer debloating and unaligned checkpoints, you can unlock significant performance improvements and streamline checkpoint processes. However, it’s important to understand when these techniques will provide improvements and when they will not. If you believe your application may benefit from checkpoint performance improvement, you can enable these features in your Amazon Managed Service for Apache Flink version 1.15 applications. We recommend first enabling buffer debloating and testing the application. If you are still not seeing the expected outcome, enable buffer debloating with unaligned checkpoints. This way, you can immediately reduce the state size and the additional I/O to state backends. Lastly, you may try using unaligned checkpoints on their own, bearing in mind the considerations we’ve mentioned.

With a deeper understanding of these techniques and their applicability, you are better equipped to maximize the efficiency of checkpoints and mitigate the effect of backpressure in your Apache Flink application.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 1

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-1-optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints/

This post is the first of a two-part series regarding checkpointing mechanisms and in-flight data buffering. In this first part, we explain some of the fundamental Apache Flink internals and cover the buffer debloating feature. In the second part, we focus on unaligned checkpoints.

Apache Flink is an open-source distributed engine for stateful processing over unbounded datasets (streams) and bounded datasets (batches). Amazon Managed Service for Apache Flink, formerly known as Amazon Kinesis Data Analytics, is the AWS service offering fully managed Apache Flink.

Apache Flink is designed for stateful processing at scale, for high throughput and low latency. It scales horizontally, distributing processing and state across multiple nodes, and is designed to withstand failures without compromising the exactly-once consistency it provides.

Internally, Apache Flink uses clever mechanisms to maintain exactly-once state consistency, while also optimizing for throughput and reduced latency. The default behavior works well for most use cases. Recent versions introduced two functionalities that can be optionally enabled to improve application performance under particular conditions: buffer debloating and unaligned checkpoints.

Buffer debloating and unaligned checkpoints can be enabled on Amazon Managed Service for Apache Flink version 1.15.

To understand how these functionalities can help and when to use them, we need to dive deep into some of the fundamental internal mechanisms of Apache Flink: checkpointing, in-flight data buffering, and backpressure.

Maintaining state consistency through failures with checkpointing

Apache Flink checkpointing periodically saves the internal application state for recovering in case of failure. Each of the distributed components of an application asynchronously snapshots its state to an external persistent datastore. The challenge is taking snapshots guaranteeing exactly-once consistency. A naïve “stop-the-world, take a snapshot” implementation would never meet the high throughput and low latency goals Apache Flink has been designed for.

Let’s walk through the process of checkpointing in a simple streaming application.

As shown in the following figure, Apache Flink distributes the work horizontally. Each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks, based on its parallelism. The application is coordinated by a job manager. Checkpoints are periodically initiated by the job manager, sending a signal to all source operators’ sub-tasks.

Checkpoint initiated by the Job Manager

On receiving the signal, each source sub-task independently snapshots its state (for example, the offsets of the Kafka topic it is consuming) to a persistent storage, and then broadcasts a special record called a checkpoint barrier (“CB” in the following diagrams) to all outgoing streams. Checkpoint barriers work similarly to watermarks in Apache Flink, flowing in-band along with normal records. A barrier does not overtake normal records and is not overtaken.

Source operators emit checkpoint barriers

When a downstream operator’s sub-task receives all checkpoint barriers from all input channels, it starts snapshotting its state.

A sub-task does not pause processing while saving its state to the remote, persistent state backend. This is a two-phase operation. First, the sub-task takes a snapshot of the state, on the local file system or in memory, depending on application configuration. This operation is blocking but very fast. When the snapshot is complete, it restarts processing records, while the state is asynchronously saved to the external, persistent state store. When the state is successfully saved to the state store, the sub-task acknowledges to the job manager that its checkpointing is complete.

The time a sub-task spends on the synchronous and asynchronous parts of the checkpoint is measured by the Sync Duration and Async Duration metrics, shown in the Apache Flink UI.

Sub-tasks acknowledge checkpoint completion

Checkpoint barriers propagate through all operators, down to the sinks. When all sink sub-tasks have acknowledged the checkpoint to the job manager, the checkpoint is declared complete and can be used to recover the application, for example in case of failure.

Sink operators acknowledge checkpoint is complete

Checkpoint barrier alignment

A sub-task may receive different partitions of the same stream from different upstream sub-tasks, for example when a stream is repartitioned with a keyBy or a rebalance. Each upstream sub-task will emit a checkpoint barrier independently. To maintain exactly-once consistency, the sub-task must wait for the barriers to arrive on all input partitions before taking a snapshot of its state.

This phase is called checkpoint alignment. During the alignment, the sub-task stops processing records from the partitions it already received the barrier from and continues processing the partitions that are behind the barrier.

After the barriers from all upstream partitions have arrived, the sub-task takes the snapshot of its state and then broadcasts the barrier downstream.

The time spent by a sub-task while aligning barriers is measured by the Checkpoint Alignment Duration metric, shown by the Apache Flink UI.

Checkpoint barrier alignment

In-flight data buffering

To optimize for throughput, Apache Flink tries to keep each sub-task always busy. This is achieved by transmitting records over the network in blocks and by buffering in-flight data. Note that this is a data transmission optimization; Flink operators always process records one at a time.

Data is handed over between sub-tasks in units called network buffers. A network buffer has a fixed size, in bytes.

Sub-tasks also buffer in-flight input and output data. These buffers are called network buffer queues. Each queue is composed of multiple network buffers. Each sub-task has an input network buffer queue for each upstream sub-task and an output network buffer queue for each downstream sub-task.

Each record emitted by the sub-task is serialized, put into network buffers, and published to the output network buffer queue. To use all the available space, multiple messages can be packed into a single network buffer or split across subsequent network buffers.

A separate thread sends full network buffers over the network, where they are stored in the destination sub-task’s input network buffer queue.

When the destination sub-task thread is free, it deserializes the network buffers, rebuilds the records, and processes them one at a time.

Network Buffer Queue

Backpressure

If a sub-task can’t keep up with processing records at the same pace they are received, the input queue fills up. When the input queue is full, the upstream sub-task stops sending data.

Data accumulates in the sender’s output queue. When this is also full, the sender sub-task stops processing records, accumulating received data in its own input queue, and the effect propagates upstream.

This is the backpressure that Apache Flink uses to control the internal flow, preventing slow operators from being overwhelmed by slowing down the upstream flow. Backpressure is a safety mechanism that maximizes the application throughput. It can be temporary, for example during an unexpected peak of ingested data. If it is not temporary, it is usually a symptom, not the cause, that the application is not designed correctly or has insufficient resources to process the workload.

Full Network Buffer Queue generates backpressure

In-flight buffering and checkpoint barriers

As checkpoint barriers flow with normal records, they also flow in the network buffers, through the input and output queues. In normal conditions, barriers don’t overtake records, and they are never overtaken. If records are queueing up due to backpressure, checkpoint barriers are also stuck in the queue, taking longer time to propagate from the sources to the sinks, delaying the completion of the checkpoint.

In the second part of this series, we will see how unaligned checkpoints can let barriers overtake records under specific conditions. For now, let’s see how we can optimize the size of input and output queues with buffer debloating.

Buffer debloating to optimize in-flight data

The default network buffer queue size is a good compromise for most applications. You can modify this size, but it applies to all sub-tasks, and it may be difficult to optimize this one-size-fits-all across different operators.

Longer queues support higher throughput, but they also slow down checkpoint barriers, which have to travel through them, causing a longer End to End Checkpoint Duration. Ideally, the amount of in-flight data should be adjusted based on the actual throughput.

In version 1.14, Apache Flink introduced buffer debloating, which can be enabled to adjust in-flight data of each sub-task, based on the current throughput the sub-task is processing, and periodically reassess and readjust it.
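
For reference, buffer debloating is controlled by a small set of Flink configuration options. The following Python sketch documents the relevant keys using the PyFlink Configuration class; on a self-managed cluster they would typically be set in flink-conf.yaml, while in Amazon Managed Service for Apache Flink the feature is enabled for you by the service. The values shown reflect the behavior described in this post and are for illustration only.

from pyflink.common import Configuration

config = Configuration()
# Turn on automatic adjustment of in-flight data.
config.set_string("taskmanager.network.memory.buffer-debloat.enabled", "true")
# Target amount of time a sub-task should need to consume its in-flight data.
config.set_string("taskmanager.network.memory.buffer-debloat.target", "1 s")
# How frequently the buffer size is recalculated.
config.set_string("taskmanager.network.memory.buffer-debloat.period", "200 ms")
# This Configuration object is shown only to document the keys; in practice these
# options are applied through the cluster configuration.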

How buffer debloating helps your application

Consider a streaming application, ingesting records from a streaming source and publishing the results to a streaming destination after some transformations. Under normal conditions, the application is sized to process the incoming throughput smoothly. Our destination has limited capacity, for example a Kafka topic throttled via quotas, sufficient to handle the normal throughput, with some margin.

In-flight data buffering under normal throughput

Imagine that the ingestion throughput has occasional peaks. These peaks exceed the limits of the streaming destination (throughput quota of the Kafka topic), which starts throttling.

Full in-flight data buffer to the sink backpressure the preceding operator

Because the sink can’t process the full throughput, in-flight data accumulates upstream of the sink, causing backpressure on the upstream operator. The effect eventually propagates up to the source, and the source starts lagging behind the most recent record in the source stream.

Backpressure propagates upstream, up to the source operator

As long as this is a temporary condition, backpressure and lag are not a problem per se, because the application is able to catch up when the peak has finished.

Unfortunately, accumulating in-flight data also slows down the propagation of the checkpoint barriers. Checkpoint End to End Duration goes up, and checkpoints may eventually time out.

Full in-flight data buffers slow down checkpoint barrier propagation, under backpressure

The situation is even worse if the sink uses two-phase commit for exactly-once guarantees. For example, KafkaSink uses Kafka transactions committed on checkpoints. If checkpoints become too slow, transactions are committed later, significantly increasing the latency of any downstream consumer using a read-committed isolation level.

Slow checkpoints under backpressure may also cause a vicious cycle. A slowed-down application eventually crashes, and recovers from the last checkpoint that is quite old. This causes a long reprocessing that, in turn, induces more backpressure and even slower checkpoints.

In this case, buffer debloating can help by adjusting the amount of in-flight data based on the throughput each sub-task is actually processing. When a sub-task is throttled by backpressure, the amount of in-flight data is reduced, also reducing the time checkpoint barriers take to go through all operators. Checkpoint End to End Duration goes down, and checkpoints do not time out.

Buffer debloating internals

Buffer debloating estimates the throughput a sub-task is capable of processing, assuming no idling, and limits the upstream in-flight data buffers to contain just enough data to be processed in 1 second (by default).

For efficiency, the network buffers in the queues have a fixed size. Buffer debloating caps the usable size of each network buffer, making it smaller when the sub-task is processing slowly.

Buffer debloating speeds up barrier propagation, reducing the volume of in-flight data

The benefit of less in-flight data depends on whether Apache Flink is using standard checkpoint alignment (the default behavior described so far) or unaligned checkpoints. We will examine unaligned checkpoints in the second part of this series, but let’s briefly look at the effect of buffer debloating in both cases:

  • With aligned checkpoints (default behavior) – Less in-flight data makes checkpoint barrier propagation faster, ultimately reducing the end-to-end checkpoint duration but also making it more predictable
  • With unaligned checkpoints (optional) – Less in-flight data reduces the amount of in-flight records stored with the checkpoint, ultimately reducing the checkpoint size

What buffer debloating does not do

Note that the problem we are trying to solve is slow checkpointing (or excessive checkpointing size, with unaligned checkpoints). Buffer debloating helps make checkpointing faster.

Buffer debloating does not remove backpressure. Backpressure is the internal protective mechanism that Apache Flink uses when some part of the application is not able to cope with the incoming throughput. To reduce backpressure, you have to work on other aspects of the application. When backpressure is only temporary, for example under peak conditions, the only way of removing it would be sizing the end-to-end system for the peak, rather than normal workload. But this could be impossible or too expensive.

Buffer debloating helps reduce and keep checkpoint duration stable under exceptional and temporary conditions. If an application experiences backpressure under its normal workload, or checkpoints are too slow under normal conditions, you should investigate the implementation of your application to understand the root cause.

When the automatic throughput prediction fails

Buffer debloating doesn’t have any particular drawback, but in corner cases, the mechanism may incorrectly estimate the throughput, and the resulting amount of in-flight data may not be optimal.

Estimating the throughput is complex when an operator receives data from multiple upstream operators, connected streams, or unions with very different throughput. The mechanism may also take time to adjust to a sudden spike, causing temporarily suboptimal buffering, with two possible effects:

  • Too small in-flight data may reduce the throughput the sub-task can process (it will be idling), causing more backpressure upstream
  • Too large buffers may slow down checkpointing and increase the checkpoint size (with unaligned checkpoints)

Conclusion

The checkpointing mechanism makes Apache Flink fault tolerant, providing exactly-once state consistency. In-flight data buffering and backpressure control the data flow within the distributed streaming application to maximize throughput. Apache Flink’s default behaviors and configurations are good for most workloads.

The effectiveness of buffer debloating depends on the characteristics of the workload and the application. The general recommendation is to test the functionality in a non-production environment with a realistic workload to verify it actually helps with your use case.

You can request to enable buffer debloating on your Amazon Managed Service for Apache Flink application.

Under particular conditions, the combined effect of backpressure and in-flight data buffering may slow down checkpointing, increase checkpointing size (with unaligned checkpoints), and even cause checkpoints to fail. In these cases, enabling unaligned checkpointing may help reduce checkpoint duration or size.

In the second part of this series, we take a closer look at unaligned checkpoints and how they can help your application checkpoint efficiently in the presence of backpressure, especially in combination with buffer debloating.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical contents to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger (for example, in terms of record volume) than others, and some are updated more frequently than others. In the data lake, the data is organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logics, usually fed as inputs to ML pipelines or any consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, the Apache Hudi framework can solve the first requirement. Therefore, we will put our emphasis on the other requirements. We begin with a data lake reference architecture followed by an overview of the operational data processing framework. Using our open-source solution on GitHub, we delve into the framework components and walk through their design and implementation aspects. Finally, by testing the framework, we summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with a big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that has support for change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets and separate AWS Glue jobs to do feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts the metadata to the DynamoDB table odpf_file_tracker. These records will then be processed by File Processor, which we discuss in the next section.

ODPF Component: File Manager
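
The following is a minimal Python sketch of what such a File Manager Lambda function could look like. It assumes the SQS messages carry Amazon S3 object-created events in the EventBridge event format, and the attribute names written to odpf_file_tracker are illustrative; the authoritative implementation and table schema are in the GitHub repository.

import json
import os

import boto3

dynamodb = boto3.resource("dynamodb")
tracker_table = dynamodb.Table(os.environ.get("FILE_TRACKER_TABLE", "odpf_file_tracker"))

def lambda_handler(event, context):
    """Parse S3 object-created events delivered via SQS and track CDC files in DynamoDB."""
    for sqs_record in event["Records"]:
        s3_event = json.loads(sqs_record["body"])
        detail = s3_event.get("detail", {})
        bucket = detail["bucket"]["name"]
        key = detail["object"]["key"]
        # Illustrative attributes; here we assume AWS DMS writes files under
        # <database>/<table>/ prefixes in the staging bucket.
        tracker_table.put_item(
            Item={
                "source_table_name": key.split("/")[-2],
                "file_id": f"s3://{bucket}/{key}",
                "file_size_bytes": detail["object"].get("size", 0),
                "file_ingestion_status": "raw_file_landed",
            }
        )
    return {"processed_messages": len(event["Records"])}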

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It usually goes with AWS Glue worker type (one of G.1X, G.2X, G.4X, G.8X, G.025X, and so on) and batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combined key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence.
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables (a small sketch of this batching logic follows this list).
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.
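
To illustrate how batch size drives parallelism, the following is a minimal batching sketch; the function name and inputs are hypothetical, and the real batching logic lives in the code repo.

    from typing import Any

    def split_into_batches(table_configs: list[dict[str, Any]], batch_size: int) -> list[list[dict[str, Any]]]:
        """Split table configurations into batches that the Step Functions Map state can fan out."""
        return [table_configs[i:i + batch_size] for i in range(0, len(table_configs), batch_size)]

    # 18 tables with a batch size of 5 yield 4 batches of 5, 5, 5, and 3 tables.
    batches = split_into_batches([{"table": f"table_{n}"} for n in range(1, 19)], batch_size=5)
    print([len(batch) for batch in batches])  # [5, 5, 5, 3]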

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates a run of the File Processor state machine with a configuration of 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. As the first state in the state machine, the Batch Manager Lambda function reads configurations from the DynamoDB tables.
  3. The Lambda function creates four batches: three map to five operational tables each, and the fourth maps to the remaining three. It then feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function is invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of the operational table and acquires a lock if no other job is currently processing it, using the odpf_file_processing_tracker DynamoDB table (a minimal lock-acquisition sketch follows this list). On the first run, it inserts a record into the table with the status updating_table; on subsequent runs, it updates the existing record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog.
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note that a failure to process CDC files for one operational table in a given batch does not impact the processing of the other operational tables.
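
The following is a minimal sketch of the lock acquisition described above, using a DynamoDB conditional write; the attribute names are illustrative, so align them with the schema in the code repo.

    import boto3
    from botocore.exceptions import ClientError

    tracker = boto3.resource("dynamodb").Table("odpf_file_processing_tracker")

    def try_acquire_lock(table_name: str, job_run_id: str) -> bool:
        """Return True if this job run acquired the lock for the given operational table."""
        try:
            tracker.put_item(
                Item={
                    "source_table_name": table_name,   # illustrative key attribute
                    "status": "updating_table",
                    "locked_by": job_run_id,
                },
                # Succeed only if no other run currently holds the table in updating_table status.
                ConditionExpression="attribute_not_exists(source_table_name) OR #s <> :updating",
                ExpressionAttributeNames={"#s": "status"},
                ExpressionAttributeValues={":updating": "updating_table"},
            )
            return True
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False                           # another job run is processing this table
            raise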

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details to the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.
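
As a starting point, the following is a minimal sketch of seeding one table configuration item with boto3; the attribute names and column values are illustrative, so align them with the schema expected by File Processor in the code repo.

    import boto3

    raw_table_config = boto3.resource("dynamodb").Table("odpf_raw_table_config")

    raw_table_config.put_item(
        Item={
            "source_table_name": "taxi_trips.table_1",
            "refresh_cadence_in_minutes": 10,
            "hudi_primary_key": "trip_id",            # hypothetical column
            "hudi_partition_key": "pickup_date",      # hypothetical column
            "hudi_precombine_key": "updated_at",      # hypothetical column
            "hudi_table_type": "COPY_ON_WRITE",
            "table_data_storage_mode": "current_snapshot",
            "raw_bucket_name": "odpf-demo-raw-EXAMPLE-BUCKET",
            "glue_database_name": "odpf_demo_taxi_trips_raw",
            "glue_table_name": "table_1",
        }
    )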

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following instructions from this README and this README, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder (a minimal sketch of what such a script might look like follows this procedure).
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database
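
For reference, the following is a minimal sketch of what a Glue Python Shell job like load_nyc_taxi_data_to_rds_mysql.py might do, assuming the AWS SDK for pandas (awswrangler) library is available to the job; the actual script in the repo is the authoritative implementation.

    import sys

    import awswrangler as wr
    from awsglue.utils import getResolvedOptions

    # Resolve the job parameters configured in the AWS Glue console.
    args = getResolvedOptions(
        sys.argv,
        ["input_sample_data_path", "schema_name", "table_name", "rds_connection_name"],
    )

    # Read the NYC yellow taxi Parquet file directly from Amazon S3.
    df = wr.s3.read_parquet(path=args["input_sample_data_path"])

    # Write the DataFrame to the operational MySQL table through the Glue connection.
    con = wr.mysql.connect(connection=args["rds_connection_name"])
    try:
        wr.mysql.to_sql(df=df, con=con, schema=args["schema_name"], table=args["table_name"], mode="overwrite")
    finally:
        con.close()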

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = the AWS Region you’re using (for example, us-west-1)
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create a target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json (an illustrative boto3 equivalent follows this procedure).
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. After the full load completes successfully, you will find Parquet files in the S3 staging bucket: one Parquet file per table in a dedicated folder, similar to the following screenshot for table 1. The bucket contains 17 more such folders, one for each of the remaining tables.
    DMS Output in S3 Staging Bucket for Table 1
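
The dms_s3_endpoint_setting.json file in the repo is the authoritative definition; the following boto3 sketch only illustrates typical S3 target endpoint settings for Parquet CDC output, and the role ARN and identifiers are placeholders.

    import boto3

    dms = boto3.client("dms")

    dms.create_endpoint(
        EndpointIdentifier="odpf-demo-s3-target",                                        # placeholder name
        EndpointType="target",
        EngineName="s3",
        S3Settings={
            "ServiceAccessRoleArn": "arn:aws:iam::111122223333:role/dms-s3-access-role", # placeholder role
            "BucketName": "odpf-demo-staging-EXAMPLE-BUCKET",
            "DataFormat": "parquet",
            "ParquetVersion": "parquet-2-0",
            "TimestampColumnName": "cdc_timestamp",  # adds a transaction timestamp column to each file
        },
    )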

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. At the next 10-minute mark after the EventBridge rule is activated, the rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. You will then notice that the File Processor AWS Glue job runs create source-aligned datasets in Hudi format in the S3 raw bucket. For table 1, you will see an output similar to the following screenshot; the S3 raw bucket contains 17 more such folders, one for each of the remaining tables.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query (an illustrative programmatic alternative follows this procedure). You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena
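
The raw_data_validation_query_athena.sql script in the repo is the query to use; purely as an illustration, a programmatic per-table count with the AWS SDK for pandas (awswrangler) might look like the following, assuming the raw tables keep the table_1 through table_18 names.

    import awswrangler as wr

    DATABASE = "odpf_demo_taxi_trips_raw"

    # Count rows in each source-aligned table to compare against the operational row counts.
    counts = {
        table: wr.athena.read_sql_query(f'SELECT COUNT(*) AS row_count FROM "{table}"', database=DATABASE)["row_count"][0]
        for table in [f"table_{n}" for n in range(1, 19)]
    }
    print(counts)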

Validation summary: The counts in Amazon Athena match the counts of the operational tables, which proves that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.

Outcomes

Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating each group with an EventBridge rule, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs at the required frequency. With the AWS Glue worker type configuration setting, we selected the appropriate compute resources for the AWS Glue job runs.
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data is stored in S3 buckets, metadata is stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to the secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) is to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.

As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.

Conclusion

In this post, we took a real-world operational data processing use case and presented the framework we developed at AWS ProServe to address it. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi expedite your journey in integrating operational databases into your modern data platforms built on AWS.


About the authors

Ravi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

Srinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading about history and civilizations.

Access accounts with AWS Management Console Private Access

Post Syndicated from Suresh Samuel original https://aws.amazon.com/blogs/security/access-accounts-with-aws-management-console-private-access/

AWS Management Console Private Access is an advanced security feature to help you control access to the AWS Management Console. In this post, I will show you how this feature works, share current limitations, and provide AWS CloudFormation templates that you can use to automate the deployment. AWS Management Console Private Access is useful when you want to restrict users from signing in to unknown AWS accounts from within your network. With this feature, you can limit access to the console only to a specified set of known accounts when the traffic originates from within your network.

For enterprise customers, users typically access the console from devices that are connected to a corporate network, either directly or through a virtual private network (VPN). With network connectivity to the console, users can authenticate into an account with valid credentials, including third-party accounts and personal accounts. For enterprise customers with stringent network access controls, this feature provides a way to control which accounts can be accessed from on-premises networks.

How AWS Management Console Private Access works

AWS PrivateLink now supports the AWS Management Console, which means that you can create Virtual Private Cloud (VPC) endpoints in your VPC for the console. You can then use DNS forwarding to conditionally route users’ browser traffic to the VPC endpoints from on-premises and define endpoint policies that allow or deny access to specific accounts, organizations, or organizational units (OUs). To privately reach the endpoints, you must have a hybrid network connection between on-premises and AWS over AWS Direct Connect or AWS Site-to-Site VPN.

When you conditionally forward DNS queries for the zone aws.amazon.com from on-premises to an Amazon Route 53 Resolver inbound endpoint within the VPC, Route 53 will prefer the private hosted zone for aws.amazon.com to resolve the queries. The private hosted zone makes it simple to centrally manage records for the console in the AWS US East (N. Virginia) Region (us-east-1) as well as other Regions.

Configure a VPC endpoint for the console

To configure VPC endpoints for the console, you must complete the following steps:

  1. Create interface VPC endpoints in a VPC in the US East (N. Virginia) Region for the console and sign-in services, and repeat for other desired Regions. You must create VPC endpoints in the US East (N. Virginia) Region because the default DNS name for the console resolves to this Region. Specify the accounts, organizations, or OUs that should be allowed or denied in the endpoint policies. For instructions on how to create interface VPC endpoints, see Access an AWS service using an interface VPC endpoint (an illustrative boto3 sketch follows this procedure).
  2. Create a Route 53 Resolver inbound endpoint in a VPC and note the IP addresses for the elastic network interfaces of the endpoint. Forward DNS queries for the console from on-premises to these IP addresses. For instructions on how to configure Route 53 Resolver, see Getting started with Route 53 Resolver.
  3. Create a Route 53 private hosted zone with records for the console and sign-in subdomains. For the full list of records needed, see DNS configuration for AWS Management Console and AWS Sign-In. Then associate the private hosted zone with the same VPC that has the Resolver inbound endpoint. For instructions on how to create a private hosted zone, see Creating a private hosted zone.
  4. Conditionally forward DNS queries for aws.amazon.com to the IP addresses of the Resolver inbound endpoint.
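
The CloudFormation templates provided later in this post are the recommended deployment path; purely as an illustration, creating the console endpoint with a policy that restricts sign-in to known accounts might look like the following boto3 sketch, where the VPC, subnet, security group, and account IDs are placeholders.

    import json

    import boto3

    ec2 = boto3.client("ec2", region_name="us-east-1")

    endpoint_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": "*",
                "Action": "*",
                "Resource": "*",
                # Allow console access through the endpoint only for these known accounts.
                "Condition": {"StringEquals": {"aws:PrincipalAccount": ["111122223333"]}},
            }
        ],
    }

    ec2.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId="vpc-0123456789abcdef0",
        ServiceName="com.amazonaws.us-east-1.console",  # repeat for com.amazonaws.us-east-1.signin
        SubnetIds=["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"],
        SecurityGroupIds=["sg-0123456789abcdef0"],
        PrivateDnsEnabled=False,  # DNS resolution is handled by the private hosted zone from step 3
        PolicyDocument=json.dumps(endpoint_policy),
    )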

How to access Regions other than US East (N. Virginia)

To access the console for another supported Region using AWS Management Console Private Access, complete the following steps:

  1. Create the console and sign-in VPC endpoints in a VPC in that Region.
  2. Create resource records for <region>.console.aws.amazon.com and <region>.signin.aws.amazon.com in the private hosted zone, with values that target the respective VPC endpoints in that Region. Replace <region> with the Region code (for example, us-west-2). A hedged sketch of creating such a record follows this list.
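
As one possible approach, the following sketch upserts a Regional console record into the private hosted zone with boto3; the hosted zone ID and endpoint DNS name are placeholders, and alias records that target the endpoint work equally well.

    import boto3

    route53 = boto3.client("route53")

    route53.change_resource_record_sets(
        HostedZoneId="Z0123456789EXAMPLE",  # private hosted zone for aws.amazon.com (placeholder)
        ChangeBatch={
            "Changes": [
                {
                    "Action": "UPSERT",
                    "ResourceRecordSet": {
                        "Name": "us-west-2.console.aws.amazon.com",
                        "Type": "CNAME",
                        "TTL": 300,
                        # Regional DNS name of the console VPC endpoint in us-west-2 (placeholder).
                        "ResourceRecords": [{"Value": "vpce-0abc123example.console.us-west-2.vpce.amazonaws.com"}],
                    },
                }
            ]
        },
    )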

For increased resiliency, you can also configure a second Resolver inbound endpoint in a Region other than US East (N. Virginia) (us-east-1). On-premises DNS resolvers can then use both endpoints for resilient DNS resolution to the private hosted zone.

Automate deployment of AWS Management Console Private Access

I created an AWS CloudFormation template that you can use to deploy the required resources in the US East (N. Virginia) Region (us-east-1). To get the template, go to console-endpoint-use1.yaml. The CloudFormation stack deploys the required VPC endpoints, Route 53 Resolver inbound endpoint, and private hosted zone with required records.

Note: The default endpoint policy allows all accounts. For sample policies with conditions to restrict access, see Allow AWS Management Console use for expected accounts and organizations only (trusted identities).

I also created a CloudFormation template that you can use to deploy the required resources in other Regions where private access to the console is required. To get the template, go to console-endpoint-non-use1.yaml.

Cost considerations

When you configure AWS Management Console Private Access, you will incur charges. You can use the following information to estimate these charges:

  • PrivateLink pricing is based on the number of hours that the VPC endpoints remain provisioned. In the US East (N. Virginia) Region, this is $0.01 per VPC endpoint per Availability Zone per hour.
  • Data processing charges are $0.01 per gigabyte (GB) of data processed through the VPC endpoints in the US East (N. Virginia) Region.
  • The Route 53 Resolver inbound endpoint is charged per IP (elastic network interface) per hour. In the US East (N. Virginia) Region, this is $0.125 per IP address per hour. See Route 53 pricing.
  • DNS queries to the inbound endpoint are charged at $0.40 per million queries.
  • The Route 53 hosted zone is charged at $0.50 per hosted zone per month. To allow testing, AWS won’t charge you for a hosted zone that you delete within 12 hours of creation.

Based on this pricing model, the cost of configuring AWS Management Console Private Access in the US East (N. Virginia) Region across two Availability Zones is approximately $212.20 per month for the deployed resources: the two interface endpoints (console and sign-in) across two Availability Zones cost $0.04 per hour, the two Resolver inbound endpoint IP addresses cost $0.25 per hour, and the hosted zone costs $0.50 per month. DNS queries and data processing charges are additional, based on actual usage. You can apply the same pricing model to help estimate the cost of configuring additional supported Regions. Route 53 is a global service, so you only have to create the private hosted zone once, along with the resources in the US East (N. Virginia) Region.
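
The monthly figure can be reproduced with simple arithmetic, assuming roughly 730 hours in a month:

    hours_per_month = 730
    endpoint_hourly = 2 * 2 * 0.01   # console and sign-in endpoints across two Availability Zones
    resolver_hourly = 2 * 0.125      # two Resolver inbound endpoint IP addresses
    hosted_zone_monthly = 0.50
    total = (endpoint_hourly + resolver_hourly) * hours_per_month + hosted_zone_monthly
    print(round(total, 2))           # approximately 212.2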

Limitations and considerations

Before you get started with AWS Management Console Private Access, make sure to review the following limitations and considerations:

  • For a list of supported Regions and services, see Supported AWS Regions, service consoles, and features.
  • You can use this feature to restrict access to specific accounts from customer networks by forwarding DNS queries to the VPC endpoints. This feature doesn’t prevent users from accessing the console directly from the internet by using the console’s public endpoints from devices that aren’t on the corporate network.
  • The following subdomains aren’t currently supported by this feature and won’t be accessible through private access:
    • docs.aws.amazon.com
    • health.aws.amazon.com
    • status.aws.amazon.com
  • After a user completes authentication and accesses the console with private access, when they navigate to an individual service console, for example Amazon Elastic Compute Cloud (Amazon EC2), they must have network connectivity to the service’s API endpoint, such as ec2.amazonaws.com. This is needed for the console to make API calls such as ec2:DescribeInstances to display resource details in the service console.

Conclusion

In this blog post, I outlined how you can configure the console through AWS Management Console Private Access to restrict access to AWS accounts from on-premises, how the feature works, and how to configure it for multiple Regions. I also provided CloudFormation templates that you can use to automate the configuration of this feature. Finally, I shared information on costs and some limitations that you should consider before you configure private access to the console.

For more information about how to set up and test AWS Management Console Private Access and reference architectures, see Try AWS Management Console Private Access. For the latest CloudFormation templates, see the aws-management-console-private-access-automation GitHub repository.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread at re:Post.

Want more AWS Security news? Follow us on Twitter.

Suresh Samuel

Suresh is a Senior Technical Account Manager at AWS. He helps customers in the financial services industry with their operations on AWS. When not working, he can be found photographing birds in Texas or hanging out with his kids.