Tag Archives: Amazon EMR

Get a quick start with Apache Hudi, Apache Iceberg, and Delta Lake with Amazon EMR on EKS

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/get-a-quick-start-with-apache-hudi-apache-iceberg-and-delta-lake-with-amazon-emr-on-eks/

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can keep your data as is in your object store or file-based storage without having to first structure the data. Additionally, you can run different types of analytics against your loosely formatted data lake—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML) to guide better decisions. Due to the flexibility and cost effectiveness that a data lake offers, it’s very popular with customers who are looking to implement data analytics and AI/ML use cases.

Due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records from a data lake. Another challenge is making concurrent changes to the data lake. Implementing these tasks is time consuming and costly.

In this post, we explore three open-source transactional file formats that help us overcome these data lake challenges: Apache Hudi, Apache Iceberg, and Delta Lake. We focus on how to get started with these data storage frameworks via a real-world use case. As an example, we demonstrate how to handle incremental data change in a data lake by implementing a Slowly Changing Dimension Type 2 (SCD2) solution with Hudi, Iceberg, and Delta Lake, then deploy the applications with Amazon EMR on EKS.

ACID challenge in data lakes

In analytics, the data lake plays an important role as an immutable and agile data storage layer. Unlike traditional data warehouses or data mart implementations, we make no assumptions on the data schema in a data lake and can define whatever schemas are required by our use cases. It’s up to the downstream consumption layer to make sense of that data for its own purposes.

One of the most common challenges is supporting ACID (Atomicity, Consistency, Isolation, Durability) transactions in a data lake. For example, how do we run queries that return consistent and up-to-date results while new data is continuously being ingested or existing data is being modified?

Let’s try to understand the data problem with a real-world scenario. Assume we centralize customer contact datasets from multiple sources to an Amazon Simple Storage Service (Amazon S3)-backed data lake, and we want to keep all the historical records for analysis and reporting. We face the following challenges:

  • We keep creating append-only files in Amazon S3 to track the contact data changes (insert, update, delete) in near-real time.
  • Consistency and atomicity aren’t guaranteed because we just dump data files from multiple sources without knowing whether the entire operation is successful or not.
  • We don’t have an isolation guarantee whenever multiple workloads are simultaneously reading and writing to the same target contact table.
  • We track every single activity at source, including duplicates caused by the retry mechanism and accidental data changes that are then reverted. This leads to the creation of a large volume of append-only files. The performance of extract, transform, and load (ETL) jobs decreases as all the data files are read each time.
  • We have to shorten the file retention period to reduce the amount of data scanned and maintain read performance.

In this post, we walk through a simple SCD2 ETL example designed to solve the ACID transaction problem with the help of Hudi, Iceberg, and Delta Lake. We also show how to deploy the ACID solution with EMR on EKS and query the results with Amazon Athena.

Custom library dependencies with EMR on EKS

By default, Hudi and Iceberg are supported by Amazon EMR as out-of-the-box features. For this demonstration, we use EMR on EKS release 6.8.0, which contains Apache Iceberg 0.14.0-amzn-0 and Apache Hudi 0.11.1-amzn-0. To find out the latest and past versions that Amazon EMR supports, check out the Hudi release history and the Iceberg release history tables. The runtime binary files of these frameworks can be found in Spark’s class path location within each EMR on EKS image. See Amazon EMR on EKS release versions for the list of supported versions and applications.

As of this writing, Amazon EMR does not include Delta Lake by default. There are two ways to make it available in EMR on EKS:

  • At the application level – You install the Delta libraries by setting the spark.jars Spark configuration or the --jars command-line argument in your submission script. The JAR files are downloaded and distributed to each Spark executor and driver pod when starting a job.
  • At the Docker container level – You can customize an EMR on EKS image by packaging the Delta dependencies into a single Docker container, which promotes portability and simplifies dependency management for each workload.

Other custom library dependencies can be managed the same way as for Delta Lake—passing a comma-separated list of JAR files in the Spark configuration at job submission, or packaging all the required libraries into a Docker image.
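
As a reference for the application-level option, the same job submission can also be expressed with the AWS SDK instead of the AWS CLI. The following is a minimal boto3 sketch; the virtual cluster ID, execution role ARN, and bucket are placeholders, and the entry point refers to the Delta script discussed later in this post:

# Minimal sketch: submit an EMR on EKS job with the open-source Delta JARs
# passed through spark.jars. All identifiers below are placeholders.
import boto3

emr_containers = boto3.client("emr-containers", region_name="us-east-1")

delta_jars = ",".join([
    "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar",
    "https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar",
])

emr_containers.start_job_run(
    virtualClusterId="<your-virtual-cluster-id>",
    name="delta-sample-job",
    executionRoleArn="<your-job-execution-role-arn>",
    releaseLabel="emr-6.8.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://<your-bucket>/blog/delta_scd_script.py",
            "entryPointArguments": ["<your-bucket>"],
            "sparkSubmitParameters": "--conf spark.executor.memory=2G --conf spark.executor.cores=2",
        }
    },
    configurationOverrides={
        "applicationConfiguration": [
            {"classification": "spark-defaults", "properties": {"spark.jars": delta_jars}}
        ]
    },
)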

Solution overview

The solution provides two sample CSV files as the data source: initial_contact.csv and update_contacts.csv. They were generated by a Python script with the Faker package. For more details, check out the tutorial on GitHub.

The following diagram describes a high-level architecture of the solution and different services being used.

The workflow steps are as follows:

  1. Ingest the first CSV file from a source S3 bucket. The data is being processed by running a Spark ETL job with EMR on EKS. The application contains either the Hudi, Iceberg, or Delta framework.
  2. Store the initial table in Hudi, Iceberg, or Delta file format in a target S3 bucket (curated). We use the AWS Glue Data Catalog as the Hive Metastore. Optionally, you can configure Amazon DynamoDB as a lock manager for the concurrency controls.
  3. Ingest a second CSV file that contains new records and some changes to the existing ones.
  4. Perform SCD2 via Hudi, Iceberg, or Delta in the Spark ETL job.
  5. Query the Hudi, Iceberg, or Delta table stored in the target S3 bucket in Athena.

To simplify the demo, we have combined steps 1–4 into a single Spark application.

Prerequisites

Install the following tools: the AWS CLI, kubectl, and Helm. The following script installs Helm (v3.8.2):

curl -fsSL -o get_helm.sh \
https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3

chmod 700 get_helm.sh
export DESIRED_VERSION=v3.8.2
./get_helm.sh 
helm version

For a quick start, you can use AWS CloudShell, which already includes the AWS CLI and kubectl.

Clone the project

Download the sample project either to your computer or the CloudShell console:

git clone https://github.com/aws-samples/emr-on-eks-hudi-iceberg-delta
cd emr-on-eks-hudi-iceberg-delta

Set up the environment

Run the following blog_provision.sh script to set up a test environment. The infrastructure deployment includes the following resources:

  • A new S3 bucket to store sample data and job code.
  • An Amazon Elastic Kubernetes Service (Amazon EKS) cluster (version 1.21) in a new VPC across two Availability Zones.
  • An EMR virtual cluster in the same VPC, registered to the emr namespace in Amazon EKS.
  • An AWS Identity and Access Management (IAM) job execution role that contains DynamoDB access, because we use DynamoDB to provide concurrency controls that ensure atomic transactions on the Hudi and Iceberg tables.

export AWS_REGION=us-east-1
export EKSCLUSTER_NAME=eks-quickstart
./blog_provision.sh
# Upload sample contact data to S3
export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
aws s3 sync data s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/data

Job execution role

The provisioning includes an IAM job execution role called emr-on-eks-quickstart-execution-role that allows your EMR on EKS jobs access to the required AWS services. It contains AWS Glue permissions because we use the Data Catalog as our metastore.

See the following code:

 {
    "Effect": "Allow",
    "Action": ["glue:Get*","glue:BatchCreatePartition","glue:UpdateTable","glue:CreateTable"],
    "Resource": [
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:catalog",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:database/*",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:table/*"
    ]
}

Additionally, the role contains DynamoDB permissions, because we use the service as the lock manager. It provides concurrency controls that ensure atomic transactions on our Hudi and Iceberg tables. If a DynamoDB table with the given name doesn’t exist, a new table is created with the billing mode set to pay-per-request. More details can be found in the following framework examples.

{
    "Sid": "DDBLockManager",
    "Effect": "Allow",
    "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:CreateTable",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:GetItem",
        "dynamodb:BatchGetItem"
    ],
    "Resource": [
       "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable/index/*",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable"
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable/index/*",
    ]
}
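
The lock tables themselves are created automatically by Hudi and Iceberg on first use. If you want to confirm that they exist and use on-demand billing after a job run, the following is a minimal boto3 sketch using the table names from the policy above:

# Optional check only: the frameworks create these tables automatically.
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

for table_name in ["myIcebergLockTable", "myHudiLockTable"]:
    try:
        table = dynamodb.describe_table(TableName=table_name)["Table"]
        billing = table.get("BillingModeSummary", {}).get("BillingMode", "PROVISIONED")
        print(f"{table_name}: status={table['TableStatus']}, billing={billing}")
    except ClientError as error:
        if error.response["Error"]["Code"] == "ResourceNotFoundException":
            print(f"{table_name}: not created yet (it appears after the first job run)")
        else:
            raise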

Example 1: Run Apache Hudi with EMR on EKS

The following steps provide a quick start for you to implement SCD Type 2 data processing with the Hudi framework. To learn more, refer to Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR.

The following code snippet demonstrates the SCD Type 2 implementation logic. It creates Hudi tables in the default database in the AWS Glue Data Catalog. The full version is in the script hudi_scd_script.py.

# Read incremental contact CSV file with extra SCD columns
delta_csv_df = spark.read.schema(contact_schema).format("csv")\
.load(f"s3://{S3_BUCKET_NAME}/.../update_contacts.csv")\
.withColumn("ts", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_from", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_to", lit("").cast(TimestampType()))\
.withColumn("checksum",md5(concat(col("name"),col("email"),col("state"))))\
.withColumn('iscurrent', lit(1).cast("int"))

## Find existing records to be expired
join_cond = [initial_hudi_df.checksum != delta_csv_df.checksum,
             initial_hudi_df.id == delta_csv_df.id,
             initial_hudi_df.iscurrent == 1]
contact_to_update_df = (initial_hudi_df.join(delta_csv_df, join_cond)
                      .select(initial_hudi_df.id,
                                ....
                              initial_hudi_df.valid_from,
                              delta_csv_df.valid_from.alias('valid_to'),
                              initial_hudi_df.checksum
                              )
                      .withColumn('iscurrent', lit(0).cast("int"))
                      )
                      
merged_contact_df = delta_csv_df.unionByName(contact_to_update_df)

# Upsert
merged_contact_df.write.format('org.apache.hudi')\
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .options(**hudiOptions) \
                    .mode('append')\
                    .save(TABLE_LOCATION)

In the job script, the hudiOptions were set to use the AWS Glue Data Catalog and enable the DynamoDB-based Optimistic Concurrency Control (OCC). For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.

hudiOptions = {
    ....
    # sync to Glue catalog
    "hoodie.datasource.hive_sync.mode":"hms",
    ....
    # DynamoDB based locking mechanisms
    "hoodie.write.concurrency.mode":"optimistic_concurrency_control", #default is SINGLE_WRITER
    "hoodie.cleaner.policy.failed.writes":"LAZY", #Hudi will delete any files written by failed writes to re-claim space
    "hoodie.write.lock.provider":"org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table":"myHudiLockTable",
    "hoodie.write.lock.dynamodb.partition_key":"tablename",
    "hoodie.write.lock.dynamodb.region": REGION,
    "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{REGION}.amazonaws.com"
}
  1. Upload the job scripts to Amazon S3:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync hudi/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit Hudi jobs with EMR on EKS to create SCD2 tables:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./hudi/hudi_submit_cow.sh
    ./hudi/hudi_submit_mor.sh

    Hudi supports two table types: Copy on Write (CoW) and Merge on Read (MoR). The following is the code snippet to create a CoW table. For the complete job scripts for each table type, refer to hudi_submit_cow.sh and hudi_submit_mor.sh.

    aws emr-containers start-job-run \
      --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
      --name em68-hudi-cow \
      --execution-role-arn $EMR_ROLE_ARN \
      --release-label emr-6.8.0-latest \
      --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://'$S3BUCKET'/blog/hudi_scd_script.py",
          "entryPointArguments":["'$AWS_REGION'","'$S3BUCKET'","COW"],
          "sparkSubmitParameters": "--jars local:///usr/lib/hudi/hudi-spark-bundle.jar,local:///usr/lib/spark/external/lib/spark-avro.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
      --configuration-overrides '{
        "applicationConfiguration": [
          {
            "classification": "spark-defaults", 
            "properties": {
              "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
              "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
             }}
        ]}'

  3. Check the job status on the EMR virtual cluster console.
  4. Query the output in Athena:

    select * from hudi_contact_cow where id=103

    select * from hudi_contact_mor_rt where id=103
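
    You can also run these queries programmatically. The following is a minimal boto3 sketch that submits one of them to Athena and prints the returned rows; the query results location is a placeholder for a bucket in your account:

    # Minimal Athena query sketch; the results location is a placeholder.
    import time
    import boto3

    athena = boto3.client("athena", region_name="us-east-1")

    query_id = athena.start_query_execution(
        QueryString="SELECT * FROM hudi_contact_cow WHERE id = 103",
        QueryExecutionContext={"Database": "default"},
        ResultConfiguration={"OutputLocation": "s3://<your-athena-results-bucket>/"},
    )["QueryExecutionId"]

    # Poll until the query reaches a terminal state
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)

    if state == "SUCCEEDED":
        for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
            print([col.get("VarCharValue") for col in row["Data"]])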

Example 2: Run Apache Iceberg with EMR on EKS

Starting with Amazon EMR version 6.6.0, you can use Apache Spark 3 on EMR on EKS with the Iceberg table format. For more information on how Iceberg works in an immutable data lake, see Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR.

The sample job creates an Iceberg table iceberg_contact in the default database of AWS Glue. The full version is in the iceberg_scd_script.py script. The following code snippet shows the SCD2 type of MERGE operation:

# Read incremental CSV file with extra SCD2 columns
spark.read.schema(contact_schema)\
.format("csv").options(header=False,delimiter=",")\
.load(f"s3://{S3_BUCKET_NAME}/blog/data/update_contacts.csv")\
.withColumn(……)\
.createOrReplaceTempView('iceberg_contact_update')

# Update existing records which are changed in the update file
contact_update_qry = """
    WITH contact_to_update AS (
          SELECT target.*
          FROM glue_catalog.default.iceberg_contact AS target
          JOIN iceberg_contact_update AS source 
          ON target.id = source.id
          WHERE target.checksum != source.checksum
            AND target.iscurrent = 1
        UNION
          SELECT * FROM iceberg_contact_update
    ),contact_updated AS (
        SELECT *, LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from) AS eff_from
        FROM contact_to_update
    )
    SELECT id,name,email,state,ts,valid_from,
        CAST(COALESCE(eff_from, null) AS Timestamp) AS valid_to,
        CASE WHEN eff_from IS NULL THEN 1 ELSE 0 END AS iscurrent,
        checksum
    FROM contact_updated
"""
# Upsert
spark.sql(f"""
    MERGE INTO glue_catalog.default.iceberg_contact tgt
    USING ({contact_update_qry}) src
    ON tgt.id = src.id
    AND tgt.checksum = src.checksum
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

As demonstrated earlier when discussing the job execution role, the role emr-on-eks-quickstart-execution-role grants access to the required DynamoDB table myIcebergLockTable, which is used to obtain locks on Iceberg tables when multiple concurrent write operations run against a single table. For more information on Iceberg’s lock manager, refer to DynamoDB Lock Manager.

  1. Upload the application scripts to the example S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync iceberg/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS to create an SCD2 Iceberg table:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./iceberg/iceberg_submit.sh

    The full version code is in the iceberg_submit.sh script. The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-iceberg \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
        "sparkSubmitJobDriver": {
        "entryPoint": "s3://'$S3BUCKET'/blog/iceberg_scd_script.py",
        "entryPointArguments":["'$S3BUCKET'"],
        "sparkSubmitParameters": "--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
        "applicationConfiguration": [
        {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog.warehouse": "s3://'$S3BUCKET'/iceberg/",
        "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog.lock.table": "myIcebergLockTable"
        }}
    ]}'

  3. Check the job status on the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from iceberg_contact where id=103

Example 3: Run open-source Delta Lake with EMR on EKS

Delta Lake 2.1.x is compatible with Apache Spark 3.3.x. Check out the compatibility list for other versions of Delta Lake and Spark. In this post, we use Amazon EMR release 6.8 (Spark 3.3.0) to demonstrate the SCD2 implementation in a data lake.

The following is the Delta code snippet to load the initial dataset; the incremental load MERGE logic is very similar to the Iceberg example. As a one-off task, two tables should be set up on the same data:

  • The Delta table delta_table_contact – Defined on the TABLE_LOCATION at ‘s3://{S3_BUCKET_NAME}/delta/delta_contact’. The MERGE/UPSERT operation must be implemented on this Delta destination table. Athena can’t query this table directly; instead, it reads from a manifest file stored in the same location, which is a text file containing the list of data files to read when querying the table. That manifest is exposed as the Athena table described next.
  • The Athena table delta_contact – Defined on the manifest location s3://{S3_BUCKET_NAME}/delta/delta_contact/_symlink_format_manifest/. All read operations from Athena must use this table.

The full version code is in the delta_scd_script.py script. The code snippet is as follows:

# Read initial contact CSV file and create a Delta table with extra SCD2 columns
df_intial_csv = spark.read.schema(contact_schema)\
 .format("csv")\
 .options(header=False,delimiter=",")\
 .load(f"s3://{S3_BUCKET_NAME}/.../initial_contacts.csv")\
 .withColumn(.........)\
 .write.format("delta")\
 .mode("overwrite")\
 .save(TABLE_LOCATION)

spark.sql(f"""CREATE TABLE IF NOT EXISTS delta_table_contact USING DELTA LOCATION '{TABLE_LOCATION}'""")
spark.sql("GENERATE symlink_format_manifest FOR TABLE delta_table_contact")
spark.sql("ALTER TABLE delta_table_contact SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")

# Create a queryable table in Athena
spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS default.delta_contact (
     ....
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{TABLE_LOCATION}/_symlink_format_manifest/'""")

The SQL statement GENERATE symlink_format_manifest FOR TABLE ... is a required step to set up Athena for Delta Lake. Whenever the data in a Delta table is updated, you must regenerate the manifests. Therefore, we use ALTER TABLE .... SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true) to automate the manifest refresh as a one-off setup.
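
If you need to regenerate the manifest manually (for example, for a table created before this property was set), the Delta Lake Python API exposes the same operation. The following short sketch assumes spark is a session configured with the Delta extensions, as in the submit script below, and that TABLE_LOCATION is the table path:

# Manual alternative (sketch): regenerate the symlink manifest for the table.
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, TABLE_LOCATION)
delta_table.generate("symlink_format_manifest")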

  1. Upload the Delta sample scripts to the S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync delta/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    ./delta/delta_submit.sh

    The full version code is in the delta_submit.sh script. The open-source Delta JAR files must be included in spark.jars, as shown in the following configuration. Alternatively, follow the instructions in How to customize Docker images and build a custom EMR on EKS image to accommodate the Delta dependencies.

    "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.0.0/delta-core_2.12-2.0.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.0.0/delta-storage-2.0.0.jar"

    The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-delta \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
       "sparkSubmitJobDriver": {
       "entryPoint": "s3://'$S3BUCKET'/blog/delta_scd_script.py",
       "entryPointArguments":["'$S3BUCKET'"],
       "sparkSubmitParameters": "--conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
       "applicationConfiguration": [
       {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
    "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
    "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar"
        }}
    ]}'

  3. Check the job status from the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from delta_contact where id=103

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script (change the Region if necessary):

export EMRCLUSTER_NAME=emr-on-eks-quickstart
export AWS_REGION=us-east-1
./clean_up.sh

Conclusion

Implementing an ACID-compliant data lake with EMR on EKS enables you to focus more on delivering business value, instead of worrying about managing the complexity and reliability of the data storage layer.

This post presented three different transactional storage frameworks that can meet your ACID needs. They ensure you never read partial data (Atomicity). The read/write isolation allows you to see consistent snapshots of the data, even if an update occurs at the same time (Consistency and Isolation). All the transactions are stored directly in the underlying Amazon S3-backed data lake, which is designed for 11 9’s of durability (Durability).

For more information, check out the sample GitHub repository used in this post and the EMR on EKS Workshop. They will get you started with running your familiar transactional framework with EMR on EKS. If you want to dive deep into each storage format, check out the following posts:


About the authors

Amir Shenavandeh is a Sr Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Run a data processing job on Amazon EMR Serverless with AWS Step Functions

Post Syndicated from Siva Ramani original https://aws.amazon.com/blogs/big-data/run-a-data-processing-job-on-amazon-emr-serverless-with-aws-step-functions/

There are several infrastructure as code (IaC) frameworks available today, to help you define your infrastructure, such as the AWS Cloud Development Kit (AWS CDK) or Terraform by HashiCorp. Terraform, an AWS Partner Network (APN) Advanced Technology Partner and member of the AWS DevOps Competency, is an IaC tool similar to AWS CloudFormation that allows you to create, update, and version your AWS infrastructure. Terraform provides friendly syntax (similar to AWS CloudFormation) along with other features like planning (visibility to see the changes before they actually happen), graphing, and the ability to create templates to break infrastructure configurations into smaller chunks, which allows better maintenance and reusability. We use the capabilities and features of Terraform to build an API-based ingestion process into AWS. Let’s get started!

In this post, we showcase how to build and orchestrate a Scala Spark application using Amazon EMR Serverless, AWS Step Functions, and Terraform. In this end-to-end solution, we run a Spark job on EMR Serverless that processes sample clickstream data in an Amazon Simple Storage Service (Amazon S3) bucket and stores the aggregation results in Amazon S3.

With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications. You will continue to get the benefits of Amazon EMR, such as open source compatibility, concurrency, and optimized runtime performance for popular data frameworks. EMR Serverless is suitable for customers who want ease in operating applications using open-source frameworks. It offers quick job startup, automatic capacity management, and straightforward cost controls.

Solution overview

We provide the Terraform infrastructure definition and the source code for an AWS Lambda function using sample customer user clicks for online website inputs, which are ingested into an Amazon Kinesis Data Firehose delivery stream. The solution uses Kinesis Data Firehose to convert the incoming data into a Parquet file (an open-source file format for Hadoop) before pushing it to Amazon S3 using the AWS Glue Data Catalog. The generated output S3 Parquet file logs are then processed by an EMR Serverless process, which outputs a report detailing aggregate clickstream statistics in an S3 bucket. The EMR Serverless operation is triggered using Step Functions. The sample architecture and code are spun up as shown in the following diagram.

emr serverless application

The provided samples have the source code for building the infrastructure using Terraform for running the Amazon EMR application. Setup scripts are provided to create the sample ingestion using Lambda for the incoming application logs. For a similar ingestion pattern sample, refer to Provision AWS infrastructure using Terraform (By HashiCorp): an example of web application logging customer data.

The following are the high-level steps and AWS services used in this solution:

  • The provided application code is packaged and built using Apache Maven.
  • Terraform commands are used to deploy the infrastructure in AWS.
  • The EMR Serverless application provides the option to submit a Spark job.
  • The solution uses two Lambda functions:
    • Ingestion – This function processes the incoming request and pushes the data into the Kinesis Data Firehose delivery stream.
    • EMR Start Job – This function starts the EMR Serverless application (a minimal sketch of this call follows this list). The EMR job process converts the ingested user click logs into output in another S3 bucket.
  • Step Functions triggers the EMR Start Job Lambda function, which submits the application to EMR Serverless for processing of the ingested log files.
  • The solution uses four S3 buckets:
    • Kinesis Data Firehose delivery bucket – Stores the ingested application logs in Parquet file format.
    • Loggregator source bucket – Stores the Scala code and JAR for running the EMR job.
    • Loggregator output bucket – Stores the EMR processed output.
    • EMR Serverless logs bucket – Stores the EMR process application logs.
  • Sample invoke commands (run as part of the initial setup process) insert the data using the ingestion Lambda function. The Kinesis Data Firehose delivery stream converts the incoming stream into a Parquet file and stores it in an S3 bucket.
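
The EMR Start Job function essentially calls the EMR Serverless StartJobRun API. The following is a minimal boto3 sketch of that call; the application ID, role ARN, bucket names, entry point, and class name are placeholders rather than values from the sample repository:

# Minimal sketch of what the EMR Start Job Lambda function does: submit a
# Spark job to an EMR Serverless application. All identifiers are placeholders.
import boto3

emr_serverless = boto3.client("emr-serverless", region_name="us-east-1")

response = emr_serverless.start_job_run(
    applicationId="<your-emr-serverless-application-id>",
    executionRoleArn="<your-job-execution-role-arn>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<loggregator-source-bucket>/jars/loggregator.jar",
            "entryPointArguments": [
                "s3://<firehose-delivery-bucket>/",
                "s3://<loggregator-output-bucket>/",
            ],
            "sparkSubmitParameters": "--class <your.main.Class> --conf spark.executor.memory=2G",
        }
    },
    configurationOverrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://<emr-serverless-logs-bucket>/"}
        }
    },
)
print(response["jobRunId"])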

For this solution, we made the following design decisions:

  • We use Step Functions and Lambda in this use case to trigger the EMR Serverless application. In a real-world use case, the data processing application could be long running and may exceed Lambda’s timeout limits. In this case, you can use tools like Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Amazon MWAA is a managed orchestration service that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale.
  • The Lambda code and EMR Serverless log aggregation code are developed using Java and Scala, respectively. You can use any supported languages in these use cases.
  • The AWS Command Line Interface (AWS CLI) V2 is required for querying EMR Serverless applications from the command line. You can also view these from the AWS Management Console. We provide a sample AWS CLI command to test the solution later in this post.

Prerequisites

To use this solution, you must complete the following prerequisites:

  • Install the AWS CLI. For this post, we used version 2.7.18. This is required in order to run the aws emr-serverless AWS CLI commands from your local machine. Optionally, all the AWS services used in this post can be viewed and operated via the console.
  • Make sure Java is installed and JDK/JRE 8 is set in the environment path of your machine. For instructions, see the Java Development Kit.
  • Install Apache Maven. The Java Lambda functions are built using mvn packages and are deployed using Terraform into AWS.
  • Install the Scala Build Tool. For this post, we used version 1.4.7. Make sure to download and install based on your operating system needs.
  • Set up Terraform. For steps, see Terraform downloads. We use version 1.2.5 for this post.
  • Have an AWS account.

Configure the solution

To spin up the infrastructure and the application, complete the following steps:

  1. Clone the following GitHub repository.
    The provided exec.sh shell script builds the Java application JAR (for the Lambda ingestion function) and the Scala application JAR (for the EMR processing) and deploys the AWS infrastructure that is needed for this use case.
  2. Run the following commands:
    $ chmod +x exec.sh
    $ ./exec.sh

    To run the commands individually, set the application deployment Region and account number, as shown in the following example:

    $ APP_DIR=$PWD
    $ APP_PREFIX=clicklogger
    $ STAGE_NAME=dev
    $ REGION=us-east-1
    $ ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account')

    The following commands build the Lambda application JAR with Maven and package the Scala application:

    $ cd $APP_DIR/source/clicklogger
    $ mvn clean package
    $ sbt reload
    $ sbt compile
    $ sbt package

  3. Deploy the AWS infrastructure using Terraform:
    $ terraform init
    $ terraform plan
    $ terraform apply --auto-approve

Test the solution

After you build and deploy the application, you can insert sample data for Amazon EMR processing. We use the following code as an example. The exec.sh script has multiple sample insertions for Lambda. The ingested logs are used by the EMR Serverless application job.

The sample AWS CLI invoke command inserts sample data for the application logs:

aws lambda invoke --function-name clicklogger-dev-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-001","contextid":"OAP-ctxt-001","callerid":"OrderingApplication","component":"login","action":"load","type":"webpage"}' out

To validate the deployments, complete the following steps:

  1. On the Amazon S3 console, navigate to the bucket created as part of the infrastructure setup.
  2. Choose the bucket to view the files.
    You should see that data from the ingested stream was converted into a Parquet file.
  3. Choose the file to view the data.
    The following screenshot shows an example of our bucket contents.
    Now you can run Step Functions to validate the EMR Serverless application.
  4. On the Step Functions console, open clicklogger-dev-state-machine.
    The state machine shows the steps to run that trigger the Lambda function and EMR Serverless application, as shown in the following diagram.
  5. Run the state machine.
  6. After the state machine runs successfully, navigate to the clicklogger-dev-output-bucket on the Amazon S3 console to see the output files.
  7. Use the AWS CLI to check the deployed EMR Serverless application:
    $ aws emr-serverless list-applications \
          | jq -r '.applications[] | select(.name=="clicklogger-dev-loggregrator-emr-<Your-Account-Number>").id'

  8. On the Amazon EMR console, choose Serverless in the navigation pane.
  9. Select clicklogger-dev-studio and choose Manage applications.
  10. The application created by the stack is named clicklogger-dev-loggregator-emr-<Your-Account-Number>.
    Now you can review the EMR Serverless application output.
  11. On the Amazon S3 console, open the output bucket (us-east-1-clicklogger-dev-loggregator-output-).
    The EMR Serverless application writes the output based on the date partition, such as 2022/07/28/response.md. The following code shows an example of the file output:

    |*createdTime*|*callerid*|*component*|*count*
    |------------|-----------|-----------|-------
    *07-28-2022*|OrderingApplication|checkout|2
    *07-28-2022*|OrderingApplication|login|2
    *07-28-2022*|OrderingApplication|products|2

Clean up

The provided ./cleanup.sh script has the required steps to delete all the files from the S3 buckets that were created as part of this post. The terraform destroy command cleans up the AWS infrastructure that you created earlier. See the following code:

$ chmod +x cleanup.sh
$ ./cleanup.sh

To do the steps manually, you can also delete the resources via the AWS CLI:

# CLI commands to delete the Amazon S3 buckets

aws s3 rb s3://clicklogger-dev-emr-serverless-logs-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-firehose-delivery-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-loggregator-output-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-loggregator-source-bucket-<your-account-number> --force

# Destroy the AWS Infrastructure 
terraform destroy --auto-approve

Conclusion

In this post, we built, deployed, and ran a data processing Spark job in EMR Serverless that interacts with various AWS services. We walked through deploying a Lambda function packaged with Java using Maven, and a Scala application code for the EMR Serverless application triggered with Step Functions with infrastructure as code. You can use any combination of applicable programming languages to build your Lambda functions and EMR job application. EMR Serverless can be triggered manually, automated, or orchestrated using AWS services like Step Functions and Amazon MWAA.

We encourage you to test this example and see for yourself how this overall application design works within AWS. Then, it’s just a matter of replacing your individual code base, packaging it, and letting EMR Serverless handle the process efficiently.

If you implement this example and run into any issues, or have any questions or feedback about this post, please leave a comment!



About the Authors

Sivasubramanian Ramani (Siva Ramani) is a Sr Cloud Application Architect at Amazon Web Services. His expertise is in application optimization & modernization, serverless solutions and using Microsoft application workloads with AWS.

Naveen Balaraman is a Sr Cloud Application Architect at Amazon Web Services. He is passionate about containers, serverless applications, architecting microservices, and helping customers leverage the power of the AWS Cloud.

Upgrade Amazon EMR Hive Metastore from 5.X to 6.X

Post Syndicated from Jianwei Li original https://aws.amazon.com/blogs/big-data/upgrade-amazon-emr-hive-metastore-from-5-x-to-6-x/

If you are currently running Amazon EMR 5.X clusters, consider moving to Amazon EMR 6.X, as it includes new features that help you improve performance and optimize costs. For instance, Apache Hive is two times faster with LLAP on Amazon EMR 6.X, and Spark 3 reduces costs by 40%. Additionally, Amazon EMR 6.X releases include Trino, a fast distributed SQL engine, and Iceberg, a high-performance open data format for petabyte-scale tables.

To upgrade Amazon EMR clusters from the 5.X to the 6.X release, a Hive Metastore upgrade is the first step before applications such as Hive and Spark can be migrated. This post provides guidance on how to upgrade the Amazon EMR Hive Metastore from 5.X to 6.X, as well as how to migrate the Hive Metastore to the AWS Glue Data Catalog. Because the Hive 3 Metastore is compatible with Hive 2 applications, you can continue to use Amazon EMR 5.X with the upgraded Hive Metastore.

Solution overview

In the following section, we provide steps to upgrade the Hive Metastore schema using MySQL as the backend. For any other backend (such as MariaDB, Oracle, or SQL Server), update the commands accordingly.

There are two options to upgrade the Amazon EMR Hive Metastore:

  • Upgrade the Hive Metastore schema from 2.X to 3.X by using the Hive Schema Tool
  • Migrate the Hive Metastore to the AWS Glue Data Catalog

We walk through the steps for both options.

Pre-upgrade prerequisites

Before upgrading the Hive Metastore, you must complete the following prerequisite steps:

  1. Verify the Hive Metastore database is running and accessible.
    You should be able to run Hive DDL and DML queries successfully. Any errors or issues must be fixed before proceeding with the upgrade process. Use the following sample queries to test the database:

    create table testtable1 (id int, name string);
    insert into testtable1 values (1001, "user1");
    select * from testtable1;

  2. To get the Metastore schema version in the current EMR 5.X cluster, run the following command in the primary node:
    sudo hive --service schematool -dbType mysql -info

    The following code shows our sample output:

    $ sudo hive --service schematool -dbType mysql -info
    Metastore connection URL: jdbc:mysql://xxxxxxx.us-east-1.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true
    Metastore Connection Driver : org.mariadb.jdbc.Driver
    Metastore connection User: admin
    Hive distribution version: 2.3.0
    Metastore schema version:  2.3.0

  3. Stop the Metastore service and restrict access to the Metastore MySQL database.
    It’s very important that no one else accesses or modifies the contents of the Metastore database while you’re performing the schema upgrade. To stop the Metastore, use the following commands:

    $ sudo stop hive-server2
    $ sudo stop hive-hcatalog-server

    For Amazon EMR release 5.30 and 6.0 onwards (Amazon Linux 2 is the operating system for the Amazon EMR 5.30+ and 6.x release series), use the following commands:

    $ sudo systemctl stop hive-server2.service
    $ sudo systemctl stop hive-hcatalog-server.service

    You can also note the total number of databases and tables present in the Hive Metastore before the upgrade, and verify the number of databases and tables after the upgrade.

  4. To get the total number of tables and databases before the upgrade, run the following commands after connecting to the external Metastore database (assuming the Hive Metadata DB name is hive):
    $ mysql -u <username> -h <mysqlhost> --password
    mysql> use hive;
    mysql> select count(*) from DBS;
    mysql> select count(*) from TBLS;

  5. Take a backup or snapshot of the Hive database.
    This allows you to revert any changes made during the upgrade process if something goes wrong. If you’re using Amazon Relational Database Service (Amazon RDS), refer to Backing up and restoring an Amazon RDS instance for instructions (a minimal snapshot sketch follows these steps).
  6. Take note of the Hive table storage location if data is stored in HDFS.

If all the table data is on Amazon Simple Storage Service (Amazon S3), then no action is needed. If HDFS is used as the storage layer for Hive databases and tables, then take a note of them. You will need to copy the files on HDFS to a similar path on the new cluster, and then verify or update the location attribute for databases and tables on the new cluster accordingly.
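
For the backup in step 5, if your Metastore runs on Amazon RDS, you can also take the snapshot programmatically. The following is a minimal boto3 sketch; the DB instance identifier and snapshot name are placeholders:

# Snapshot the RDS-hosted Hive Metastore before starting the schema upgrade.
import boto3

rds = boto3.client("rds", region_name="us-east-1")

rds.create_db_snapshot(
    DBInstanceIdentifier="<your-hive-metastore-db-instance>",
    DBSnapshotIdentifier="hive-metastore-pre-upgrade",
)

# Wait until the snapshot is available before touching the schema
waiter = rds.get_waiter("db_snapshot_available")
waiter.wait(DBSnapshotIdentifier="hive-metastore-pre-upgrade")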

Upgrade the Amazon EMR Hive Metastore schema with the Hive Schema Tool

In this approach, you use the persistent Hive Metastore on a remote database (Amazon RDS for MySQL or Amazon Aurora MySQL-Compatible Edition). The following diagram shows the upgrade procedure.

EMR Hive Metastore Upgrade

To upgrade the Amazon EMR Hive Metastore from 5.X (Hive version 2.X) to 6.X (Hive version 3.X), we can use the Hive Schema Tool. The Hive Schema Tool is an offline tool for Metastore schema manipulation. You can use it to initialize, upgrade, and validate the Metastore schema. Run the following command to show the available options for the Hive Schema Tool:

sudo hive --service schematool -help

Be sure to complete the prerequisites mentioned earlier, including taking a backup or snapshot, before proceeding with the next steps.

  1. Note down the details of the existing Hive external Metastore to be upgraded.
    This includes the RDS for MySQL endpoint host name, database name (for this post, hive), user name, and password. You can do this through one of the following options:

    • Get the Hive Metastore DB information from the Hive configuration file – Log in to the EMR 5.X primary node, open the file /etc/hive/conf/hive-site.xml, and note the four properties:
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.mariadb.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>{username}</value>
        <description>username to use against metastore database</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>{password}</value>
        <description>password to use against metastore database</description>
      </property>

    • Get the Hive Metastore DB information from the Amazon EMR console – Navigate to the EMR 5.X cluster, choose the Configurations tab, and note down the Metastore DB information.
      EMR Console for Configuration
  2. Create a new EMR 6.X cluster.
    To use the Hive Schema Tool, we need to create an EMR 6.X cluster. You can create a new EMR 6.X cluster via the Amazon EMR console or the AWS Command Line Interface (AWS CLI), without specifying external Hive Metastore details. This lets the EMR 6.X cluster launch successfully using the default Hive Metastore. For more information about EMR cluster management, refer to Plan and configure clusters.
  3. After your new EMR 6.X cluster is launched successfully and is in the waiting state, SSH to the EMR 6.X primary node and take a backup of /etc/hive/conf/hive-site.xml:
    sudo cp /etc/hive/conf/hive-site.xml /etc/hive/conf/hive-site.xml.bak

  4. Stop Hive services:
    sudo systemctl stop hive-hcatalog-server.service
    sudo systemctl stop hive-server2.service

    Now you update the Hive configuration and point it to the old Hive Metastore database.

  5. Modify /etc/hive/conf/hive-site.xml and update the properties with the values you collected earlier:
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
      <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>org.mariadb.jdbc.Driver</value>
      <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>{username}</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>{password}</value>
      <description>password to use against metastore database</description>
    </property>

  6. On the same or a new SSH session, run the Hive Schema Tool to check that the Metastore is pointing to the old Metastore database:
    sudo hive --service schemaTool -dbType mysql -info

    The output should look as follows (old-hostname, old-dbname, and old-username are the values you changed):

    Metastore connection URL:     jdbc:mysql://{old-hostname}:3306/{old-dbname}?createDatabaseIfNotExist=true
    Metastore Connection Driver :     org.mariadb.jdbc.Driver
    Metastore connection User:     {old-username}
    Hive distribution version:     3.1.0
    Metastore schema version:      2.3.0
    schemaTool completed

    You can upgrade the Hive Metastore by passing the -upgradeSchema option to the Hive Schema Tool. The tool figures out the SQL scripts required to initialize or upgrade the schema and then runs those scripts against the backend database.

  7. Run the upgradeSchema command with -dryRun, which only lists the SQL scripts needed during the actual run:
    sudo hive --service schematool -dbType mysql -upgradeSchema -dryRun

    The output should look like the following code. It shows the Metastore upgrade path from the old version to the new version. You can find the upgrade order on the GitHub repo. In case of failure during the upgrade process, these scripts can be run manually in the same order.

    Metastore connection URL:     jdbc:mysql://{old-hostname}:3306/{old-dbname}?createDatabaseIfNotExist=true
    Metastore Connection Driver :     org.mariadb.jdbc.Driver
    Metastore connection User:     {old-username}
    Hive distribution version:     3.1.0
    Metastore schema version:      2.3.0
    schemaTool completed

  8. To upgrade the Hive Metastore schema, run the Hive Schema Tool with -upgradeSchema:
    sudo hive --service schematool -dbType mysql -upgradeSchema

    The output should look like the following code:

    Starting upgrade metastore schema from version 2.3.0 to 3.1.0
    Upgrade script upgrade-2.3.0-to-3.0.0.mysql.sql
    ...
    Completed upgrade-2.3.0-to-3.0.0.mysql.sql
    Upgrade script upgrade-3.0.0-to-3.1.0.mysql.sql
    ...
    Completed upgrade-3.0.0-to-3.1.0.mysql.sql
    schemaTool completed

    In case of any issues or failures, you can run the preceding command with the -verbose option. This prints all the queries being run, in order, along with their output.

    sudo hive --service schemaTool -verbose -dbType mysql -upgradeSchema

    If you encounter any failures during this process and you want to upgrade your Hive Metastore by running the SQL yourself, refer to Upgrading Hive Metastore.

    If HDFS was used as storage for the Hive warehouse or any Hive DB location, you need to update the NameNode alias or URI with the new cluster’s HDFS alias.

  9. Use the following commands to update the HDFS NameNode alias (replace <new-loc> <old-loc> with the HDFS root location of the new and old clusters, respectively):
    hive --service metatool -updateLocation <new-loc> <old-loc>

    You can run the following command on any EMR cluster node to get the HDFS NameNode alias:

    hdfs getconf -confKey dfs.namenode.rpc-address

    You can first run with the -dryRun option, which displays all the changes without persisting them. For example:

    $ hive --service metatool -updateLocation hdfs://ip-172-31-50-80.ec2.internal:8020 hdfs://ip-172-31-87-188.ec2.internal:8020 -dryRun
    Initializing HiveMetaTool..
    Looking for LOCATION_URI field in DBS table to update..
    Dry Run of updateLocation on table DBS..
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testdb.db new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testdb.db
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testdb_2.db new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testdb_2.db
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse
    Found 3 records in DBS table to update
    Looking for LOCATION field in SDS table to update..
    Dry Run of updateLocation on table SDS..
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testtable1 new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testtable1
    Found 1 records in SDS table to update

    However, if the new location needs to be changed to a different HDFS or S3 path, then use the following approach.

    First connect to the remote Hive Metastore database and run the following query to pull all the tables for a specific database and list the locations. Replace HiveMetastore_DB with the database name used for the Hive Metastore in the external database (for this post, hive) and the Hive database name (default):

    mysql> SELECT d.NAME as DB_NAME, t.TBL_NAME, t.TBL_TYPE, s.LOCATION FROM <HiveMetastore_DB>.TBLS t 
    JOIN <HiveMetastore_DB>.DBS d ON t.DB_ID = d.DB_ID JOIN <HiveMetastore_DB>.SDS s 
    ON t.SD_ID = s.SD_ID AND d.NAME='default';

    Identify the tables whose locations need to be updated, then run the ALTER TABLE command to update them. You can prepare a script or a chain of ALTER TABLE commands to update the locations for multiple tables (see the sketch after these steps).

    ALTER TABLE <table_name> SET LOCATION "<new_location>";

  10. Start and check the status of Hive Metastore and HiveServer2:
    sudo systemctl start hive-hcatalog-server.service
    sudo systemctl start hive-server2.service
    sudo systemctl status hive-hcatalog-server.service
    sudo systemctl status hive-server2.service
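
For the table location updates described above, you can generate and run the ALTER TABLE statements in bulk instead of issuing them one by one. The following is a minimal PySpark sketch that rewrites locations under a given root; the two root paths are placeholders, and it should only be run after hive-site.xml points to the upgraded Metastore:

# Bulk-update Hive table locations from an old root path to a new one (sketch).
from pyspark.sql import SparkSession

OLD_ROOT = "hdfs://<old-namenode>:8020"
NEW_ROOT = "hdfs://<new-namenode>:8020"

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

for row in spark.sql("SHOW TABLES IN default").collect():
    table = f"default.{row['tableName']}"
    details = spark.sql(f"DESCRIBE FORMATTED {table}").collect()
    location = next((r["data_type"] for r in details if r["col_name"].strip() == "Location"), "")
    if location.startswith(OLD_ROOT):
        new_location = location.replace(OLD_ROOT, NEW_ROOT, 1)
        print(f"{table}: {location} -> {new_location}")
        spark.sql(f"ALTER TABLE {table} SET LOCATION '{new_location}'")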

Post-upgrade validation

Perform the following post-upgrade steps:

  1. Confirm the Hive Metastore schema is upgraded to the new version:
    sudo hive --service schemaTool -dbType mysql -validate

    The output should look like the following code:

    Starting metastore validation
    
    Validating schema version
    Succeeded in schema version validation.
    [SUCCESS]
    Validating sequence number for SEQUENCE_TABLE
    Succeeded in sequence number validation for SEQUENCE_TABLE.
    [SUCCESS]
    Validating metastore schema tables
    Succeeded in schema table validation.
    [SUCCESS]
    Validating DFS locations
    Succeeded in DFS location validation.
    [SUCCESS]
    Validating columns for incorrect NULL values.
    Succeeded in column validation for incorrect NULL values.
    [SUCCESS]
    Done with metastore validation: [SUCCESS]
    schemaTool completed

  2. Run the following Hive Schema Tool command to query the Hive schema version and verify that it’s upgraded:
    $ sudo hive --service schemaTool -dbType mysql -info
    Metastore connection URL:        jdbc:mysql://<host>:3306/<hivemetastore-dbname>?createDatabaseIfNotExist=true
    Metastore Connection Driver :    org.mariadb.jdbc.Driver
    Metastore connection User:       <username>
    Hive distribution version:       3.1.0
    Metastore schema version:        3.1.0

  3. Run some DML queries against the old tables and ensure they run successfully.
  4. Verify the table and database counts using the same commands mentioned in the prerequisites section, and compare the counts.

The Hive Metastore schema upgrade process is complete, and you can start working on your new EMR cluster. If for some reason you want to relaunch the EMR cluster, you just need to provide the remote Hive Metastore database that we upgraded in the previous steps, using the options on the Amazon EMR Configurations tab.
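
If you relaunch the cluster programmatically, the same Metastore settings can be passed through the hive-site configuration classification. The following is a minimal boto3 sketch; the instance types, subnet, roles, and connection values are placeholders and should be adjusted for your environment:

# Relaunch an EMR 6.X cluster that points at the upgraded external Metastore.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

hive_site = {
    "Classification": "hive-site",
    "Properties": {
        "javax.jdo.option.ConnectionURL": "jdbc:mysql://<hostname>:3306/hive?createDatabaseIfNotExist=true",
        "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
        "javax.jdo.option.ConnectionUserName": "<username>",
        "javax.jdo.option.ConnectionPassword": "<password>",
    },
}

emr.run_job_flow(
    Name="emr-6x-with-external-metastore",
    ReleaseLabel="emr-6.8.0",
    Applications=[{"Name": "Hive"}, {"Name": "Spark"}],
    Configurations=[hive_site],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "Ec2SubnetId": "<your-subnet-id>",
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)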

Migrate the Amazon EMR Hive Metastore to the AWS Glue Data Catalog

The AWS Glue Data Catalog is flexible and reliable, and can reduce your operation cost. Moreover, the Data Catalog supports different versions of EMR clusters. Therefore, when you migrate your Amazon EMR 5.X Hive Metastore to the Data Catalog, you can use the same Data Catalog with any new EMR 5.8+ cluster, including Amazon EMR 6.x. There are some factors you should consider when using this approach; refer to Considerations when using AWS Glue Data Catalog for more information. The following diagram shows the upgrade procedure.
EMR Hive Metastore Migrate to Glue Data Catalog
To migrate your Hive Metastore to the Data Catalog, you can use the Hive Metastore migration script from GitHub. The following are the major steps for a direct migration.

Make sure all the table data is stored in Amazon S3 and not HDFS. Otherwise, tables migrated to the Data Catalog will have the table location pointing to HDFS, and you can’t query the table. You can check your table data location by connecting to the MySQL database and running the following SQL:

select SD_ID, LOCATION from SDS where LOCATION like 'hdfs%';

Make sure to complete the prerequisite steps mentioned earlier before proceeding with the migration. Ensure the EMR 5.X cluster is in a waiting state and all of its components are in service.

  1. Note down the details of the existing EMR 5.X cluster Hive Metastore database to be upgraded.
    As mentioned before, this includes the endpoint host name, database name, user name, and password. You can do this through one of the following options:

    • Get the Hive Metastore DB information from the Hive configuration file – Log in to the Amazon EMR 5.X primary node, open the file /etc/hive/conf/hive-site.xml, and note the following properties:
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
      <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>{username}</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>{password}</value>
      <description>password to use against metastore database</description>
    </property>

    • Get the Hive Metastore DB information from the Amazon EMR console – Navigate to the Amazon EMR 5.X cluster, choose the Configurations tab, and note down the Metastore DB information.

    EMR Console for Configuration

  2. On the AWS Glue console, create a connection to the Hive Metastore as a JDBC data source.
    Use the connection JDBC URL, user name, and password you gathered in the previous step. Specify the VPC, subnet, and security group associated with your Hive Metastore. You can find these on the Amazon EMR console if the Hive Metastore is on the EMR primary node, or on the Amazon RDS console if the Metastore is an RDS instance.
  3. Download two extract, transform, and load (ETL) job scripts from GitHub and upload them to an S3 bucket:
    wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/Hive_metastore_migration/src/hive_metastore_migration.py
    wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/Hive_metastore_migration/src/import_into_datacatalog.py

    If you configured AWS Glue to access Amazon S3 from a VPC endpoint, you must upload the script to a bucket in the same AWS Region where your job runs.

    Now you must create a job on the AWS Glue console to extract metadata from your Hive Metastore to migrate it to the Data Catalog.

  4. On the AWS Glue console, choose Jobs in the navigation pane.
  5. Choose Create job.
  6. Select Spark script editor.
  7. For Options, select Upload and edit an existing script.
  8. Choose Choose file and upload the import_into_datacatalog.py script you downloaded earlier.
  9. Choose Create.
    Glue Job script editor
  10. On the Job details tab, enter a job name (for example, Import-Hive-Metastore-To-Glue).
  11. For IAM Role, choose a role.
  12. For Type, choose Spark.
    Glue ETL Job details
  13. For Glue version, choose Glue 3.0.
  14. For Language, choose Python 3.
  15. For Worker type, choose G.1X.
  16. For Requested number of workers, enter 2.
    Glue ETL Job details
  17. In the Advanced properties section, for Script filename, enter import_into_datacatalog.py.
  18. For Script path, enter the S3 path you used earlier (just the parent folder).
    Glue ETL Job details
  19. Under Connections, choose the connection you created earlier.
    Glue ETL Job details
  20. For Python library path, enter the S3 path you used earlier for the file hive_metastore_migration.py.
  21. Under Job parameters, enter the following key-value pairs:
    • --mode: from-jdbc
    • --connection-name: EMR-Hive-Metastore
    • --region: us-west-2

    Glue ETL Job details

  22. Choose Save to save the job.
  23. Run the job on demand on the AWS Glue console.

If the job runs successfully, Run status should show as Succeeded. When the job is finished, the metadata from the Hive Metastore is visible on the AWS Glue console. Check the databases and tables listed to verify that they were migrated correctly.
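If you prefer to create and run this job programmatically instead of through the console, the following is a minimal boto3 sketch of the equivalent setup; the S3 paths, IAM role, connection name, and Region are placeholders that you should replace with your own values.

# Minimal sketch: create and start the Hive Metastore import job with boto3.
# The S3 paths, IAM role, connection name, and Region below are placeholders.
import boto3

glue = boto3.client("glue", region_name="us-west-2")

glue.create_job(
    Name="Import-Hive-Metastore-To-Glue",
    Role="<glue-job-iam-role>",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<your-bucket>/scripts/import_into_datacatalog.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
    Connections={"Connections": ["EMR-Hive-Metastore"]},
    DefaultArguments={
        # Same parameters as the console-based job
        "--extra-py-files": "s3://<your-bucket>/scripts/hive_metastore_migration.py",
        "--mode": "from-jdbc",
        "--connection-name": "EMR-Hive-Metastore",
        "--region": "us-west-2",
    },
)

run = glue.start_job_run(JobName="Import-Hive-Metastore-To-Glue")
print("Started job run:", run["JobRunId"])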

Known issues

In some cases where the Hive Metastore schema version is from a very old release, or if some required metadata tables are missing, the upgrade process may fail. In this case, you can use the following steps to identify and fix the issue. Run the schemaTool upgradeSchema command with the -verbose option as follows:

sudo hive --service schemaTool -verbose -dbType mysql -upgradeSchema

This prints all the queries being run in order and their output:

jdbc:mysql://metastore.xxxx.us-west-1> CREATE INDEX PCS_STATS_IDX ON PART_COL_STATS (DB_NAME,TABLE_NAME,COLUMN_NAME,PARTITION_NAME) USING BTREE
Error: (conn=6831922) Duplicate key name 'PCS_STATS_IDX' (state=42000,code=1061)
Closing: 0: jdbc:mysql://metastore.xxxx.us-west-1.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true
org.apache.hadoop.hive.metastore.HiveMetaException: Schema initialization FAILED! Metastore state would be inconsistent !!
Underlying cause: java.io.IOException : Schema script failed, errorcode 2

Note down the query and the error message, then take the required steps to address the issue. For example, depending on the error message, you may have to create the missing table or alter an existing table. Then you can either rerun the schemaTool upgradeSchema command, or you can manually run the remaining queries required for upgrade. You can get the complete script that schemaTool runs from the following path on the primary node /usr/lib/hive/scripts/metastore/upgrade/mysql/ or from GitHub.

Clean up

Running additional EMR clusters to perform the upgrade activity in your AWS account may incur additional charges. When you complete the Hive Metastore upgrade successfully, we recommend deleting the additional EMR clusters to save cost.

Conclusion

To upgrade Amazon EMR from 5.X to 6.X and take advantage of some features from Hive 3.X or Spark SQL 3.X, you have to upgrade the Hive Metastore first. If you’re using the AWS Glue Data Catalog as your Hive Metastore, you don’t need to do anything because the Data Catalog supports both Amazon EMR versions. If you’re using a MySQL database as the external Hive Metastore, you can upgrade by following the steps outlined in this post, or you can migrate your Hive Metastore to the Data Catalog.

There are some functional differences between the different versions of Hive, Spark, and Flink. If you have applications running on Amazon EMR 5.X, make sure to test them on Amazon EMR 6.X and validate functional compatibility. We will cover application upgrades for Amazon EMR components in a future post.


About the authors

Jianwei Li is a Senior Analytics Specialist TAM. He provides consulting services to AWS Enterprise Support customers to design and build modern data platforms. He has more than 10 years of experience in the big data and analytics domain. In his spare time, he likes running and hiking.

Narayanan Venkateswaran is an Engineer in the AWS EMR group. He works on developing Hive in EMR. He has over 17 years of work experience in the industry across several companies including Sun Microsystems, Microsoft, Amazon and Oracle. Narayanan also holds a PhD in databases with focus on horizontal scalability in relational stores.

Partha Sarathi is an Analytics Specialist TAM at AWS based in Sydney, Australia. He brings 15+ years of technology expertise and helps enterprise customers optimize analytics workloads. He has worked extensively on both on-premises and cloud big data workloads, along with various ETL platforms, in his previous roles. He also actively conducts proactive operational reviews around analytics services like Amazon EMR, Redshift, and OpenSearch.

Krish is an Enterprise Support Manager responsible for leading a team of specialists in EMEA focused on BigData & Analytics, Databases, Networking and Security. He is also an expert in helping enterprise customers modernize their data platforms and inspire them to implement operational best practices. In his spare time, he enjoys spending time with his family, travelling, and video games.

Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/design-considerations-for-amazon-emr-on-eks-in-a-multi-tenant-amazon-eks-environment/

Many AWS customers use Amazon Elastic Kubernetes Service (Amazon EKS) in order to take advantage of Kubernetes without the burden of managing the Kubernetes control plane. With Kubernetes, you can centrally manage your workloads and offer administrators a multi-tenant environment where they can create, update, scale, and secure workloads using a single API. Kubernetes also allows you to improve resource utilization, reduce cost, and simplify infrastructure management to support different application deployments. This model is beneficial for those running Apache Spark workloads, for several reasons. For example, it allows you to have multiple Spark environments running concurrently with different configurations and dependencies that are segregated from each other through Kubernetes multi-tenancy features. In addition, the same cluster can be used for various workloads such as machine learning (ML), hosting applications, and data streaming, thereby reducing the operational overhead of managing multiple clusters.

AWS offers Amazon EMR on EKS, a managed service that enables you to run your Apache Spark workloads on Amazon EKS. This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. When you run Spark jobs on EMR on EKS and not on self-managed Apache Spark on Kubernetes, you can take advantage of automated provisioning, scaling, faster runtimes, and the development and debugging tools that Amazon EMR provides.

In this post, we show how to configure and run EMR on EKS in a multi-tenant EKS cluster that can be used by your various teams. We tackle multi-tenancy through four topics: network, resource management, cost management, and security.

Concepts

Throughout this post, we use terminology that is either specific to EMR on EKS, Spark, or Kubernetes:

  • Multi-tenancy – Multi-tenancy in Kubernetes can come in three forms: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy means each business unit or group of applications gets a dedicated Kubernetes cluster; there is no sharing of the control plane. This model is out of scope for this post. Soft multi-tenancy is where pods might share the same underlying compute resource (node) and are logically separated using Kubernetes constructs through namespaces, resource quotas, or network policies. The third form, sole multi-tenancy, assigns pods to specific nodes that are pre-provisioned and allocated to a specific team. Unless your security posture requires you to use hard or sole multi-tenancy, you would want to consider using soft multi-tenancy for the following reasons:
    • Soft multi-tenancy avoids underutilization of resources and waste of compute resources.
    • Amazon EKS supports only a limited number of managed node groups per cluster, so for large deployments this quota can quickly become a constraint.
    • In sole multi-tenancy, there is a high chance of ghost nodes with no pods scheduled on them due to misconfiguration, because pods are forced onto dedicated nodes with labels, taints and tolerations, and anti-affinity rules.
  • Namespace – Namespaces are core in Kubernetes and a pillar to implement soft multi-tenancy. With namespaces, you can divide the cluster into logical partitions. These partitions are then referenced in quotas, network policies, service accounts, and other constructs that help isolate environments in Kubernetes.
  • Virtual cluster – An EMR virtual cluster is mapped to a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace on an EKS cluster. Virtual clusters don’t create any active resources that contribute to your bill or require lifecycle management outside the service.
  • Pod template – In EMR on EKS, you can provide a pod template to control pod placement, or define a sidecar container. This pod template can be defined for executor pods and driver pods, and stored in an Amazon Simple Storage Service (Amazon S3) bucket. The S3 locations are then submitted as part of the applicationConfiguration object that is part of configurationOverrides, as defined in the EMR on EKS job submission API (a submission sketch follows this list).
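The following is a minimal boto3 sketch of how pod template locations might be passed through configurationOverrides when submitting a job; the virtual cluster ID, execution role, release label, and S3 paths are placeholders, not values from this post.

# Minimal sketch: submit an EMR on EKS job that references driver and executor
# pod templates stored in Amazon S3. All identifiers below are placeholders.
import boto3

emr = boto3.client("emr-containers")

response = emr.start_job_run(
    virtualClusterId="<virtual-cluster-id>",
    name="sample-job-with-pod-templates",
    executionRoleArn="arn:aws:iam::<account-id>:role/<job-execution-role>",
    releaseLabel="emr-6.9.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://<your-bucket>/scripts/sample_spark_job.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2",
        }
    },
    configurationOverrides={
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.kubernetes.driver.podTemplateFile": "s3://<your-bucket>/pod-templates/driver.yaml",
                    "spark.kubernetes.executor.podTemplateFile": "s3://<your-bucket>/pod-templates/executor.yaml",
                },
            }
        ]
    },
)
print("Job run id:", response["id"])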

Security considerations

In this section, we address security from different angles. We first discuss how to protect the IAM role that is used for running the job. We then address how to protect secrets used in jobs, and finally we discuss how you can protect data while it is processed by Spark.

IAM role protection

A job submitted to EMR on EKS needs an AWS Identity and Access Management (IAM) execution role to interact with AWS resources, for example with Amazon S3 to get data, with Amazon CloudWatch Logs to publish logs, or use an encryption key in AWS Key Management Service (AWS KMS). It’s a best practice in AWS to apply least privilege for IAM roles. In Amazon EKS, this is achieved through IRSA (IAM Role for Service Accounts). This mechanism allows a pod to assume an IAM role at the pod level and not at the node level, while using short-term credentials that are provided through the EKS OIDC.

IRSA creates a trust relationship between the EKS OIDC provider and the IAM role. This method allows only pods with a service account (annotated with an IAM role ARN) to assume a role that has a trust policy with the EKS OIDC provider. However, this isn’t enough, because it would allow any pod with a service account within the EKS cluster that is annotated with a role ARN to assume the execution role. This must be further scoped down using conditions on the role trust policy. This condition allows the assume role to happen only if the calling service account is the one used for running a job associated with the virtual cluster. The following code shows the structure of the condition to add to the trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "<OIDC provider ARN>"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": { "StringLike": { "<OIDC_PROVIDER>:sub": "system:serviceaccount:<NAMESPACE>:emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>" } }
        }
    ]
}

To scope down the trust policy using the service account condition, you need to run the following command with the AWS CLI:

aws emr-containers update-role-trust-policy \
--cluster-name cluster \
--namespace namespace \
--role-name iam_role_name_for_job_execution

The command adds the service accounts that will be used by the Spark client, Jupyter Enterprise Gateway, Spark kernel, driver, or executor. The service account names have the following structure: emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>.
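To confirm that the condition was added as expected, you can print the role's trust policy; the following is a minimal boto3 sketch, and the role name is a placeholder.

# Minimal sketch: print the trust policy of the job execution role so you can
# verify the scoped-down service account condition. The role name is a placeholder.
import json
import boto3

iam = boto3.client("iam")
role = iam.get_role(RoleName="iam_role_name_for_job_execution")["Role"]

# boto3 returns the trust policy already decoded as a Python dict
print(json.dumps(role["AssumeRolePolicyDocument"], indent=2))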

In addition to the role segregation offered by IRSA, we recommend blocking access to instance metadata because a pod can still inherit the rights of the instance profile assigned to the worker node. For more information about how you can block access to metadata, refer to Restrict access to the instance profile assigned to the worker node.
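One common way to implement this, taken from the EKS guidance rather than from this post, is to require IMDSv2 and reduce the metadata response hop limit to 1 on the worker nodes so that pods behind the node's ENI can't reach the instance credentials. The following is a minimal boto3 sketch; the instance ID is a placeholder, and in practice you would typically set these options on the node group's launch template instead of on individual instances.

# Minimal sketch: require IMDSv2 and limit the metadata response hop count to 1
# so pods (one extra network hop away) can't reach the instance profile
# credentials. The instance ID is a placeholder.
import boto3

ec2 = boto3.client("ec2")
ec2.modify_instance_metadata_options(
    InstanceId="i-0123456789abcdef0",
    HttpTokens="required",        # enforce IMDSv2
    HttpPutResponseHopLimit=1,    # block access from pods behind the node's ENI
)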

Secret protection

Sometimes a Spark job needs to consume data stored in a database or from APIs. Most of the time, these are protected with a password or access key. The most common way to pass these secrets is through environment variables. However, in a multi-tenant environment, this means any user with access to the Kubernetes API can potentially access the secrets in the environment variables if this access isn't scoped well to the namespaces the user has access to.

To overcome this challenge, we recommend using a secrets store like AWS Secrets Manager that can be mounted through the Secrets Store CSI Driver. The benefit of using Secrets Manager is the ability to use IRSA and allow only the role assumed by the pod to access the given secret, thereby improving your security posture. You can refer to the best practices guide for sample code showing the use of Secrets Manager with EMR on EKS.
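As a complement to mounting secrets, the job code itself can fetch credentials at runtime using the IRSA role instead of environment variables. The following is a minimal boto3 sketch; the secret name and its JSON keys are placeholders, and the job execution role is assumed to have secretsmanager:GetSecretValue permission on that secret.

# Minimal sketch: read a database credential from AWS Secrets Manager inside the
# Spark job, using the IRSA execution role instead of environment variables.
# The secret name and its keys are placeholders.
import json
import boto3

def get_db_credentials(secret_name="prod/datalake/db-credentials"):
    client = boto3.client("secretsmanager")
    secret = client.get_secret_value(SecretId=secret_name)
    return json.loads(secret["SecretString"])

creds = get_db_credentials()
jdbc_user, jdbc_password = creds["username"], creds["password"]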

Spark data encryption

When a Spark application is running, the driver and executors produce intermediate data. This data is written to the node's local storage. Anyone who is able to exec into the pods would be able to read this data. Spark supports encryption of this data, and it can be enabled by passing --conf spark.io.encryption.enabled=true. Because this configuration adds a performance penalty, we recommend enabling data encryption only for workloads that store and access highly sensitive data and run in untrusted environments.

Network considerations

In this section, we discuss how to manage networking within the cluster as well as outside the cluster. We first address how Spark handles communication between the driver and executors and how to secure it. Then we discuss how to restrict network traffic between pods in the EKS cluster and allow only traffic destined for EMR on EKS. Last, we discuss how to restrict traffic from driver and executor pods to external AWS services using security groups.

Network encryption

The communication between the driver and executors uses the RPC protocol and is not encrypted. Starting with Spark 3 on Kubernetes, Spark offers a mechanism to encrypt communication using AES encryption.

The driver generates a key and shares it with executors through an environment variable. Because the key is shared through the environment variable, potentially any user with access to the Kubernetes API (kubectl) can read the key. We recommend securing access so that only authorized users have access to the EMR virtual cluster. In addition, you should set up Kubernetes role-based access control in such a way that access to the pod spec in the namespace where the EMR virtual cluster runs is granted to only a few selected service accounts. This method of passing secrets through an environment variable may change in the future, with a proposal to use Kubernetes secrets.

To enable encryption, RPC authentication must also be enabled in your Spark configuration. To enable encryption in-transit in Spark, you should use the following parameters in your Spark config:

--conf spark.authenticate=true

--conf spark.network.crypto.enabled=true

Note that these are the minimal parameters to set; refer to Encryption for the complete list of parameters.

Additionally, applying encryption in Spark has a negative impact on processing speed. You should only apply it when there is a compliance or regulation need.

Securing network traffic within the cluster

In Kubernetes, by default pods can communicate over the network across different namespaces in the same cluster. This behavior is not always desirable in a multi-tenant environment. In some instances, for example in regulated industries, to be compliant you want to enforce strict control over the network and send and receive traffic only from the namespace that you’re interacting with. For EMR on EKS, it would be the namespace associated to the EMR virtual cluster. Kubernetes offers constructs that allow you to implement network policies and define fine-grained control over the pod-to-pod communication. These policies are implemented by the CNI plugin; in Amazon EKS, the default plugin would be the VPC CNI. A policy is defined as follows and is applied with kubectl:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-np-ns1
  namespace: <EMR-VC-NAMESPACE>
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          nsname: <EMR-VC-NAMESPACE>

Network traffic outside the cluster

In Amazon EKS, when you deploy pods on Amazon Elastic Compute Cloud (Amazon EC2) instances, all the pods use the security group associated with the node. This can be an issue if your pods (executor pods) are accessing a data source (namely a database) that allows traffic based on the source security group. Database servers often restrict network access only from where they are expecting it. In the case of a multi-tenant EKS cluster, this means pods from other teams that shouldn't have access to the database servers would be able to send traffic to them.

To overcome this challenge, you can use security groups for pods. This feature allows you to assign a specific security group to your pods, thereby controlling the network traffic to your database server or data source. You can also refer to the best practices guide for a reference implementation.

Cost management and chargeback

In a multi-tenant environment, cost management is a critical subject. You have multiple users from various business units, and you need to be able to precisely charge back the cost of the compute resources they have used. At the beginning of the post, we introduced three models of multi-tenancy in Amazon EKS: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy is out of scope because the cost tracking is trivial; all the resources are dedicated to the team using the cluster, which is not the case for sole multi-tenancy and soft multi-tenancy. In the next sections, we discuss how to track the cost for each of these two models.

Soft multi-tenancy

In a soft multi-tenant environment, you can perform chargeback to your data engineering teams based on the resources they consumed and not the nodes allocated. In this method, you use the namespaces associated with the EMR virtual cluster to track how much resources were used for processing jobs. The following diagram illustrates an example.

Diagram to illustrate soft multi-tenancy

Diagram -1 Soft multi-tenancy

Tracking resources based on the namespace isn’t an easy task because jobs are transient in nature and fluctuate in their duration. However, there are partner tools available that allow you to keep track of the resources used, such as Kubecost, CloudZero, Vantage, and many others. For instructions on using Kubecost on Amazon EKS, refer to this blog post on cost monitoring for EKS customers.

Sole multi-tenancy

For sole multi-tenancy, the chargeback is done at the instance (node) level. Each tenant team uses a specific set of nodes that are dedicated to it. These nodes aren't always running, and are spun up using the Kubernetes auto scaling mechanism. The following diagram illustrates an example.

Diagram to illustrate Sole tenancy

Diagram -2 Sole tenancy

With sole multi-tenancy, you use a cost allocation tag, which is an AWS mechanism that allows you to track how much each resource has consumed. Although the method of sole multi-tenancy isn’t efficient in terms of resource utilization, it provides a simplified strategy for chargebacks. With the cost allocation tag, you can chargeback a team based on all the resources they used, like Amazon S3, Amazon DynamoDB, and other AWS resources. The chargeback mechanism based on the cost allocation tag can be augmented using the recently launched AWS Billing Conductor, which allows you to issue bills internally for your team.
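Once a cost allocation tag is activated, you can summarize spend per team programmatically to feed an internal chargeback report. The following is a minimal boto3 Cost Explorer sketch, assuming a tag key such as team has been applied to the tenant resources and activated as a cost allocation tag; the tag key and date range are placeholders.

# Minimal sketch: summarize monthly unblended cost grouped by a "team" cost
# allocation tag. The tag key and date range are placeholders.
import boto3

ce = boto3.client("ce")  # AWS Cost Explorer
response = ce.get_cost_and_usage(
    TimePeriod={"Start": "2022-09-01", "End": "2022-10-01"},
    Granularity="MONTHLY",
    Metrics=["UnblendedCost"],
    GroupBy=[{"Type": "TAG", "Key": "team"}],
)

for result in response["ResultsByTime"]:
    for group in result["Groups"]:
        tag_value = group["Keys"][0]  # for example, "team$data-engineering"
        amount = group["Metrics"]["UnblendedCost"]["Amount"]
        print(tag_value, amount)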

Resource management

In this section, we discuss considerations regarding resource management in multi-tenant clusters. We briefly discuss topics like sharing resources fairly, setting guardrails on resource consumption, techniques for reserving resources for time-sensitive or critical jobs, meeting rapid resource scaling requirements, and finally cost optimization practices with node selectors.

Sharing resources

In a multi-tenant environment, the goal is to share resources like compute and memory for better resource utilization. However, this requires careful capacity management and resource allocation to make sure each tenant gets their fair share. In Kubernetes, resource allocation is controlled and enforced by using ResourceQuota and LimitRange. ResourceQuota limits resources on the namespace level, and LimitRange allows you to make sure that all the containers are submitted with a resource requirement and a limit. In this section, we demonstrate how a data engineer or Kubernetes administrator can set up ResourceQuota and LimitRange configurations.

The administrator creates one ResourceQuota per namespace that provides constraints for aggregate resource consumption:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: compute-resources
  namespace: teamA
spec:
  hard:
    requests.cpu: "1000"
    requests.memory: 4000Gi
    limits.cpu: "2000"
    limits.memory: 6000Gi

For LimitRange, the administrator can review the following sample configuration. We recommend using default and defaultRequest to enforce the limit and request fields on containers. Lastly, from a data engineer's perspective, when submitting EMR on EKS jobs, you need to make sure the Spark resource requirement parameters are within the range of the defined LimitRange. For example, in the following configuration, a request for spark.executor.cores=7 will fail because the max limit for CPU is 6 per container:

apiVersion: v1
kind: LimitRange
metadata:
  name: cpu-min-max
  namespace: teamA
spec:
  limits:
  - max:
      cpu: "6"
    min:
      cpu: "100m"
    default:
      cpu: "500m"
    defaultRequest:
      cpu: "100m"
    type: Container

Priority-based resource allocation

Diagram Illustrates an example of resource allocation with priority

Diagram – 3 Illustrates an example of resource allocation with priority.

As all the EMR virtual clusters share the same EKS computing platform with limited resources, there will be scenarios in which you need to prioritize jobs on a sensitive timeline. In this case, high-priority jobs can utilize the resources and finish, whereas running low-priority jobs get stopped and any new pods must wait in the queue. EMR on EKS can achieve this with the help of pod templates, where you specify a priority class for the given job.

When a pod priority is enabled, the Kubernetes scheduler orders pending pods by their priority and places them in the scheduling queue. As a result, the higher-priority pod may be scheduled sooner than pods with lower priority if its scheduling requirements are met. If this pod can’t be scheduled, the scheduler continues and tries to schedule other lower-priority pods.

The preemptionPolicy field on the PriorityClass defaults to PreemptLowerPriority, and the pods of that PriorityClass can preempt lower-priority pods. If preemptionPolicy is set to Never, pods of that PriorityClass are non-preempting. In other words, they can’t preempt any other pods. When lower-priority pods are preempted, the victim pods get a grace period to finish their work and exit. If the pod doesn’t exit within that grace period, that pod is stopped by the Kubernetes scheduler. Therefore, there is usually a time gap between the point when the scheduler preempts victim pods and the time that a higher-priority pod is scheduled. If you want to minimize this gap, you can set a deletion grace period of lower-priority pods to zero or a small number. You can do this by setting the terminationGracePeriodSeconds option in the victim Pod YAML.

See the following code samples for priority class:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 100
globalDefault: false
description: " High-priority Pods and for Driver Pods."

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: low-priority
value: 50
globalDefault: false
description: " Low-priority Pods."

One of the key considerations while templatizing the driver pods, especially for low-priority jobs, is to avoid using the same low-priority class for both the driver and executors. This saves the driver pods from getting evicted and losing the progress of all their executors in a resource congestion scenario. In this low-priority job example, we have used a high-priority class for driver pod templates and low-priority classes only for executor templates. This way, we can ensure the driver pods are safe during the eviction process of low-priority jobs. In this case, only executors will be evicted, and the driver can bring back the evicted executor pods as resources become free. See the following code:

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "high-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND
  containers:
  - name: spark-kubernetes-driver # This will be interpreted as Spark driver container

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "low-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
  containers:
  - name: spark-kubernetes-executors # This will be interpreted as Spark executor container

Overprovisioning with priority

Diagram Illustrates an example of overprovisioning with priority

Diagram – 4 Illustrates an example of overprovisioning with priority.

As pods wait in a pending state due to resource availability, additional capacity can be added to the cluster with Amazon EKS auto scaling. The time it takes to scale the cluster by adding new nodes for deployment has to be considered for time-sensitive jobs. Overprovisioning is an option to mitigate the auto scaling delay using temporary pods with negative priority. These pods occupy space in the cluster. When pods with high priority are unschedulable, the temporary pods are preempted to make room. This causes the auto scaler to scale out new nodes due to overprovisioning. Be aware that this is a trade-off, because it adds higher cost while minimizing scheduling latency. For more information about overprovisioning best practices, refer to Overprovisioning.

Node selectors

EKS clusters can span multiple Availability Zones in a VPC. A Spark application whose driver and executor pods are distributed across multiple Availability Zones can incur inter-Availability Zone data transfer costs. To minimize or eliminate the data transfer cost, you should configure the job to run in a specific Availability Zone, or even on a specific node type, with the help of node labels. Amazon EKS places a set of default labels to identify capacity type (On-Demand or Spot Instance), Availability Zone, instance type, and more. In addition, we can use custom labels to meet workload-specific node affinity.

EMR on EKS allows you to choose specific nodes in two ways:

  • At the job level. Refer to EKS Node Placement for more details.
  • At the driver and executor level, using pod templates.

When using pod templates, we recommend using On-Demand Instances for driver pods. You can also consider including Spot Instances for executor pods for workloads that are tolerant of occasional periods when the target capacity is not completely available. Using Spot Instances allows you to save costs for jobs that aren't critical and can be interrupted. Refer to Define a NodeSelector in PodTemplates.

Conclusion

In this post, we provided guidance on how to design and deploy EMR on EKS in a multi-tenant EKS environment through different lenses: network, security, cost management, and resource management. For any deployment, we recommend the following:

  • Use IRSA with a condition scoped on the EMR on EKS service account
  • Use a secret manager to store credentials and the Secret Store CSI Driver to access them in your Spark application
  • Use ResourceQuota and LimitRange to specify the resources that each of your data engineering teams can use and avoid compute resource abuse and starvation
  • Implement a network policy to segregate network traffic between pods

Lastly, if you are considering migrating your Spark workloads to EMR on EKS, you can learn more about design patterns to manage Apache Spark workloads on EMR on EKS in this blog, and about migrating your EMR transient clusters to EMR on EKS in this blog.


About the Authors

author - lotfiLotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

author - peter ajeebAjeeb Peter is a Senior Solutions Architect with Amazon Web Services based in Charlotte, North Carolina, where he guides global financial services customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings over 20 years of technology experience on Software Development, Architecture and Analytics from industries like finance and telecom.

How ZS created a multi-tenant self-service data orchestration platform using Amazon MWAA

Post Syndicated from Manish Mehra original https://aws.amazon.com/blogs/big-data/how-zs-created-a-multi-tenant-self-service-data-orchestration-platform-using-amazon-mwaa/

This post is co-authored by Manish Mehra, Anirudh Vohra, Sidrah Sayyad, and Abhishek I S (from ZS), and Parnab Basak (from AWS). The team at ZS collaborated closely with AWS to build a modern, cloud-native data orchestration platform.

ZS is a management consulting and technology firm focused on transforming global healthcare and beyond. We leverage our leading-edge analytics, plus the power of data, science, and products, to help our clients make more intelligent decisions, deliver innovative solutions, and improve outcomes for all. Founded in 1983, ZS has more than 12,000 employees in 35 offices worldwide.

ZAIDYN™ by ZS is an intelligent, cloud-native platform that helps life sciences organizations shape the future. Its analytics, algorithms, and workflows empower people, transform processes, and unlock real value. Designed to learn and grow with our clients, the platform is modular, future-ready, and fueled by global connectivity. And as more people engage, share, and build, our platform gets smarter, helping organizations fuel discovery, connect with customers, deliver treatments, and improve lives. ZAIDYN is helping companies of all sizes gain fluency in the full spectrum of life sciences so they can move faster, together through its Data & Analytics, Customer Engagement, Field Performance and Clinical Development offerings.

ZAIDYN Data & Analytics apps provide business users with self-service tools to innovate and scale insights delivery across the enterprise. ZAIDYN Data Hub (a part of the Data & Analytics product category) provides self-service options for guided workflows, data connectors, quality checks, and more. The elastic data processing offered by AWS helps prioritize processing speeds.

Data Hub customers wanted a one-stop solution for managing their data pipelines: a solution that doesn't require end users to learn the nitty-gritty details of the tool and that is easy to onboard onto, which increased the demand for data orchestration capabilities within the application. Sophisticated asks such as starting and stopping workflows, maintaining a history of past runs, and providing real-time status updates for individual tasks of the workflow became increasingly important for end clients. We needed a mature orchestration tool, which led us to Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Amazon MWAA is a managed orchestration service for Apache Airflow that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale.

In this post, we share how ZS created a multi-tenant self-service data orchestration platform using Amazon MWAA.

Why we chose Amazon MWAA

Choosing the right orchestration tool was critical for us because we had to ensure that the service was operationally efficient and cost-effective, provided high availability, had extensive features to support our business cases, and yet was easy to adapt for our end-users (data engineers). We evaluated and experimented among Amazon MWAA, Azkaban on Amazon EMR, and AWS Step Functions before project initiation.

The following benefits of Amazon MWAA convinced us to adopt it:

  • AWS managed service – With Amazon MWAA, we don’t have to manage the underlying infrastructure for scalability and availability to maintain quality of service. The built-in autoscaling mechanism of Amazon MWAA automatically increases the number of Apache Airflow workers in response to running and queued tasks, and disposes of extra workers when there are no more tasks queued or running. The default environment is already built for high availability with multiple Airflow schedulers and workers, and the metadata database distributed across multiple Availability Zones. We also evaluated hosting open-source Airflow on our ZS infrastructure. However, due to infrastructure maintenance overhead and the high investment needed to make and maintain it at production grade, we decided to drop that option.
  • Security – With Amazon MWAA, our data is secure by default because workloads run in our own isolated and secure cloud environment using Amazon Virtual Private Cloud (Amazon VPC), and data is automatically encrypted using AWS Key Management Service (AWS KMS). We can control role-based authentication and authorization for Apache Airflow’s user interface via AWS Identity and Access Management (IAM), providing users single sign-on (SSO) access for scheduling and viewing workflow runs.
  • Compatibility and active community support – Amazon MWAA hosts the same open-source Apache Airflow version without any forks. The open-source community for Apache Airflow is very active with multiple commits, file changes, issue resolutions, and community advice.
  • Language and connector support – The flow definitions for Apache Airflow are based on Python, which is easy for our engineers to adapt. An extensive list of features and connectors is available out of the box in Amazon MWAA, including connectors for Hive, Amazon EMR, Livy, and Kubernetes. We needed to run all our Data Hub jobs (ingestion, applying custom rules and quality checks, or exporting data to third-party systems) on Amazon EMR. The necessary Amazon EMR operators are already available as a part of the Amazon-provided package for Airflow (apache-airflow-providers-amazon), which we could supplement rather than construct one from the ground up.
  • Cost – Cost was the most important aspect for us when adopting Amazon MWAA. Amazon MWAA is useful for those who are running thousands of tasks in the prod environment, which is why we decided to make the Amazon MWAA environment multi-tenant such that the cost can be shared among clients. With our large Amazon MWAA environment, we only pay for what we use, with no minimum fees or upfront commitments. We estimated paying less than $1,000 per month, combined for our environment usage and additional worker instance pricing, yet achieving the scale of running 200 concurrent tasks for 3 hours per day across 10 concurrent workers. This meant reduced operational costs and engineering overhead while meeting the on-demand monitoring needs of end-to-end data pipeline orchestration.

Solution overview

The following diagram illustrates the solution architecture.

We have a common control tier account where we host our software as a service application (Data Hub) on Amazon Elastic Compute Cloud (Amazon EC2) instances. Each client has their own version of this application deployed on this shared infrastructure. Amazon MWAA is also hosted in the same common control tier account. The control tier account has connectivity with tenant-specific AWS accounts. This is to maintain strong physical isolation of client data by segregating the AWS accounts for each client. Each client-specific account hosts EMR clusters where data processing takes place. When a processing job is complete, data may reside on the EMR cluster's HDFS storage or on Amazon Simple Storage Service (Amazon S3) via EMRFS, depending on the configuration. The DAG files generated by our Data Hub application contain metadata of the processes, and don't contain any sensitive client information. When a job is submitted from Data Hub, the API request contains tenant-specific information needed to pull up the corresponding AWS connection details, which are stored as Airflow connection objects. These connection details are consumed by our custom implementation of Airflow EMR step operators (add and watch) to perform operations on the tenant EMR clusters.

Because the data orchestration capability is an application offering, the client teams create their processes on the Data Hub UI and don’t have access to the underlying Amazon MWAA environment.

The following screenshot shows how an end-user can configure a Data Hub process on the application UI.

How Data Hub processes map to Amazon MWAA DAGs

Data Hub processes map to Amazon MWAA DAGs as follows:

  • Each process in Data Hub corresponds to a DAG in Amazon MWAA, and each component is a task (denoted by Sn) that is submitted as a step on the client EMR clusters.
  • The application generates the DAG file dynamically and updates it on the S3 bucket linked to the Amazon MWAA environment.
  • Parsing dedicated structures representing a given process and submitting or tracking the Amazon EMR steps is abstracted from the end-user. Dynamic DAG generation is responsible for using the latest version of the underlying components and helps in managing the DAG schedule.
  • Some Airflow tasks are created as a part of the DAG, which take care of interacting with the application APIs to ensure that the required metadata is captured in a separate Amazon Relational Database Service (Amazon RDS) database instance.

A user can trigger a given process to run from the Data Hub UI or can schedule it to run at a specified time. Because a single Amazon MWAA environment is responsible for the data orchestration needs of multiple clients, our DAG decode logic ensures that the correct EMR cluster ID and Airflow connection ID are picked up at runtime. The configs responsible for storing these details are placed and updated on the S3 buckets via an automated deployment pipeline. A dedicated connection ID is created per client in Airflow, which is then utilized in our custom implementation of EmrAddStepsOperator. The connection ID captures the Region and role ARN to be assumed to interact with the EMR cluster in the client account. These cross-account roles have access to limited resources in each client account, following the principle of least privilege.

Generating a DAG from a process defined on Data Hub UI

Our front-end application is built using Angular (version 11) and uses a third-party library that facilitates drag-and-drop of components from the left pane on a canvas. Components are stitched together with connections defining dependencies to form a process. This process is translated by our custom engine to generate a dynamic Airflow DAG. A sample DAG generated from the preceding example process defined on the UI looks like the following figure.

We wrap the DAG with PEntry and PExit Python operators, and for each of the components on the Data Hub UI, we create two tasks: Cn and Wn (a minimal wiring sketch follows the terms below).

The relevant terms for this solution are as follows:

  • PEntry – The Python operator used to insert an entry in the RDS database, via API call, indicating that the process run has started.
  • Cn – The ZS custom implementation of EmrAddStepsOperator used to submit a job (Data Hub component) on a running EMR cluster. This is followed by an API call to insert an entry in the database indicating that the component job has started.
  • Wn – The custom implementation of the Airflow watcher (EmrStepSensor), which checks the status of the step from our metadata database.
  • PExit – The Python operator used to update an entry in the RDS database (more of a finally block) via API call.
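The following is a minimal, illustrative Airflow sketch of this wiring. It is not the actual ZS implementation: it uses the standard Amazon provider operators in place of the custom ones, the cluster ID, tenant connection ID, step definition, and API callbacks are placeholders, and the import paths may vary with your provider version.

# Minimal, illustrative sketch of the PEntry -> Cn -> Wn -> PExit wiring using
# the standard Amazon provider operators (placeholders throughout).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

TENANT_CONN_ID = "tenant_a_aws"            # per-client Airflow connection (assumed role)
TENANT_EMR_CLUSTER_ID = "j-XXXXXXXXXXXXX"  # tenant EMR cluster ID (placeholder)

SPARK_STEP = [{
    "Name": "datahub-component-1",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "s3://<your-bucket>/jobs/component_1.py"],
    },
}]

def record_process_start():
    pass  # placeholder for the API call that records the run start in the RDS database

def record_process_end():
    pass  # placeholder for the API call that closes out the run

with DAG("datahub_process_example", start_date=datetime(2023, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    p_entry = PythonOperator(task_id="p_entry", python_callable=record_process_start)
    c1 = EmrAddStepsOperator(
        task_id="c1",
        job_flow_id=TENANT_EMR_CLUSTER_ID,
        aws_conn_id=TENANT_CONN_ID,
        steps=SPARK_STEP,
    )
    w1 = EmrStepSensor(
        task_id="w1",
        job_flow_id=TENANT_EMR_CLUSTER_ID,
        aws_conn_id=TENANT_CONN_ID,
        step_id="{{ task_instance.xcom_pull(task_ids='c1', key='return_value')[0] }}",
    )
    p_exit = PythonOperator(task_id="p_exit", python_callable=record_process_end)

    p_entry >> c1 >> w1 >> p_exit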

Lessons learned during the implementation

When implementing this solution, we learned the following:

  • We faced challenges in being able to consistently predict when a DAG will be parsed and made available in the Airflow UI in Amazon MWAA after the DAG file is synced to the linked S3 bucket. Depending on how complex the DAG is, it could happen within seconds or several minutes. Due to the lack of availability of an API or AWS Command Line Interface (AWS CLI) command to ascertain this, we put in some blanket restrictions (delay) on user operations from our UI to overcome this limitation.
  • Within Airflow, data pipelines are represented by DAGs, and these DAGs change over time as business needs evolve. A key challenge faced by Airflow users is looking at how a DAG was run in the past, and when it was replaced by a newer version of the DAG. This is because within Airflow (as of this writing), only the current (latest) version of the DAG is represented within the user interface, without any reference to prior versions of the DAG. To overcome this limitation, we implemented a backend way of generating a DAG from the available metadata, and use it for version control over runs.
  • Airflow CLI commands when invoked in DAGs always return an HTTP 200 response. You can’t solely rely on the HTTP response code to ascertain the status of commands. We applied additional parsing logic (particularly to analyze the errors on failure) to determine the true status of commands.
  • Airflow doesn’t have a command to gracefully stop a DAG that is currently running. You can stop a DAG (unmark as running) and clear the task’s state or even delete it in the UI. The actual running tasks in the executor won’t stop, but might be stopped if the executor realizes that it’s not in the database anymore.

Conclusion

Amazon MWAA sets up Apache Airflow for you using the same Apache Airflow user interface and open-source code. With Amazon MWAA, you can use Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. Amazon MWAA automatically scales its workflow run capacity to meet your needs, and is integrated with AWS security services to help provide you with fast and secure access to your data. In this post, we discussed how you can build a bridge tenancy isolation model with a central Amazon MWAA orchestrating task against independent infrastructure stacks in dedicated accounts deployed for each of your tenants. Through a custom UI, you can enable self-service workflow runs via Airflow dynamic DAGs using the power and flexibility of Python. This enables you to achieve economies of scale and operational efficiency while meeting your regulatory, security, and cost considerations.


About the Authors

Manish Mehra is a Software Architect, working with the SD group in ZS. He has more than 11 years of experience working in banking, gaming, and life science domains. He is currently looking into the architecture of the Data & Analytics product category of the ZAIDYN Platform. He has expertise in full-stack application development and building robust, scalable, enterprise-grade big data applications.

Anirudh Vohra is a Director of Cloud Architecture, working within the Cloud Center of Excellence space at ZS. He is passionate about being a developer advocate for internal engineering teams, also designing and building cloud platforms and abstractions to empower developers and troubleshoot complex systems.

Abhishek I S is Associate Cloud Architect at ZS Associates working within the Cloud Centre of Excellence space. He has diverse experience ranging from application development to cloud engineering. Currently, he is primarily focusing on architecture design and automation for the cloud-native solutions of various ZS products.

Sidrah Sayyad is an Associate Software Architect at ZS working within the Software Development (SD) group. She has 9 years of experience, which includes working on identity management, infrastructure management, and ETL applications. She is passionate about coding and helps architect and build applications to achieve business outcomes.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab was closely involved with the engagement with ZS, providing architectural guidance as well as helping the team overcome technical challenges during the implementation.

Optimize Amazon EMR costs for legacy and Spark workloads with managed scaling and node labels

Post Syndicated from Ramesh Raghupathy original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-costs-for-legacy-and-spark-workloads-with-managed-scaling-and-node-labels/

Customers migrating from large on-premises Hadoop clusters to Amazon EMR like to reduce their operational costs while running resilient applications. On-premises customers typically use inelastic, large, fixed-size Hadoop clusters, which incur high capital expenditure. You can now migrate your mixed workloads to Amazon EMR with managed scaling, which saves costs without compromising performance.

This solution can benefit those running a mixed workload of legacy MapReduce applications concurrently with Spark applications. MapReduce applications such as Apache Sqoop jobs need to use Amazon Elastic Compute Cloud (Amazon EC2) On-Demand Instances for resilience, whereas Apache Spark job workers can use EC2 Spot Instances due to built-in resilience. Therefore, it's critical that you can run your workloads with both On-Demand and Spot Instances when needed, while also having the elasticity and resiliency you need to achieve cost savings.

This post walks through a mixed workload scenario to illustrate the use of Amazon EMR managed scaling, node labels, and the capacity scheduler configuration to create an EMR cluster that provides elasticity and the ability to deploy resilient applications.

Solution overview

For this post, we use the following Apache Sqoop and Apache Spark workloads to demonstrate the scenario and the results:

  • Sqoop workload – A simple Sqoop job to extract data from Amazon Redshift and write data to Amazon Simple Storage Service (Amazon S3)
  • Spark workload – A Python script that unions Amazon S3 data and writes it back to Amazon S3

The following diagram illustrates the two workloads used for this demonstration.
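For reference, the following is a minimal PySpark sketch of the kind of union-and-write job described above; the S3 paths are placeholders and the actual script used in this walkthrough may differ.

# Minimal sketch of the Spark workload: read two Amazon S3 datasets, union them,
# and write the result back to Amazon S3. The S3 paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-union-job").getOrCreate()

df1 = spark.read.parquet("s3://<your-bucket>/input/dataset-1/")
df2 = spark.read.parquet("s3://<your-bucket>/input/dataset-2/")

df1.unionByName(df2).write.mode("overwrite").parquet("s3://<your-bucket>/output/union/")

spark.stop()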

To build the solution, you must complete the following high-level steps:

  1. Determine the EMR cluster configuration for managed scaling with minimum and maximum capacity, and core and task nodes.
  2. For the workloads to run, identify the capacity scheduler queues required, queue capacity as % of total capacity, and Spot or On-Demand Instances used to meet the queue capacity.
  3. Assign YARN node labels to the On-Demand and Spot Instances in the capacity scheduler configuration to ensure the appropriate instance types are allocated to the queues.
  4. Create a bootstrap and step scripts to automate the configuration process during EMR cluster creation.
  5. Validate the cluster elasticity and application resilience by running the Sqoop and Spark applications.

The solution offers the following benefits:

  • Significantly reduces the time to migrate applications to Amazon EMR because you’re no longer struggling to implement cost-optimization techniques as well as application resilience while migrating from on-premises to the AWS Cloud
  • Offers cost savings when compared to running similar workloads on in-elastic, large on-premises Hadoop clusters
  • Enables you to run a mixed workload on EMR clusters without significantly redesigning your on-premises applications

Prerequisites

You need to complete the following steps before you can configure your EMR cluster and run the workloads.

Launch an Amazon Redshift cluster

We first launch an Amazon Redshift cluster. For instructions, refer to Create a sample Amazon Redshift cluster. We use Amazon Redshift as the relational database management service for the Sqoop job.

Create and associate an IAM role for loading the Amazon Redshift cluster

We create an AWS Identity and Access Management (IAM) role that allows the Amazon Redshift cluster to call AWS services on its behalf.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Use case, choose Redshift and Redshift-Customizable.
  4. Choose Next.
  5. For Permissions policies, choose the policy AmazonS3ReadOnlyAccess.
  6. Choose Next.
  7. For Role name, enter load_tpch_redshift.
  8. Choose Create role.
    Now you attach this role to the Amazon Redshift cluster.
  9. On the Properties tab, choose Manage IAM roles.
  10. Associate the IAM role.

Load test data into the Amazon Redshift cluster

We create a table called SQOOP_LOAD_TBL and load it with mock data to test the Sqoop job. The following code shows the create table and copy statement. The copy statement should load around 1000000 rows in the SQOOP_LOAD_TBL table, which we use to run a large Sqoop data movement job.

CREATE TABLE EMRBLOG.SQOOP_LOAD_TBL
(
ID BIGINT NOT NULL,
NAME VARCHAR(25),
REGIONKEY BIGINT,
COMMENT   VARCHAR(150),
TS1 TIMESTAMP 
)
DISTSTYLE EVEN;

copy EMRBLOG.SQOOP_LOAD_TBL
from 's3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/sqoop_input/redshift_manifest'
IAM_ROLE 'arn:aws:iam::xxxxxxxxxx:role/load_tpch_redshift' format parquet 
manifest
;

Create an Amazon RDS for PostgreSQL instance

We create an Amazon Relational Database Service (Amazon RDS) for PostgreSQL instance to use as the metastore for Sqoop.

The following configuration uses a small instance. We use sqoop as the master user name and postgres as the database name. Note the database name, user ID, and password; we use these to connect Sqoop running on the EMR cluster to this metastore.

Use the Amazon EMR automation scripts while creating the cluster

We use three automation scripts from the S3 folder s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/ while creating the EMR cluster.

The first script is a node label script used by YARN to determine if each instance is Spot or On-Demand:

getNodeLabels.py 

#!/usr/bin/python3
import json
k='/mnt/var/lib/info/extraInstanceData.json'
with open(k) as f:
    response = json.load(f)
    if (response['instanceRole'] in ['core','task']):
       print (f"NODE_PARTITION:{response['marketType'].upper()}")

This script runs on the core and task nodes to assign the node label SPOT or ON_DEMAND based on the instance's market type (purchasing option).

The next is a bootstrap script to copy the node label script to the /home/hadoop directory on all cluster nodes:

getNodeLabels_bootstrap.sh

#!/bin/bash
set -vx
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/getNodeLabels.py /home/hadoop
chmod +x /home/hadoop/getNodeLabels.py

This script is used during the bootstrap process to copy getNodeLabels.py from the S3 folder to /home/hadoop on the EMR cluster.

The last is a step script to add the cluster node labels, marking SPOT as exclusive so that Spot nodes are used only by the queues explicitly granted access to that label:

addNodeLabels.sh

#!/bin/bash
sudo -u yarn yarn rmadmin -addToClusterNodeLabels "SPOT(exclusive=true),ON_DEMAND(exclusive=false)"

There are two kinds of node partitions:

  • Exclusive – Containers are allocated to nodes with an exact match node partition. For example, asking partition="x" will be allocated to the node with partition="x", and asking the DEFAULT partition will be allocated to the DEFAULT partition nodes.
  • Non-exclusive – If a partition is non-exclusive, it shares idle resources to the container requesting the DEFAULT partition.

We use exclusive labels for SPOT to ensure only Spark workloads can use them and non-exclusive labels for ON_DEMAND so that they can be used both by Spark and Sqoop workloads. For more details on the types of labels, refer to YARN Node Labels.

We’re now ready to run our solution.

Launch an EMR cluster

Complete the following steps to launch an EMR cluster:

  1. Determine the managed scaling EMR cluster configuration, choosing instance fleets, which allows us to choose up to 30 instance types and the minimum and maximum configuration to allocate core and task nodes while enabling scaling on the task nodes.
    We suggest the following EMR cluster configuration for instance fleets and EMR managed scaling with core and task nodes for this demonstration. The right number and types of nodes need to be chosen based on the workload needs of your use case.

    1. Minimum – 4
    2. Maximum – 64
    3. On-demand limit – 4
    4. Maximum core nodes – 4
  2. On the Amazon EMR console, choose Create cluster.
  3. In the Advanced Options section, for Software Configuration, select Hadoop, Sqoop, Oozie, and Spark.
  4. In the Edit software settings section, choose Enter configuration.
  5. Enter the following code, which includes yarn-site, capacity-scheduler, and sqoop-site properties, and the addition of a property to spark-defaults. In the sqoop-site section, update the metastore URL, user ID, and password.
    [
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.resourcemanager.scheduler.class": "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler",
          "yarn.node-labels.enabled": "true",
          "yarn.node-labels.am.default-node-label-expression": "ON_DEMAND",
          "yarn.nodemanager.node-labels.provider": "script",
          "yarn.nodemanager.node-labels.provider.script.path": "/home/hadoop/getNodeLabels.py"
        },
        "configurations": []
      },
      {
        "classification": "capacity-scheduler",
        "properties": {
          "yarn.scheduler.capacity.root.queues": "default,Sqoop",
          "yarn.scheduler.capacity.root.Sqoop.capacity": "40",
          "yarn.scheduler.capacity.root.default.capacity": "60",
          "yarn.scheduler.capacity.root.default.accessible-node-labels": "*",
          "yarn.scheduler.capacity.root.Sqoop.accessible-node-labels": "ON_DEMAND",
          "yarn.scheduler.capacity.root.default.accessible-node-labels.ON_DEMAND.capacity": "45",
          "yarn.scheduler.capacity.root.Sqoop.accessible-labels.ON_DEMAND.capacity": "55",
          "yarn.scheduler.capacity.root.default.accessible-node-labels.SPOT.capacity": "100"
        },
        "configurations": []
      },
      {
        "classification": "Sqoop-site",
        "properties": {
          "Sqoop.metastore.client.enable.autoconnect": "true",
          "Sqoop.metastore.client.autoconnect.url": "jdbc:postgresql://<sqoop-metastore-alias>:5432/postgres",
          "Sqoop.metastore.client.autoconnect.username": "xxxxxx",
          "Sqoop.metastore.client.autoconnect.password": "xxxxxx",
          "Sqoop.metastore.client.record.password": "true"
        },
        "configurations": []
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.yarn.executor.nodeLabelExpression": "SPOT"
        }
      }
    ]

    The configuration is created with two queues (Sqoop and default). The Sqoop queue has access only to the On-Demand nodes, and the default queue has access to both On-Demand and Spot nodes.

    In the spark-defaults section, the property "spark.yarn.executor.nodeLabelExpression": "SPOT" enables quick scaling of the Spot nodes and use of the Spot nodes by the Spark executors as soon as the Spark job starts. If this property isn’t used, Spot node scaling is triggered only after the On-Demand nodes are consumed. This causes a longer runtime for the job due to delayed scaling as well as the Spark executor’s inability to use the scaled-up Spot nodes.

  6. Add the addNodeLabels.sh script as a step, which is run using script-runner.jar.
  7. Under Cluster Composition, select Instance fleets, which provides options to choose the nodes from up to 30 instance types.
  8. Choose one primary, four core, and 0 task nodes.
  9. Under Cluster scaling, choose the Amazon EMR managed scaling policy option to define the core and task units (minimum 4, maximum 64, On-Demand limit 4, max core nodes 4).
  10. For Bootstrap Actions, add the getNodeLabels_bootstrap.sh script from Amazon S3 as a bootstrap action.
    This script copies getNodeLabels.py from the Amazon S3 location to the /home/hadoop directory on each cluster node so that the YARN NodeManager can run it to determine the node label.
  11. Use an existing EC2 key pair or create a new one if none exists, and download it to be used for logging onto the primary node.
  12. Choose existing security groups for the primary node and core and task nodes.
  13. Choose Create cluster and wait for cluster creation to complete.
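If you prefer to script the cluster creation instead of using the console, you can launch an equivalent cluster with the AWS CLI. The following is only a minimal sketch: it assumes the preceding JSON is saved locally as configurations.json and that an instance fleet definition exists in instance-fleets.json, and the subnet ID, EC2 key pair, and the Region in the script-runner.jar path are placeholders you need to replace.

aws emr create-cluster \
  --name "managed-scaling-capacity-scheduler-demo" \
  --release-label emr-5.36.0 \
  --applications Name=Hadoop Name=Sqoop Name=Oozie Name=Spark \
  --use-default-roles \
  --ec2-attributes KeyName=<ec2-key-pair>,SubnetId=<subnet-id> \
  --configurations file://configurations.json \
  --instance-fleets file://instance-fleets.json \
  --bootstrap-actions Path=s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/getNodeLabels_bootstrap.sh,Name=CopyNodeLabelScript \
  --steps 'Type=CUSTOM_JAR,Name=AddNodeLabels,ActionOnFailure=CONTINUE,Jar=s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[s3://aws-blogs-artifacts-public/artifacts/BDB-1737/config/addNodeLabels.sh]' \
  --managed-scaling-policy 'ComputeLimits={UnitType=InstanceFleetUnits,MinimumCapacityUnits=4,MaximumCapacityUnits=64,MaximumOnDemandCapacityUnits=4,MaximumCoreCapacityUnits=4}'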

Configure proxy settings to view websites hosted on the primary node

To configure your proxy settings, follow the instructions in Option 2, part 1: Set up an SSH tunnel to the master node using dynamic port forwarding and Option 2, part 2: Configure proxy settings to view websites hosted on the master node.

After the proxy settings are configured, run the following command on your terminal window (Mac):

ssh -i ec2-login-keypair.pem hadoop@ec2-xxxxxxxx.compute-1.amazonaws.com -ND 8157

Open the Resource Manager UI (found on the Application User Interfaces tab on the Amazon EMR console, similar to http://ec2-xxxxxxxx.compute-1.amazonaws.com:8088/) and choose the scheduler option to monitor the jobs and use of capacity scheduler queues.

Run the Sqoop job

To run the Sqoop job and monitor YARN, complete the following steps:

  1. Connect to the primary node and run the sqoop job --list command.
    This command creates the Sqoop metadata tables in the metastore. If you can't connect Sqoop to Amazon RDS, make sure the Amazon RDS security group allows an inbound PostgreSQL TCP connection on port 5432 from both the EMR primary and secondary security groups. Follow the same procedure when connecting to Amazon Redshift: open the Amazon Redshift port 5439 for connections from both the EMR primary and secondary security groups.
  2. Create a test Sqoop job that reads the data from the Amazon Redshift table and writes to Amazon S3.
  3. Substitute <last_value> with a timestamp older than the values in the TS1 column of the EMRBLOG.SQOOP_LOAD_TBL table, and replace <Target S3 folder> with your S3 output location.
    sqoop job -Dmapred.job.queue.name=Sqoop \
    --create Sqoop_redshift_extract \
    -- import \
    --connect 'jdbc:postgresql://redshift-clusterxx.xxxxxxxx.us-east-1.redshift.amazonaws.com:5439/dev?user=awsuser&password=xxxxxxxx' \
    --fields-terminated-by '|' \
    --null-non-string '' \
    --null-string '' \
    --target-dir s3://<Target S3 folder> \
    --query "Select * from EMRBLOG.SQOOP_LOAD_TBL where \$CONDITIONS" \
    -m 1 \
    --append \
    --check-column "ts1" \
    --incremental lastmodified \
    --last-value "<last_value>"

  4. List the job to verify that it was created correctly, then run the Sqoop job using the following commands:
    # list the job to confirm creation
    sqoop job --list
    # run the job
    sqoop job --exec Sqoop_redshift_extract

  5. Monitor the job and the use of the application queues using the scheduler option on the Resource Manager UI.

You should notice that the Sqoop job is using On-Demand nodes and the Sqoop queue under it. There is no usage of Spot nodes.
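You can also confirm the queue and label usage from the command line on the primary node. This is an optional check using the standard YARN CLI; the output format varies by Hadoop version.

# Show capacity and usage for the Sqoop queue (On-Demand only)
yarn queue -status Sqoop

# Show capacity and usage for the default queue used by Spark
yarn queue -status default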

Create the Spark job and monitor YARN

The Spark job (emr_union_job.py) reads a mock Parquet dataset from Amazon S3. It uses an argument count to union multiple copies of the dataset, and sorts the result data before writing it to the Amazon S3 output location. Because this job consumes a large amount of memory for performing the union and sort operations, it triggers the Spark executors to scale up Spot nodes. When the job is complete, the EMR cluster should scale down the Spot nodes.

The count value can be varied between 1 and 8 to achieve varying cluster scaling and runtimes for the job based on the volume of data being unioned.

  1. Run the Spark job as a step on the EMR cluster using command-runner.jar, as shown in the following screenshot.
  2. Use the following sample command to submit the Spark job (emr_union_job.py).
    It takes in three arguments:

    • <input_full_path> – The Amazon S3 location of the data file that is read in by the Spark job. The path should not be changed. The input_full_path is s3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet.
    • <output_path> – The Amazon S3 folder where the results are written to.
    • <number of copies to be unioned> – By varying this argument, we can use the Spark job to trigger varying job runtimes and varying scaling of Spot nodes.
    spark-submit --deploy-mode cluster s3://aws-blogs-artifacts-public/artifacts/BDB-1737/scripts/emr_union_job.py s3://aws-blogs-artifacts-public/artifacts/BDB-1737/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet s3://mydatarr/union_out/ 6

  3. Review the Resource Manager UI and choose the Scheduler tab.
    You should see that only the resources under the default queue are being used both in ON_DEMAND and SPOT partitions.
  4. On the Hardware tab of the EMR cluster, verify whether the cluster has scaled up the Spot Instances of the task nodes. Because the Spark job executors are configured to use Spot nodes exclusively, the Spot nodes should scale up while the Spark job is running.
  5. After the job is complete, verify if the Spot nodes have scaled down to 0, indicating that the workload has run successfully.
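To confirm the scale-down from the command line rather than the console, the following optional check lists the running task fleet instances for the cluster; replace the cluster ID placeholder with your own value. The list should be empty after the Spot nodes are released.

aws emr list-instances \
  --cluster-id <your-cluster-id> \
  --instance-fleet-type TASK \
  --instance-states RUNNING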

Clean up

To help prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this walkthrough:

  • Amazon Redshift cluster
  • Amazon RDS database
  • EMR cluster

Conclusion

In this post, you learned how to configure an EMR cluster with managed scaling, assign node labels, and use a capacity scheduler to run mixed workload jobs on the EMR cluster. You created an Amazon Redshift cluster for sourcing data, Amazon RDS for Sqoop metadata, a Sqoop job to import data from Amazon Redshift to Amazon EMR, and a Spark job to test managed scaling and the usage of the capacity scheduler queues.

You also observed how to run Sqoop jobs on On-Demand nodes to provide resilience, whereas Spark jobs use inexpensive Spot nodes, which scale up and down based on the workload.

We used a sample capacity scheduler queue configuration for this post; you should adjust it for your specific workload requirements. Furthermore, you can create additional scheduler queues to meet more complex requirements.

We also showed how you can apply automation of the configuration for EMR cluster creation.

For more information about managed scaling and optimizing EC2 Spot usage, refer to Introducing Amazon EMR Managed Scaling – Automatically Resize Clusters to Lower Cost and Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR.

Appendix

The following code is the emr_union_job.py script:

#----------------------------------------------------------------------------------------
# Author: Ramesh Raghupathy
# Date: 06/15/2022
# Description: This PySpark script reads in 3 arguments (input file name, multiply count,
# and output path (S3)). It reads the input file and repeatedly unions the result with
# itself, multiply count times. By varying the multiply count it is easy to generate
# different workload sizes for testing managed scaling of an EMR cluster.
#----------------------------------------------------------------------------------------
from __future__ import print_function
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("yarn") \
        .appName("Generic Union") \
        .getOrCreate()

import sys
import time
from pyspark.sql import DataFrame
from functools import reduce

if len(sys.argv) < 4:
    print ("Insufficient args: expected <input_full_path> <output_path> <multiply_count>")
    quit()

ip_full_path = sys.argv[1]
outputPath = sys.argv[2].strip()
multiply_count = int(sys.argv[3].strip())
delimiter = '|'

ip_file = ip_full_path


print ("----------- Args Start '-------------")
print (multiply_count)
print (ip_full_path)
print (outputPath)
print ("----------- Args Done -------------")

ip_df  = spark.read.parquet(ip_file)

ip_df.show(20)

df_union = ip_df

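# Note: each iteration unions the accumulated DataFrame with itself, so the row count
# doubles every pass and the result holds 2^multiply_count copies of the input dataset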
for i in range(multiply_count):
    df_union = df_union.union(df_union)
    print(df_union.count())

df_union.show(30)
df_union.sort("custkey", "orderkey","comment1").show(50)

df_union.write.format("csv").mode("overwrite").option("compression", "bzip2").option("delimiter", delimiter).option("ignoreTrailingWhiteSpace", False).option("ignoreLeadingWhiteSpace", False).option("nullValue", "").option("emptyValue","").option("multiline","True").save(outputPath)


About the Authors

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Kiran Guduguntla is a WW Go-to-Market Specialist for Amazon EMR at AWS. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data analytics solutions.

Run Apache Spark with Amazon EMR on EKS backed by Amazon FSx for Lustre storage

Post Syndicated from Vara Bonthu original https://aws.amazon.com/blogs/big-data/run-apache-spark-with-amazon-emr-on-eks-backed-by-amazon-fsx-for-lustre-storage/

Traditionally, Spark workloads have been run on a dedicated setup like a Hadoop stack with YARN or MESOS as a resource manager. Starting from Apache Spark 2.3, Spark added support for Kubernetes as a resource manager. The new Kubernetes scheduler natively supports the submission of Spark jobs to a Kubernetes cluster. Spark on Kubernetes provides simpler administration, better developer experience, easier dependency management with containers, a fine-grained security layer, and optimized resource allocation. As a result, Spark on Kubernetes gained much traction for high-performance and cost-effective ways of running big data and machine learning (ML) workloads.

In AWS, we offer a managed service, Amazon EMR on EKS, to run your Apache Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS). This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. EMR on EKS lets you run Spark applications alongside other application types on the same Amazon EKS cluster to improve resource utilization. In addition, EMR on EKS integrates with Amazon EMR Studio for authoring jobs and the Apache Spark UI for debugging out of the box to simplify infrastructure management.

For storage, EMR on EKS supports node ephemeral storage using hostPath, where the storage is attached to individual nodes, and Amazon Elastic Block Store (Amazon EBS) volumes per executor/driver pod using dynamic Persistent Volume Claims. However, some Spark users are looking for an HDFS-like shared file system to handle specific workloads like time-sensitive applications or streaming analytics. HDFS is best suited for jobs that require highly interactive speed for a large number of files with random access reads, atomic rename operations, and sequential metadata requests.

Amazon FSx for Lustre is a fully managed shared storage option built on the world’s most popular high-performance file system. It offers highly scalable, cost-effective storage, which provides sub-millisecond latencies, millions of IOPS, and throughput of hundreds of gigabytes per second. Its popular use cases include high-performance computing (HPC), financial modeling, video rendering, and machine learning. FSx for Lustre supports two types of deployments:

  • Scratch file systems – These are designed for temporary or short-term storage where the data is not needed to replicate or persist if a file server fails
  • Persistent file systems – These are suitable for long-term storage where the file server is highly available and the data is replicated within the Availability Zone

In both deployment types, automatic data sync between the mounted file system and Amazon Simple Storage Service (Amazon S3) buckets is supported, helping you offload large volumes of cold and warm data for a better cost-efficient design. It makes multi-AZ or multi-region failover possible via Amazon S3 for businesses that require resiliency and availability.

This post demonstrates how to use EMR on EKS to submit Spark jobs with FSx for Lustre as the storage. It can be mounted on Spark driver and executor pods through static and dynamic PersistentVolumeClaims methods.

Static vs. dynamic provisioning

With static provisioning, the FSx for Lustre file system and PersistentVolume (PV) must be created in advance. The following diagram illustrates the static provisioning architecture. The Spark application driver and executor pods refer to an existing static PersistentVolumeClaim (PVC) to mount the FSx for Lustre file system.

Unlike static provisioning, the FSx for Lustre file system and PV doesn’t need to be pre-created for dynamic provisioning. As shown in the following diagram, the FSx for Lustre CSI driver plugin is deployed to an Amazon EKS cluster to dynamically provision the FSx for Lustre file system with a given PVC. Dynamic provisioning only requires a PVC and the corresponding storage class. After the PVC is created in Kubernetes, the FSx for Lustre CSI driver identifies the storage class and creates the requested file system.

The Spark application driver and executor pods in the architecture refer to an existing dynamic PVC to mount the FSx for Lustre file system.

Solution overview

In this post, you use Amazon EKS Blueprints for Terraform to provision the infrastructure required to run Spark jobs with EMR on EKS, including the Amazon EKS cluster and managed node groups, the FSx for Lustre CSI driver and file systems, and the EMR on EKS virtual cluster.

Prerequisites

Before you build the entire infrastructure, make sure you have the prerequisites in place: an AWS account with sufficient permissions, and the AWS CLI, Terraform, and kubectl installed and configured.

Now you’re ready to deploy the solution.

Clone the GitHub repo

Open your terminal window, change to the home directory, and clone the GitHub repo:

cd ~
git clone https://github.com/aws-ia/terraform-aws-eks-blueprints.git

Then, navigate to the following:

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre

Initialize Terraform

Initialize the project, which downloads plugins that allow Terraform to interact with AWS services:

terraform init

Run terraform plan

Run terraform plan to verify the resources created by this deployment:

export AWS_REGION="<enter-your-region>"
terraform plan

The terraform plan output shows the resources that are created by this plan.

Run terraform apply

Run terraform apply to deploy the resources:

terraform apply --auto-approve

This deployment may take up to 30 minutes to create all the resources.

Verify the resources

Verify the Amazon EKS cluster created by the deployment. This following command displays the cluster details in JSON format:

aws eks describe-cluster --name emr-eks-fsx-lustre

Let’s create a kubeconfig file for the EKS cluster with the following command. This command creates a new cluster context entry with certificate authority data under ~/.kube/config to authenticate with the EKS cluster:

aws eks --region <ENTER_YOUR_REGION> update-kubeconfig --name emr-eks-fsx-lustre

Verify the managed node groups:

aws eks list-nodegroups --cluster-name emr-eks-fsx-lustre

The output should show two node groups:

{
    "nodegroups": [
        "core-node-grp-<some_random_numbers>",
        "spark-node-grp-<some_random_numbers>"
    ]
}

List the pods created by the FSx for Lustre CSI driver. The following command shows two controllers and an fsx-csi-node daemonset pod for each node:

kubectl get pods -n kube-system | grep fsx

List the namespace created for emr-data-team-a:

kubectl get ns | grep emr-data-team-a

The output will display the active namespace.

List the FSx storage class, PV, and PVCs created by this deployment. You may notice that fsx-dynamic-pvc is in Pending status because the dynamic PVC is still creating the FSx for Lustre file system. The dynamic PV status changes to Bound after the file system is created.

#FSx Storage Class
kubectl get storageclasses | grep fsx
  emr-eks-fsx-lustre   fsx.csi.aws.com         Delete          Immediate              false                  109s

# Output of static persistent volume with name fsx-static-pv
kubectl get pv | grep fsx  
  fsx-static-pv                              1000Gi     RWX            Recycle          Bound    emr-data-team-a/fsx-static-pvc       fsx

# Output of static persistent volume claim with name fsx-static-pvc and fsx-dynamic-pvc
kubectl get pvc -n emr-data-team-a | grep fsx
  fsx-dynamic-pvc   Pending                                             fsx            4m56s
  fsx-static-pvc    Bound     fsx-static-pv   1000Gi     RWX            fsx            4m56s

Log in to the Amazon FSx console and verify the two file systems created by this deployment (you can also check them with the AWS CLI, as shown after the following list):

  • The first file system (emr-eks-fsx-lustre-static) is a persistent file system created with the Terraform resource
  • The second file system (fs-0e77adf20acb4028f) is created by the FSx for Lustre CSI driver dynamically with a dynamic PVC manifest
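To list the file systems from the terminal instead, the following optional check uses the standard AWS CLI describe-file-systems call; the query expression is just one way to trim the output.

# List FSx for Lustre file systems with their lifecycle state and Name tag
aws fsx describe-file-systems \
  --query "FileSystems[?FileSystemType=='LUSTRE'].{Id:FileSystemId,State:Lifecycle,Name:Tags[?Key=='Name']|[0].Value}" \
  --output table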

In this demo, we learn how to use a statically provisioned FSx for Lustre file system and dynamically provisioned FSx for Lustre file system in EMR on EKS Spark jobs.

Static provisioning

You can create an FSx for Lustre file system using the AWS CLI or any infrastructure as code (IaC) tool. In this example, we used Terraform to create the FSx for Lustre file system with deployment type as PERSISTENT_2. For static provisioning, we must create the FSx for Lustre file system first, followed by the PV and PVCs. After we create all three resources, we can mount the FSx for Lustre file system on a Spark driver and executor pod.

We use the following Terraform code snippet in the deployment to create the FSx for Lustre file system (2400 GB) and the file system association with the S3 bucket for import and export under the /data file system path. Note that this resource refers to a single subnet (single Availability Zone) for creating an FSx for Lustre file system. However, the Spark pods can use this file system across all Availability Zones, unlike the EBS volume, which is Availability Zone specific. In addition, the FSx for Lustre association with the S3 bucket creates a file system directory called /data. The Spark job driver and executor pod templates use this /data directory as a spark-local-dir for scratch space.

# New FSx for Lustre filesystem
resource "aws_fsx_lustre_file_system" "this" {
  deployment_type             = "PERSISTENT_2"
  storage_type                = "SSD"
  per_unit_storage_throughput = "500"
  storage_capacity            = 2400

  subnet_ids         = [module.vpc.private_subnets[0]]
  security_group_ids = [aws_security_group.fsx.id]
  log_configuration {
    level = "WARN_ERROR"
  }
  tags = merge({ "Name" : "${local.name}-static" }, local.tags)
}

# S3 bucket association with FSx for Lustre filesystem
resource "aws_fsx_data_repository_association" "example" {
  file_system_id       = aws_fsx_lustre_file_system.this.id
  data_repository_path = "s3://${aws_s3_bucket.this.id}"
  file_system_path     = "/data" # This directory will be used in Spark podTemplates under volumeMounts as subPath

  s3 {
    auto_export_policy {
      events = ["NEW", "CHANGED", "DELETED"]
    }

    auto_import_policy {
      events = ["NEW", "CHANGED", "DELETED"]
    }
  }
}

Persistent Volume

The following YAML template shows the definition of the PV created by this deployment. For example, running the command kubectl edit pv fsx-static-pv displays the manifest. PVs are a cluster-scoped resource, therefore no namespace is defined in the template. The DevOps or cluster admin teams typically create this resource.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: fsx-static-pv
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1000Gi
  claimRef:  # PV Claimed by fsx-static-pvc                
    apiVersion: v1
    kind: PersistentVolumeClaim             
    name: fsx-static-pvc
    namespace: emr-data-team-a
    resourceVersion: "5731"
    uid: 9110afc4-c605-440e-b022-190904866f0c
  csi:
    driver: fsx.csi.aws.com
    volumeAttributes:
      dnsname: fs-0a85fd096ef3f0089.fsx.eu-west-1.amazonaws.com # FSx DNS Name
      mountname: fz5jzbmv
    volumeHandle: fs-0a85fd096ef3f0089
  mountOptions:
  - flock
  persistentVolumeReclaimPolicy: Recycle

Persistent Volume Claim

The following YAML template shows the definition of the PVC created by this deployment. For example, running the command kubectl edit pvc fsx-static-pvc -n emr-data-team-a shows the deployed resource.

PVCs are namespace-specific resources typically created by the developers. The emr-data-team-a namespace is defined in the template.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-static-pvc
  namespace: emr-data-team-a
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 1000Gi
  storageClassName: fsx
  volumeMode: Filesystem
  volumeName: fsx-static-pv
status:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1000Gi
  phase: Bound

Now that we have set up the static FSx for Lustre file system, we can use the PVC in EMR on EKS Spark jobs with pod templates. Key things to note in the template: the volumes section in the following code is defined as a persistentVolumeClaim with the claim name fsx-static-pvc, and the containers section refers to the unique mountPath folder /static. We also use initContainers in the driver pod template to give the correct permissions and ownership to the Hadoop user used by the EMR on EKS driver and executor pods. Finally, notice that the data subPath is associated with the S3 bucket sync in the preceding Terraform resource.

We use the following driver pod template:

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-driver
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-static-pvc # Static PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-driver 
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static
          subPath: data # sub folder created in FSx for Lustre filesystem and mapped to s3 bucket sync and export
          readOnly: false
  initContainers:
    - name: spark-init-container-driver  
      image: public.ecr.aws/y4g4v0z7/busybox
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static
      command: ["sh", "-c", "chmod -R 777 /static", "chown -hR +999:+1000 /static/data"]

The executor pod template also refers to the same persistentVolumeClaim as fsx-static-pvc and volumeMounts with mountPath as /static. Notice that we don’t use the initContainers section in this template because the required permissions for the file system directory /static/data have been applied by the driver processes already. Because it’s a shared file system, the same permissions apply to the executor process as well.

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-exec
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-static-pvc # Static PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-executor # Don't change this name. EMR on EKS looking for this name
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static # mountPath name can be anything but this should match with Driver template as well
          subPath: data # sub folder created in FSx for Lustre filesystem and mapped to s3 bucket sync and export
          readOnly: false

Let’s run the sample PySpark script using the preceding pod templates. Navigate to the examples/spark-execute directory and run the shell script (fsx-static-spark.sh):

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre/examples/spark-execute

This shell script expects three input values. EMR_VIRTUAL_CLUSTER_ID and EMR_JOB_EXECUTION_ROLE_ARN can be extracted from the Terraform output values. Additionally, you create an S3 bucket with required permissions. This S3 bucket stores the sample PySpark scripts, pod templates, input and output data generated by this shell script, and the Spark job. Check out the shell script for more details.

EMR_VIRTUAL_CLUSTER_ID=$1     # Terraform output variable: emrcontainers_virtual_cluster_id    
S3_BUCKET=$2                  # This script requires s3 bucket as input parameter e.g., s3://<bucket-name>    
EMR_JOB_EXECUTION_ROLE_ARN=$3 # Terraform output variable: emr_on_eks_role_arn

Let’s run the fsx-static-spark.sh shell script. This job takes approximately 6 minutes with two executors, which process 40 objects with a total size of 1.4 GB. Each object is around 36.4 MB. You can adjust the number of objects from 40 to any larger number to process a larger amount of data. This shell script downloads the public dataset (NY Taxi Trip Data) locally to your disk and uploads it to the S3 bucket using Amazon S3 sync. The PySpark job reads the data from the S3 bucket, applies GroupBy on a few fields, and writes the result back to the S3 bucket to demonstrate the shuffling activity.

./fsx-static-spark.sh <EMR_VIRTUAL_CLUSTER_ID> \
s3://<YOUR_BUCKET_NAME> \
<EMR_JOB_EXECUTION_ROLE_ARN>

You can run the following queries to monitor the Spark job and the usage of the FSx for Lustre file system mounted on the driver and executor pods. Verify the job run events with the following command:

kubectl get pods --namespace=emr-data-team-a -w

You will notice one job object pod, a driver pod, and two executor pods. The Spark executor instances count can be updated in the Shell script.
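If you want to change the executor count without editing the shell script, one option is to pass it through sparkSubmitParameters when submitting the job with aws emr-containers start-job-run. The following is a trimmed sketch only, not the exact command used by fsx-static-spark.sh; the entry point path and release label are placeholder assumptions, and the pod template and other configuration overrides are omitted.

aws emr-containers start-job-run \
  --virtual-cluster-id <EMR_VIRTUAL_CLUSTER_ID> \
  --name ny-taxi-trip-static \
  --execution-role-arn <EMR_JOB_EXECUTION_ROLE_ARN> \
  --release-label emr-6.6.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<YOUR_BUCKET_NAME>/scripts/<your-pyspark-script>.py",
      "sparkSubmitParameters": "--conf spark.executor.instances=4 --conf spark.executor.cores=2"
    }
  }'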

You can also query to monitor the usage of FSx for Lustre mounted file system size. The following command shows the size of the mounted file system growth during the test run:

# Verify the used FSx for Lustre filesystem disk size with executor1
kubectl exec -ti ny-taxi-trip-static-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- df -h

# Verify the files created under /static/data FSx mount
kubectl exec -ti ny-taxi-trip-static-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- ls -lah /static

# Verify the file sync from FSx to S3 bucket. 
aws s3 ls s3://<YOUR_SYNC_BUCKET_NAME_FROM_TERRAFORM_OUTPUT>/

The following diagram shows the output for the preceding commands. The files under the executor are the same as those under the S3 bucket. These files are the same because the S3 sync feature is enabled in the FSx for Lustre file system. This test uses the FSx for Lustre file system for scratch space, so the shuffle files will be deleted from the FSx for Lustre file system and S3 bucket when the test is complete.

This PySpark job is writing the aggregated and repartition output directly to an S3 bucket location. Instead, you can choose to write to the FSx for Lustre file system path, which syncs to an S3 bucket eventually. The FSx for Lustre file system provides low latency, high throughput, and high IOPS for reading and writing data by multiple Spark Jobs. In addition, the data stored in FSx disk is synced to an S3 bucket for durable storage.

You can monitor the FSx for Lustre file system using Amazon CloudWatch metrics. The following time series graph shows the average stats with a period of 30 seconds.

When the Spark job is complete, you can verify the results in the Spark Web UI from the EMR on EKS console.

You can also verify the FSx for Lustre file system data sync to an S3 bucket.

Dynamic provisioning

So far, we have looked at an FSx for Lustre statically provisioned file system example and its usage with Spark jobs.

We can also provision an FSx for Lustre file system on-demand using the FSx for Lustre CSI driver and Persistent Volume Claim. Whenever you create a PVC with a dynamic volume referring to an FSx storage class, the FSx for Lustre CSI driver automatically provisions the FSx for Lustre file system and the corresponding Persistent Volume. Admin teams (DevOps) are responsible for deploying the FSx for Lustre CSI driver and FSx storage class, and the developers and data engineers (DataOps) are responsible for deploying the PVC, which refers to the FSx storage class.

The following storage class is deployed to Amazon EKS by this Terraform deployment. This dynamic PVC example doesn’t use the Amazon S3 backup association. You can still do that, but it requires an Amazon S3 config in the storage class manifest. Check out Dynamic Provisioning with Data Repository to configure the FSx storage class with the S3 import/export path with the choice of deployment type (SCRATCH_1, SCRATCH_2 and PERSISTENT_1). We have also created a dedicated security group used in this manifest. For more information, refer to File System Access Control with Amazon VPC.

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: fsx
provisioner: fsx.csi.aws.com
parameters:
  securityGroupIds: sg-0c8a656a0bbb17fe2
  subnetId: subnet-03cb3d850193b907b
reclaimPolicy: Delete
volumeBindingMode: Immediate

The following YAML template shows the definition of the dynamic PVC used in this deployment. Running the command kubectl edit pvc fsx-dynamic-pvc -n emr-data-team-a shows the deployed resource. PVCs are namespace-specific resources typically created by the developers, therefore we define the emr-data-team-a namespace.

Spark can dynamically provision the PVC with claimName using SparkConf (for example, spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=OnDemand). However, we recommend deploying the PVC before the start of Spark jobs to avoid delays from provisioning the FSx for Lustre file system during the job run; creating the file system takes approximately 10–12 minutes.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-dynamic-pvc
  namespace: emr-data-team-a
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 2000Gi
  storageClassName: fsx # PVC reference to Storage class created by Terraform
  volumeMode: Filesystem
  volumeName: pvc-0da5a625-03ba-48fa-b08e-3f74291c0e5e # Dynamically created Persistent Volume
status:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 2400Gi
  phase: Bound
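To illustrate the SparkConf-based approach mentioned earlier, the following fragment shows how a Spark on Kubernetes job could request an on-demand PVC backed by the fsx storage class at submit time. This is an illustrative sketch only, not part of the deployed solution: the volume name spark-local-dir-ny-taxi and the size are assumptions, and the matching spark.kubernetes.executor.* properties are needed as well.

spark-submit \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-ny-taxi.options.claimName=OnDemand \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-ny-taxi.options.storageClass=fsx \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-ny-taxi.options.sizeLimit=2000Gi \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-ny-taxi.mount.path=/dynamic \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-ny-taxi.mount.readOnly=false \
  <remaining spark-submit arguments and application>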

Now that we have set up the dynamic FSx for Lustre file system, we can use it in EMR on EKS Spark jobs using pod templates. Key things to note in the following template: the volumes section is defined as a persistentVolumeClaim with the claim name fsx-dynamic-pvc, and the containers section refers to the unique mountPath folder /dynamic. We also use initContainers in the driver pod template to give the correct permissions and ownership to the Hadoop user used by the EMR on EKS driver and executor processes.

The following is our driver pod template:

# NOTE: PVC created before the start of the Spark job to avoid waiting for 15 mins to create FSx for Lustre filesystem while the job is running
---
apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-driver
  namespace: emr-data-team-a # Namespace used to submit the jobs
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-dynamic-pvc  # Dynamic PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-driver 
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic # FSx SCRATCH_1 filesystem for executors scratch space
          readOnly: false
  initContainers:  # initContainer only used in Driver to set the permissions for dynamically created filesystem.
    - name: spark-init-container-driver  
      image: public.ecr.aws/y4g4v0z7/busybox
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic # FSx Scratch 1 filesystem for executors scratch space
      command: ["sh", "-c", "chmod 777 /dynamic", "chown -hR +999:+1000 /dynamic"]

The executor pod template also refers to the same persistentVolumeClaim as fsx-dynamic-pvc and volumeMounts with mountPath as /dynamic:

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-exec
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-dynamic-pvc  # Dynamic PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-executor # Don't change this name. EMR on EKS looking for this name
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic  # FSx Scratch 1 filesystem for executor’s scratch space
          readOnly: false

Let’s run the sample PySpark script using the preceding pod templates. Navigate to the examples/spark-execute directory and run the shell script (fsx-dynamic-spark.sh). This script is the same as the static provisioning example; the only difference is the pod templates, which refer to the dynamic volumes.

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre/examples/spark-execute

This shell script expects three input values: EMR_VIRTUAL_CLUSTER_ID, EMR_JOB_EXECUTION_ROLE_ARN, and your S3 bucket name. Use the same values used in the previous static provisioning example.

Let’s run the fsx-dynamic-spark.sh shell script:

./fsx-dynamic-spark.sh <EMR_VIRTUAL_CLUSTER_ID> \
s3://<YOUR_BUCKET_NAME> \
<EMR_JOB_EXECUTION_ROLE_ARN>

After the job is triggered, run the following commands to see the output of the job:

# Output of dynamic persistent volume claim fsx-dynamic-pvc
kubectl get pvc -n emr-data-team-a | grep fsx-dynamic-pvc

# Verify the used FSx for Lustre filesystem disk size with executor1
kubectl exec -ti ny-taxi-trip-dynamic-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- df -h

# Verify the files created under /dynamic FSx mount
kubectl exec -ti ny-taxi-trip-dynamic-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- ls -lah /dynamic

The following screenshot shows the file system mounted under the /dynamic path. We can also see the Spark shuffle files created in the /dynamic folder.

Clean up

To clean up your environment, destroy the Terraform modules in reverse order. Then, empty any S3 buckets created by this module and run the following commands:

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre

terraform destroy -target="module.eks_blueprints_kubernetes_addons" -auto-approve

terraform destroy -target="module.eks_blueprints" -auto-approve

terraform destroy -target="module.vpc" -auto-approve

# Finally, destroy any additional resources that are not in the above modules

terraform destroy -auto-approve

Furthermore, log in to the AWS Management Console and delete any S3 buckets or FSx for Lustre file systems created by this deployment to avoid unwanted charges to your AWS account.

Conclusion

In this post, we demonstrated how to mount an FSx for Lustre file system as a PVC to Spark applications with EMR on EKS. We showed two mounting methods: static provisioning and dynamic provisioning via the FSx for Lustre CSI driver. The HDFS-like storage can be used by Spark on a Kubernetes pattern to achieve optimal storage performance. You can use it either as a temporary scratch space to store intermediate data while processing, or as a shared, persistent file system to exchange data for multiple pods in a single job or between multiple Spark jobs.

If you want to try out the full solution or for more EMR on EKS examples, check out our open-sourced project on GitHub.


About the authors

Vara Bonthu is a Senior Open Source Engineer focused on data analytics and containers working with Strategic Accounts. He is passionate about open source, big data, Kubernetes, and has a substantial development, DevOps, and architecture background.

Karthik Prabhakar is a Senior Analytics Specialist Solutions Architect at AWS, helping strategic customers adopt and run AWS Analytics services.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Implement a highly available key distribution center for Amazon EMR

Post Syndicated from Lorenzo Ripani original https://aws.amazon.com/blogs/big-data/implement-a-highly-available-key-distribution-center-for-amazon-emr/

High availability (HA) is the property of a system or service to operate continuously without failing for a designated period of time. Implementing HA properties over a system allows you to eliminate single points of failure that usually translate to service disruptions, which can then lead to a business loss or the inability to use a service.

The core idea behind fault tolerance and high availability is very straightforward in terms of definition. You usually use multiple machines to give you redundancy for a specific service. This guarantees that if a host goes down, other machines are able to take over the traffic. Although this might be easy to say, it’s difficult to obtain such a property, especially when working with distributed technologies.

When focusing on Hadoop technologies, the concept of availability multiplies in different layers depending on the frameworks we’re using. To achieve a fault-tolerant system, we need to consider the following layers:

  • Data layer
  • Processing layer
  • Authentication layer

The first two layers are typically handled using native capabilities of the Hadoop framework (such as HDFS High Availability or ResourceManager High Availability) or with the help of features available in the specific framework used (for example, HBase table replication to achieve highly available reads).

The authentication layer is typically managed through the utilization of the Kerberos protocol. Although multiple implementations of Kerberos exist, Amazon EMR uses a free implementation of the Kerberos protocol, which is directly provided by the Massachusetts Institute of Technology (MIT), also referred to as MIT Kerberos.

When looking at the native setup for a key distribution center (KDC), we can see that the tool comes with a typical primary/secondary configuration, where you can configure a primary KDC with one or more additional replicas to provide some features of a highly available system.

However, this configuration doesn’t provide an automatic failover mechanism to elect a new primary KDC in the event of a system interruption. As a result, the failover has to be performed manually or by implementing an automated process, which can be complex to set up.

With AWS native services, we can improve the MIT KDC capabilities to increase the resilience to failures of our system.

Highly available MIT KDC

Amazon EMR provides different architecture options to enable Kerberos authentication, where each of them tries to solve a specific need or use case. Kerberos authentication can be enabled by defining an Amazon EMR security configuration, which is a set of information stored within Amazon EMR itself. This enables you to reuse this configuration across multiple clusters.

When creating an Amazon EMR security configuration, you’re asked to choose between a cluster-dedicated KDC or an external KDC, so it’s important to understand the benefits and limits of each solution.

When you enable the cluster-dedicated KDC, Amazon EMR configures and installs an MIT KDC on the EMR primary node of the cluster that you’re launching. In contrast, when you use an external KDC, the cluster launched relies on a KDC external to the cluster. In this case, the KDC can be a cluster-dedicated KDC of a different EMR cluster that you reference as an external KDC, or a KDC installed on an Amazon Elastic Compute Cloud (Amazon EC2) instance or a container that you own.

The cluster-dedicated KDC is an easy configuration option that delegates the installation and configuration of the KDC service to the cluster itself. This option doesn’t require significant knowledge of the Kerberos system and might be a good option for a test environment. Additionally, having a dedicated KDC in a cluster enables you to segregate the Kerberos realm, thereby providing a dedicated authentication system that can be used only to authenticate a specific team or department in your organization.

However, because the KDC is located on the EMR primary node, you have to consider that if you delete the cluster, the KDC will be deleted as well. Considering the case in which the KDC is shared with other EMR clusters (defined as external KDC in their security configuration), the authentication layer for those will be compromised and as a result all Kerberos enabled frameworks will break. This might be acceptable in test environments, but it’s not recommended for a production one.

Because the KDC lifetime isn’t always bound to a specific EMR cluster, it’s common to use an external KDC located on an EC2 instance or Docker container. This pattern comes with some benefits:

  • You can persist end-user credentials in the Kerberos KDC rather than using an Active Directory (although you can also enable a cross-realm trust)
  • You can enable communication across multiple EMR clusters, so that all the cluster principals join the same Kerberos realm, thereby enabling a common authentication system for all the clusters
  • You can remove the dependency on the EMR primary node, whose deletion would otherwise prevent other systems from authenticating
  • If you require a multi-master EMR cluster, then an external KDC is required

That being said, installing an MIT KDC on a single instance doesn’t address our HA requirements, which typically are crucial in a production environment. In the following section, we discuss how we can implement a highly available MIT KDC using AWS services to improve the resiliency of our authentication system.

Architecture overview

The architecture presented in the following diagrams describes a highly available setup across multiple Availability Zones for our MIT Kerberos KDC that uses AWS services. We propose two versions of the architecture: one based on an Amazon Elastic File System (Amazon EFS) file system, and another based on an Amazon FSx for NetApp ONTAP file system.

Both services can be mounted on EC2 instances and used as local paths. Although Amazon EFS is cheaper compared to Amazon FSx for NetApp ONTAP, the latter provides better performance thanks to the sub-millisecond operation latency it provides.

We performed multiple tests to benchmark the solutions involving the different file systems. The following graph shows the results with Amazon EMR 5.36, in which we measured the time in seconds taken by the cluster to be fully up and running when selecting Hadoop and Spark as frameworks.

Looking at the test results, we can see that the Amazon EFS file system is suitable to handle small clusters (fewer than 100 nodes), because the performance degradation introduced by the latency of lock operations on the NFS protocol increases the delay in launching clusters as we add more nodes in our cluster topology. For example, for clusters with 200 nodes, the delay introduced by the Amazon EFS file system is such that some instances can’t join the cluster in time. As a result, those instances are deleted and then replaced, making the entire cluster provisioning slower. This is the reason why we decided not to publish any metric for Amazon EFS for 200 cluster nodes on the preceding graph.

On the other side, Amazon FSx for NetApp ONTAP is able to better handle the increasing number of principals created during the cluster provisioning with reduced performance degradation compared to Amazon EFS.

Even with the solution involving Amazon FSx for NetApp ONTAP, for clusters with a higher number of instances it’s still possible to encounter the behavior described earlier for Amazon EFS. Therefore, for big cluster configurations, this solution should be carefully tested and evaluated.

Amazon EFS based solution

The following diagram illustrates the architecture of our Amazon EFS based solution.

The infrastructure relies on different components to improve the fault tolerance of the KDC. The architecture uses the following services:

  • A Network Load Balancer configured to serve the Kerberos service ports (port 88 for authentication and port 749 for admin tasks like principal creation and deletion). The purpose of this component is to balance requests across multiple KDC instances located in separate Availability Zones. In addition, it provides a redirection mechanism in case of failures while connecting to an impaired KDC instance.
  • An EC2 Auto Scaling group that helps you maintain KDC availability and allows you to automatically add or remove EC2 instances according to conditions you define. For the purpose of this scenario, we define a minimum number of KDC instances equal to two.
  • The Amazon EFS file system provides a persistent and reliable storage layer for our KDC database. The service comes with built-in HA properties, so we can take advantage of its native features to obtain a persistent and reliable file system.
  • We use AWS Secrets Manager to store and retrieve the Kerberos configurations, specifically the password used for the kadmin service and the Kerberos domain and realm managed by the KDC. With Secrets Manager, we avoid passing any sensitive information as script parameters or passwords while launching KDC instances.

With this configuration, we eliminate the downsides resulting from a single instance installation:

  • The KDC isn’t a single point of failure anymore because failed connections are redirected to healthy KDC hosts
  • Removing Kerberos authentication traffic from the EMR primary node improves the health of the primary node, which might be critical for large Hadoop installations (hundreds of nodes)
  • We can recover in case of failures, allowing survived instances to fulfill both admin and authentication operations

Amazon FSx for NetApp ONTAP based solution

The following diagram illustrates the solution using Amazon FSx for NetApp ONTAP.

This infrastructure is almost identical compared to the previous one and provides the same benefits. The only difference is the utilization of a Multi-AZ Amazon FSx for NetApp ONTAP file system as a persistent and reliable storage layer for our KDC database. Even in this case, the service comes with built-in HA properties so we can take advantage of its native features to obtain a persistent and reliable file system.

Solution resources

We provide an AWS CloudFormation template in this post as a general guide. You should review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

The CloudFormation template contains several nested templates. Together, they create the following:

  • An Amazon VPC with two public and two private subnets where the KDC instances can be deployed
  • An internet gateway attached to the public subnets and a NAT gateway for the private subnets
  • An Amazon Simple Storage Service (Amazon S3) gateway endpoint and a Secrets Manager interface endpoint in each subnet

After the VPC resources are deployed, the KDC nested template is launched and provisions the following components:

  • Two target groups, each connected to a listener for the specific KDC port to monitor (88 for Kerberos authentication and 749 for Kerberos administration).
  • One Network Load Balancer to balance requests across the KDC instances created in different Availability Zones.
  • Depending on the chosen file system, an Amazon EFS or Amazon FSx for NetApp ONTAP file system is created across multiple Availability Zones.
  • Configuration and auto scaling to provision the KDC instances. Specifically, the KDC instances are configured to mount the selected file system on a local folder that is used to store the principals database of the KDC.

At the end of the second template, the EMR cluster is launched with an external KDC set up and, if chosen, a multi-master configuration.
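If you later want to point additional EMR clusters at this load-balanced KDC outside of the provided stack, you typically do so through an Amazon EMR security configuration that references an external KDC. The following is an indicative sketch only; replace the load balancer DNS name and verify the JSON structure against the current Amazon EMR documentation before using it.

aws emr create-security-configuration \
  --name external-kdc-ha \
  --security-configuration '{
    "AuthenticationConfiguration": {
      "KerberosConfiguration": {
        "Provider": "ExternalKdc",
        "ExternalKdcConfiguration": {
          "KdcServerType": "Single",
          "AdminServer": "<kdc-load-balancer-dns>:749",
          "KdcServer": "<kdc-load-balancer-dns>:88"
        }
      }
    }
  }'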

Launch the CloudFormation stack

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack.
    This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region. The CloudFormation stack requires a few parameters, as shown in the following screenshot.


    The following lists describe the parameters required in each section of the stack.
  2. In the Core section, provide the following parameters:

    • Project (default: aws-external-kdc) – The name of the project for which the environment is deployed. This is used to create the AWS tags associated with each resource created in the stack.
    • Artifacts Repository (default: aws-blogs-artifacts-public/artifacts/BDB-1689) – The Amazon S3 location hosting the templates and scripts required to launch this stack.
  3. In the Networking section, provide the following parameters:

    • VPC Network (default: 10.0.0.0/16) – Network range for the VPC.
    • Public Subnet One (default: 10.0.10.0/24) – Network range for the first public subnet.
    • Public Subnet Two (default: 10.0.11.0/24) – Network range for the second public subnet.
    • Private Subnet One (default: 10.0.1.0/24) – Network range for the first private subnet.
    • Private Subnet Two (default: 10.0.2.0/24) – Network range for the second private subnet.
    • Availability Zone One (user selected) – The Availability Zone chosen to host the first private and public subnets. This should differ from the value used for the Availability Zone Two parameter.
    • Availability Zone Two (user selected) – The Availability Zone chosen to host the second private and public subnets. This should differ from the value used for the Availability Zone One parameter.
  4. In the KDC section, provide the following parameters:

    • Storage Service (default: Amazon EFS) – The KDC shared file system: Amazon EFS or Amazon FSx for NetApp ONTAP.
    • Amazon Linux 2 AMI (default: /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2) – The AWS Systems Manager parameter alias used to retrieve the latest Amazon Linux 2 AMI.
    • Instance Count (default: 2) – The number of KDC instances launched.
    • Instance Type (default: c5.large) – The KDC instance type.
    • KDC Realm (default: HADOOP.LAN) – The Kerberos realm managed by the external KDC servers.
    • KAdmin Password (default: Password123) – The password used to perform admin operations on the KDC.
    • Kerberos Secret Name (default: aws-external-kdc/kerberos.config) – The Secrets Manager secret name used to store the Kerberos configurations.
  5. In the EMR section, provide the following parameters:

    • Multi Master (default: Disabled) – When enabled, the cluster is launched with three primary nodes configured with Hadoop HA.
    • Release Version (default: emr-5.36.0) – The Amazon EMR release version.
    • (Workers) Instance Type (default: m5.xlarge) – The EC2 instance type used to provision the cluster.
    • (Workers) Node Count (default: 1) – The number of Amazon EMR CORE nodes provisioned while launching the cluster.
    • SSH Key Name (user selected) – A valid SSH PEM key that will be attached to the cluster and KDC instances to provide SSH remote access.
  6. Choose Next.
  7. Add additional AWS tags if required (the solution already uses some predefined AWS tags).
  8. Choose Next.
  9. Acknowledge the final requirements.
  10. Choose Create stack.

Make sure to select different Availability Zones in the Network selection of the template (Availability Zone One and Availability Zone Two). This prevents failures in the event of an impairment for an entire Availability Zone.

Test the infrastructure

After you’ve provisioned the whole infrastructure, it’s time to test and validate our HA setup.

In this test, we simulate an impairment on a KDC instance. As a result, we’ll see how we can keep using the remaining healthy KDC, and how the infrastructure self-recovers by adding a new KDC as a substitute for the failed one.

We performed our tests by launching the CloudFormation stack and specifying two KDC instances and using Amazon EFS as the storage layer for the KDC database. The EMR cluster is launched with 11 CORE nodes.

After we deploy the whole infrastructure, we can connect to the EMR primary node using an SSH connection to perform our tests.

When inside our primary node instance, we can proceed with our test setup.

  1. First, we create 10 principals inside the KDC database. To do so, create a bash script named create_users.sh with the following content:
    #!/bin/bash
    realm="HADOOP.LAN"
    password="Password123"
    num_users=10
    
    for (( i=1; i<=$num_users; i++ )); do
      echo "Creating principal [email protected]$realm"
      echo -e "$password\n$password\n$password" | kadmin -p kadmin/[email protected]$realm addprinc "[email protected]$realm" > /dev/null 2>&1
    done

  2. Run the script using the following command:
    sh create_users.sh

  3. We can now verify that those 10 principals have been correctly created inside the KDC database. To do so, create another script called list_users.sh and run it the same way as the previous one:
    #!/bin/bash
    realm="HADOOP.LAN"
    password="Password123"
    
    echo -e "$password\n$password\n$password" | kadmin -p kadmin/[email protected]$realm listprincs

    The output of the script shows the principals created by the cluster nodes when they’re provisioned, along with our test users just created.

    We now run multiple kinit requests in parallel, and while doing so, we stop the krb5kdc process on one of the two available KDC instances.

    The test is performed through Spark to achieve high parallelization on the kinit requests.

  4. First, create the following script and call it user_kinit.sh:
    #!/bin/sh
    realm="HADOOP.LAN"
    password="Password123"
    num_users="10"
    
    for (( i=1; i<=$num_users; i++ )); do
      echo -e "$password" | kinit [email protected]$realm > /dev/null 2>&1
      echo $?
    done

  5. Open a spark-shell and use the --files parameter to distribute the preceding bash script to all the Spark executors. In addition, we disable the Spark dynamic allocation and launch our application with 10 executors, each using 4 vCores.
    spark-shell --files user_kinit.sh --num-executors 10 --conf spark.dynamicAllocation.enabled=false --conf spark.executor.cores=4

  6. We can now run the following Scala statements to initiate our distributed test:
    val tasks = spark.sparkContext.parallelize(1 to 1600, 1600)
    val scriptPath = "./user_kinit.sh"
    val pipeRDD = tasks.pipe(scriptPath)
    pipeRDD.map(_.toInt).sum

    This Spark application creates 1,600 tasks, and each task performs 10 kinit requests. These tasks are run in parallel in batches of 40 Spark tasks at a time. The final output of our command returns the number of failed kinit requests.

  7. We now connect to the two available KDC instances. We can connect without SSH keys by using AWS Systems Manager Session Manager, because our template doesn't provide any SSH key to the KDC instances for additional security. To connect to the KDC instances from the Amazon EC2 console using AWS Systems Manager, see Starting a session (Amazon EC2 console).
  8. On the first KDC, run the following commands to show incoming kinit authentication requests:
    sudo -s
    tail -f /var/log/kerberos/krb5kdc.log

    The following screenshot shows a sample output.

  9. On the second KDC, simulate a failure by running the following commands:

    sudo -s
    killall krb5kdc

  10. We can now open the Amazon EC2 console and the KDC-related target group to confirm that the instance became unhealthy (after three consecutive failed health checks), and was then deleted and replaced by a new one.
    The target group performed the following specific steps during an impairment in one of the services:

    • The KDC instance enters the unhealthy state
    • The unhealthy KDC instance is de-registered from the target group (draining process)
    • A new KDC instance is launched
    • The new KDC is registered to the target group so that it can start receiving traffic from the load balancer

    You might expect to see output similar to the following screenshot while causing an impairment in one of your KDCs.

  11. If we now connect to the replaced KDC instance, we can see traffic starting to appear in the krb5kdc logs.

At the end of the test, we get the total number of failed Kerberos authentications.

As we can see from the output, we didn't get any failures during this test. However, when repeating the test multiple times, you might still see a few errors (one or two on average) that can occur because the krb5kdc process stops while some requests are still authenticating.

Note that the kinit tool itself doesn't have any retry mechanism. However, both the Hadoop services running on the cluster and the creation of Kerberos principals during EMR instance provisioning are configured to retry if KDC calls fail.

If you want to automate these tests, you might also consider using AWS Fault Injection Simulator, a fully managed service for running fault injection experiments on AWS that makes it easier to improve an application’s performance, observability, and resiliency.
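
If you script such an experiment yourself, the failure injection itself reduces to a single command against the target KDC instance. The following sketch assumes AWS Systems Manager access to the KDC instances and uses a hypothetical instance ID; the same command could also be wrapped in a fault injection experiment:

# Stop the KDC process on one instance through Systems Manager (no SSH needed)
aws ssm send-command \
  --document-name "AWS-RunShellScript" \
  --targets "Key=InstanceIds,Values=i-0123456789abcdef0" \
  --parameters 'commands=["sudo killall krb5kdc"]'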

Clean up

To clean up all the resources:

  1. Delete the root stack in AWS CloudFormation.
  2. Shortly after the deletion starts, you should see a failure.
  3. Choose the VPC nested CloudFormation stack, then choose Resources. You should see a single DELETE_FAILED entry for the VPC resource. This is because Amazon EMR automatically creates default security groups, which prevent CloudFormation from deleting the VPC.
  4. Go to the VPC section of the AWS console and delete that VPC manually.
  5. Then go back to AWS CloudFormation, select the root stack again, and choose Delete. This time the deletion should complete.

File system backups

Both Amazon EFS and Amazon FSx for NetApp ONTAP are natively integrated with AWS Backup.

AWS Backup helps you automate and centrally manage your backups. After you create policy-driven plans, you can monitor the status of ongoing backups, verify compliance, and find and restore backups, all from a central console.

To get more information, refer to Using AWS Backup to back up and restore Amazon EFS file systems and Using AWS Backup with Amazon FSx.
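
If you prefer to trigger or verify backups from the command line rather than the console, the following sketch shows an on-demand backup job for an EFS file system; the vault name, file system ARN, and IAM role ARN are placeholders for your own values:

# Start an on-demand backup of the EFS file system that hosts the KDC database
aws backup start-backup-job \
  --backup-vault-name Default \
  --resource-arn arn:aws:elasticfilesystem:<region>:<account-id>:file-system/<file-system-id> \
  --iam-role-arn arn:aws:iam::<account-id>:role/service-role/AWSBackupDefaultServiceRole

# List recent backup jobs for that file system
aws backup list-backup-jobs \
  --by-resource-arn arn:aws:elasticfilesystem:<region>:<account-id>:file-system/<file-system-id>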

Additional considerations

In this section, we share some additional considerations when using this solution.

Shared file system latency impacts

Using a shared file system implies some performance degradation. In particular, the more Kerberos principals that have to be created at the same time, the more latency we can observe in the overall principal creation process and in the cluster startup time.

This performance degradation is proportional to the number of parallel KDC requests made at the same time. For example, consider the scenario in which we have to launch 10 clusters, each with 20 nodes connected to the same KDC. If we launch all 10 clusters at the same time, we can potentially have 10×20 = 200 parallel connections to the KDC during the initial instance provisioning for the creation of the frameworks related Kerberos principals. In addition, because the duration of Kerberos tickets for services is 10 hours by default, and because all the cluster services are launched more or less at the same time, we could also have the same level of parallelism for service tickets renewal. If, instead, we launch these 10 clusters with a time gap between them, we’ll have potentially 20 parallel connections and as a result the latency introduced by the shared file system isn’t very impactful.

As discussed earlier in this post, multiple clusters can share the same KDC in case they need to communicate between each other without having to set up a cross-realm trust between the related KDCs. Before attaching multiple clusters to the same KDC, you should evaluate if there is a real need for that, because you might also consider segregating Kerberos realms on different KDC instances to obtain better performance and reduce the blast radius in case of issues.

Single-AZ high availability consideration

Although the solution presented in this post provides a highly available MIT KDC across multiple Availability Zones, you might only be interested in an HA setup within a single Availability Zone. In this case, for better performance, you might also consider using Amazon FSx for Lustre, or attaching an io2 EBS volume to multiple KDC instances in the same Availability Zone. In both cases, you can still use the same KDC script used in this post by modifying only the mount command that attaches the shared file system on the KDC instances.
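
To illustrate the kind of change involved, the following mount commands are a sketch only: the file system IDs, DNS name, mount name, and mount point are hypothetical placeholders, and the mount point must match the path your KDC script expects for the KDC database.

# Amazon EFS (multi-AZ option used in this post), using amazon-efs-utils
sudo mount -t efs -o tls fs-0123456789abcdef0:/ /var/kerberos/krb5kdc

# Amazon FSx for Lustre (single-AZ alternative), using the Lustre client
sudo mount -t lustre -o relatime,flock \
  fs-0123456789abcdef0.fsx.<region>.amazonaws.com@tcp:/<mount-name> /var/kerberos/krb5kdc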

If you want to use an io2 EBS volume as your shared file system, you have to set up a clustered file system to ensure the resiliency and reliability of the KDC database, because standard file systems such as XFS or EXT4 aren't designed for such use cases. For example, you can use a GFS2 file system to access the KDC database simultaneously across KDC instances. For more details on how to set up a GFS2 file system on EC2 instances, refer to Clustered storage simplified: GFS2 on Amazon EBS Multi-Attach enabled volumes.
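
As a starting point for that option, Multi-Attach must be enabled when the io2 volume is created, and the volume must then be attached to each KDC instance. The following commands are a sketch with placeholder values for the Availability Zone, size, IOPS, volume ID, and instance IDs:

# Create an io2 volume with Multi-Attach enabled
aws ec2 create-volume \
  --volume-type io2 \
  --size 100 \
  --iops 3000 \
  --multi-attach-enabled \
  --availability-zone <availability-zone>

# Attach the same volume to each KDC instance
aws ec2 attach-volume --volume-id vol-0123456789abcdef0 --instance-id i-0123456789abcdef0 --device /dev/sdf
aws ec2 attach-volume --volume-id vol-0123456789abcdef0 --instance-id i-0fedcba9876543210 --device /dev/sdf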

Summary

High availability and fault tolerance are key requirements for EMR clusters that can't tolerate downtime. Analytics workloads running on those clusters can process sensitive data, so operating in a secure environment is also essential. As a result, we need a secure, highly available, and fault-tolerant setup.

In this post, we showed one possible way of achieving high availability and fault tolerance for the authentication layer of our big data workloads in Amazon EMR. We demonstrated how, by using AWS native services, multiple Kerberos KDCs can operate in parallel and be automatically replaced in case of failures. This, in combination with the framework-specific high availability and fault tolerance capabilities, allows us to operate in a secure, highly available, and fault-tolerant environment.


About the authors

Lorenzo Ripani is a Big Data Solution Architect at AWS. He is passionate about distributed systems, open source technologies and security. He spends most of his time working with customers around the world to design, evaluate and optimize scalable and secure data pipelines with Amazon EMR.

Stefano Sandona is an Analytics Specialist Solution Architect with AWS. He loves data, distributed systems and security. He helps customers around the world architecting their data platforms. He has a strong focus on Amazon EMR and all the security aspects around it.

Store Amazon EMR in-transit data encryption certificates using AWS Secrets Manager

Post Syndicated from Hao Wang original https://aws.amazon.com/blogs/big-data/store-amazon-emr-in-transit-data-encryption-certificates-using-aws-secrets-manager/

With Amazon EMR, you can use a security configuration to specify settings for encrypting data in transit. When in-transit encryption is configured, you can enable application-specific encryption features, for example:

  • Hadoop HDFS NameNode or DataNode user interfaces use HTTPS
  • Hadoop MapReduce encrypted shuffle uses Transport Layer Security (TLS)
  • Presto nodes internal communication uses SSL/TLS (Amazon EMR version 5.6.0 and later only)
  • Spark component internal RPC communication, such as the block transfer service and the external shuffle service, is encrypted using the AES-256 cipher in Amazon EMR versions 5.9.0 and later
  • HTTP protocol communication with user interfaces such as Spark History Server and HTTPS-enabled file servers is encrypted using Spark’s SSL configuration

The security configuration of Amazon EMR allows you to set up TLS certificates to encrypt data in transit. A security configuration provides the following options to specify TLS certificates:

  • As a path to a .zip file in an Amazon Simple Storage Service (Amazon S3) bucket that contains all certificates
  • Through a custom certificate provider as a Java class

In many cases, company security policies prohibit storing any type of sensitive information in an S3 bucket, including certificate private keys. For that reason, the only remaining option to secure data in transit on Amazon EMR is to configure the custom certificate provider.

In this post, I guide you through the configuration process and provide Java code samples to secure data in transit on Amazon EMR by storing TLS custom certificates using AWS Secrets Manager.

Secrets Manager helps you protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle. Users and applications retrieve secrets with a call to Secrets Manager APIs, eliminating the need to hardcode sensitive information in plain text.
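
For example, once the certificates are stored as secrets (as described later in this post), any authorized caller can retrieve them with a single API call. The following AWS CLI sketch uses the secret names created later in this walkthrough:

# Retrieve the private key and certificate stored in Secrets Manager
aws secretsmanager get-secret-value --secret-id emrprivate --query SecretString --output text
aws secretsmanager get-secret-value --secret-id emrcert --query SecretString --output text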

Solution overview

The following diagram illustrates the solution architecture.

During an EMR cluster start, if a custom certificate provider is configured for in-transit encryption, the provider is called to get the certificates. A custom certificate provider is a Java class that implements the TLSArtifactsProvider interface.

To make this solution work, you need a secure place to store certificates that can also be accessed by Java code. This post uses Secrets Manager, which provides a mechanism for managing certificates, and encrypts them using AWS Key Management Service (AWS KMS) keys.

To implement this solution, you complete the following high-level steps:

  1. Create a certificate.
  2. Store your certificate to Secrets Manager.
    1. Create a secret for a private key.
    2. Create a secret for a public key.
  3. Implement TLSArtifactsProvider.
  4. Create the Amazon EMR security configuration.
  5. Modify the Amazon Elastic Compute Cloud (Amazon EC2) instance profile role to get the certificate from Secrets Manager.
  6. Start the Amazon EMR cluster.

Create a certificate

For demonstration purposes, this post uses OpenSSL to create a self-signed certificate. See the following code:

openssl req -x509 -newkey rsa:4096 -keyout privateKey.pem -out certificateChain.pem -days 365 -subj "/C=US/ST=MA/L=Boston/O=EMR/OU=EMR/CN=*.ec2.internal" -nodes

This command creates a self-signed, 4096-bit certificate. For production systems, we recommend using a trusted certificate authority (CA) to issue certificates.

The command above has the following parameters:

  • keyout – The output file in which to store the private key.
  • out – The output file in which to store the certificate.
  • days – The number of days for which to certify the certificate.
  • subj – The subject name for the new request. The common name (CN) must match the domain name specified in the DHCP option set assigned to the virtual private cloud (VPC). The default is ec2.internal. The * prefix makes it a wildcard certificate.
  • nodes – Creates the private key without a passphrase, that is, without encryption.

The output of OpenSSL includes a pair of keys—one private and one public:

  • privateKey.pem – SSL private key certificate
  • certificateChain.pem – SSL public key certificate
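
Before storing the files, you can optionally inspect what OpenSSL generated; the following commands only read the local files created in the previous step:

# Confirm the certificate subject and validity window
openssl x509 -in certificateChain.pem -noout -subject -dates

# Verify that the private key is well formed
openssl rsa -in privateKey.pem -check -noout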

Store your certificate to Secrets Manager

In this section, we walk through the steps to create secrets for a private key and a public key.

Create a secret for a private key

To create a secret for a private key, complete the following steps:

  1. On the Secrets Manager console, choose Store a new secret.
  2. For the secret type, select Other type of secrets.
  3. On the Plaintext tab in the Key/value pairs section, copy the content from privateKey.pem.
  4. For Encryption key, choose DefaultEncryptionKey.
  5. Choose Next.
  6. For Secret name, enter emrprivate.
  7. For Resource permissions, optionally add or edit a resource policy to access secrets across AWS accounts. For more information, refer to Permissions policy examples.
  8. Choose Next.
  9. Choose Store.

Create a secret for a public key

To create a secret for a public key, complete the following steps:

  1. On the Secrets Manager console, choose Store a new secret.
  2. For the secret type, select Other type of secrets.
  3. On the Plaintext tab in the Key/value pairs section, copy the content from certificateChain.pem.
  4. For Encryption key, choose DefaultEncryptionKey.
  5. Choose Next.
  6. For Secret name, enter emrcert.
  7. For Resource permissions, optionally add or edit a resource policy to access secrets across AWS accounts.
  8. Choose Next.
  9. Choose Store.
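
If you prefer scripting over the console, the following AWS CLI sketch creates the same two secrets directly from the files generated earlier; it uses the default AWS managed encryption key, matching the console steps above:

aws secretsmanager create-secret --name emrprivate --secret-string file://privateKey.pem
aws secretsmanager create-secret --name emrcert --secret-string file://certificateChain.pem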

Implement TLSArtifactsProvider

This section describes the flow in the Java code only. You can download the full code from GitHub.

Our Java class EmrTlsFromSecretsManager implements the following TLSArtifactsProvider interface, whose getTlsArtifacts method is expected to return the certificates:

public abstract class TLSArtifactsProvider {

  public abstract TLSArtifacts getTlsArtifacts();
}

In the provided code example, we implement the following logic:

@Override
public TLSArtifacts getTlsArtifacts() {

   init();

   //Get private key from string
   PrivateKey privateKey = getPrivateKey(this.tlsPrivateKey);

   //Get certificate from string
   List<Certificate> certChain = getX509FromString(this.tlsCertificateChain);
   List<Certificate> certs = getX509FromString(this.tlsCertificate);

   return new TLSArtifacts(privateKey,certChain,certs);
}

The methods used in this implementation are as follows:

  • init() – Includes the following:
    • readTags() – Reads the secret ARNs from the Amazon EMR tags
    • getCertificates() – Gets the certificates from Secrets Manager
  • getX509FromString() – Converts certificates to an X509 format
  • getPrivateKey() – Converts the private key to the correct format

Compile the Java project to produce the file emr-tls-provider-samples-0.1-jar-with-dependencies.jar. Alternatively, you can download the JAR file from GitHub.
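
Assuming the project builds with Maven and the assembly plugin (which the -jar-with-dependencies suffix suggests, but check the repository's build instructions), the build is a single command:

# Build the provider JAR, including its dependencies
mvn clean package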

Create the Amazon EMR security configuration

To create the Amazon EMR security configuration, complete the following steps:

  1. Upload the emr-tls-provider-samples-0.1-jar-with-dependencies.jar file to an S3 bucket.
  2. On the Amazon EMR console, choose Security configurations, then choose Create.
  3. Enter a name for your new security configuration; for example, emr-tls-ssm.
  4. Select Enable in-transit encryption.
  5. For Certificate provider type, choose Custom.
  6. For Custom key provider location, enter the Amazon S3 path to the Java JAR file.
  7. For Certificate provider class, enter the name of the Java class. In the example code, the name is com.amazonaws.awssamples.EmrTlsFromSecretsManager.
  8. Configure the at-rest encryption as required.
  9. Choose Create.
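
You can also create the same security configuration with the AWS CLI. The following sketch assumes a hypothetical S3 path for the JAR and disables at-rest encryption for brevity; adjust both to your requirements:

# Write the security configuration JSON to a local file
cat > security-config.json <<'EOF'
{
  "EncryptionConfiguration": {
    "EnableInTransitEncryption": true,
    "EnableAtRestEncryption": false,
    "InTransitEncryptionConfiguration": {
      "TLSCertificateConfiguration": {
        "CertificateProviderType": "Custom",
        "S3Object": "s3://<your-bucket>/emr-tls-provider-samples-0.1-jar-with-dependencies.jar",
        "CertificateProviderClass": "com.amazonaws.awssamples.EmrTlsFromSecretsManager"
      }
    }
  }
}
EOF

# Create the security configuration
aws emr create-security-configuration --name emr-tls-ssm --security-configuration file://security-config.json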

Modify the EC2 instance profile role

Applications running on Amazon EMR assume and use the Amazon EMR role for Amazon EC2 to interact with other AWS services. To grant permissions to get certificates from Secrets Manager, add the following policy to your EC2 instance profile role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:<region>:<account-id>:secret:emrprivate-*",
                "arn:aws:secretsmanager:<region>:<account-id>:secret:emrcert-*"
            ]
        }
    ]
}

Make sure you limit the scope of the Secrets Manager policy to only the certificates that are required for provisioning.

Start the cluster

To reuse the same Java JAR file with different certificates and configurations, you can provide secret ARNs to EmrTlsFromSecretsManager through Amazon EMR tags, rather than embedding them in Java code.

In this example, we use the following tags:

  • sm:ssl:emrcert – The ARN of the Secrets Manager parameter key storing the CA-signed certificate
  • sm:ssl:emrprivate – The ARN of the Secrets Manager parameter key storing the CA-signed certificate private key
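
For example, you could attach these tags with the AWS CLI; the cluster ID and secret ARNs below are placeholders, and you can also pass the same tags when creating the cluster:

# Attach the secret ARNs to the cluster as tags read by EmrTlsFromSecretsManager
aws emr add-tags --resource-id j-XXXXXXXXXXXXX --tags \
  "sm:ssl:emrprivate=arn:aws:secretsmanager:<region>:<account-id>:secret:emrprivate-<suffix>" \
  "sm:ssl:emrcert=arn:aws:secretsmanager:<region>:<account-id>:secret:emrcert-<suffix>"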

Validation

After the cluster is started successfully, you are able to access the HDFS NameNode and DataNode UI via HTTPS. For more information, see View web interfaces hosted on Amazon EMR clusters.
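
As a quick check from the primary node, you can confirm that the NameNode UI answers over TLS. This is a sketch only: the HTTPS port depends on the Hadoop version in your EMR release (50470 on Hadoop 2.x-based releases, 9871 on Hadoop 3.x), so adjust it accordingly:

# -k skips CA validation because this walkthrough uses a self-signed certificate
curl -k https://$(hostname -f):9871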

Clean up

If you don’t need the resources you created in the earlier steps, you can delete the Secrets Manager secrets and EMR cluster in order to avoid additional charges.

  1. On the Secrets Manager console, select the secrets you created.
  2. On the Actions menu, choose Delete secret. This doesn't delete the secrets immediately, because you need to set a waiting period during which the secrets can be restored if needed. The minimum waiting period is 7 days.
  3. On the Amazon EMR console, select the cluster you created.
  4. Choose Terminate.

The process of deleting the EMR cluster takes a few minutes to complete.

Conclusion

In this post, we demonstrated how to create your custom Amazon EMR TLSArtifactsProvider interface and use Secrets Manager to store certificates. This allows you to define a more secure way to store and use certificates for Amazon EMR in-transit data encryption.


About the author

Hao Wang is a Senior Big Data Architect at AWS. Hao actively works with customers building large scale data platforms on AWS. He has a background as a software architect on implementing distributed software systems. In his spare time, he enjoys reading and outdoor activities with his family.

Convert Oracle XML BLOB data to JSON using Amazon EMR and load to Amazon Redshift

Post Syndicated from Abhilash Nagilla original https://aws.amazon.com/blogs/big-data/convert-oracle-xml-blob-data-to-json-using-amazon-emr-and-load-to-amazon-redshift/

In legacy relational database management systems, data is stored in several complex data types, such as XML, JSON, BLOB, or CLOB. This data might contain valuable information that is often difficult to transform into insights, so you might be looking for ways to load and use this data in a modern cloud data warehouse such as Amazon Redshift. One such example is migrating data from a legacy Oracle database with XML BLOB fields to Amazon Redshift, by performing preprocessing and conversion of XML to JSON using Amazon EMR. In this post, we describe a solution architecture for this use case, and show you how to implement the code to handle the XML conversion.

Solution overview

The first step in any data migration project is to capture and ingest the data from the source database. For this task, we use AWS Database Migration Service (AWS DMS), a service that helps you migrate databases to AWS quickly and securely. In this example, we use AWS DMS to extract data from an Oracle database with XML BLOB fields and stage the same data in Amazon Simple Storage Service (Amazon S3) in Apache Parquet format. Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance, and is the storage of choice for setting up data lakes on AWS.

After the data is ingested into an S3 staging bucket, we use Amazon EMR to run a Spark job that converts the XML fields to JSON fields, and the results are loaded into a curated S3 bucket. Amazon EMR runtime for Apache Spark can be over three times faster than clusters without the EMR runtime, and has 100% API compatibility with standard Apache Spark. This improved performance means your workloads run faster and save on compute costs, without any changes to your application.

Finally, transformed and curated data is loaded into Amazon Redshift tables using the COPY command. The Amazon Redshift table structure should match the number of columns and the column data types in the source file. Because we stored the data as a Parquet file, we specify the SERIALIZETOJSON option in the COPY command. This allows us to load complex types, such as structure and array, in a column defined as SUPER data type in the table.

The following architecture diagram shows the end-to-end workflow.

In detail, AWS DMS migrates data from the source database tables into Amazon S3, in Parquet format. Apache Spark on Amazon EMR reads the raw data, transforms the XML data type into JSON, and saves the data to the curated S3 bucket. In our code, we use an open-source library called spark-xml to parse and query the XML data.

In the rest of this post, we assume that the AWS DMS tasks have already run and created the source Parquet files in the S3 staging bucket. If you want to set up AWS DMS to read from an Oracle database with LOB fields, refer to Effectively migrating LOB data to Amazon S3 from Amazon RDS for Oracle with AWS DMS or watch the video Migrate Oracle to S3 Data lake via AWS DMS.

Prerequisites

If you want to follow along with the examples in this post using your AWS account, we provide an AWS CloudFormation template you can launch by choosing Launch Stack:


Provide a stack name and leave the default settings for everything else. Wait for the stack to display Create Complete (this should only take a few minutes) before moving on to the other sections.

The template creates the following resources:

  • A virtual private cloud (VPC) with two private subnets that have routes to an Amazon S3 VPC endpoint
  • The S3 bucket {stackname}-s3bucket-{xxx}, which contains the following folders:
    • libs – Contains the JAR file to add to the notebook
    • notebooks – Contains the notebook to interactively test the code
    • data – Contains the sample data
  • An Amazon Redshift cluster, in one of the two private subnets, with a database named rs_xml_db and a schema named rs_xml
  • A secret (rs_xml_db) in AWS Secrets Manager
  • An EMR cluster

The CloudFormation template shared in this post is for demonstration purposes only. Conduct your own security review and incorporate best practices before any production deployment using artifacts from this post.

Finally, some basic knowledge of Python and Spark DataFrames can help you review the transformation code, but isn’t mandatory to complete the example.

Understanding the sample data

In this post, we use sample data that we created about college students' courses and subjects. In the source system, the data consists of flat fields, like course_id and course_name, and an XML field that includes all the course material and subjects involved in the respective course. The following screenshot is an example of the source data, which is staged in an S3 bucket as a prerequisite step.

We can observe that the column study_material_info is an XML type field and contains nested XML tags in it. Let’s see how to convert this nested XML field to JSON in the subsequent steps.

Run a Spark job in Amazon EMR to transform the XML fields in the raw data to JSON

In this step, we use an Amazon EMR notebook, which is a managed environment to create and open Jupyter Notebook and JupyterLab interfaces. It enables you to interactively analyze and visualize data, collaborate with peers, and build applications using Apache Spark on EMR clusters. To open the notebook, follow these steps:

  1. On the Amazon S3 console, navigate to the bucket you created as a prerequisite step.
  2. Download the file in the notebooks folder.
  3. On the Amazon EMR console, choose Notebooks in the navigation pane.
  4. Choose Create notebook.
  5. For Notebook name, enter a name.
  6. For Cluster, select Choose an existing cluster.
  7. Select the cluster you created as a prerequisite.
  8. For Security Groups, choose BDB1909-EMR-LIVY-SG and BDB1909-EMR-Notebook-SG
  9. For AWS Service Role, choose the role bdb1909-emrNotebookRole-{xxx}.
  10. For Notebook location, specify the S3 path in the notebooks folder (s3://{stackname}-s3bucket-xxx}/notebooks/).
  11. Choose Create notebook.
  12. When the notebook is created, choose Open in JupyterLab.
  13. Upload the file you downloaded earlier.
  14. Open the new notebook.

    The notebook should look as shown in the following screenshot, and it contains a script written in Scala.
  15. Run the first two cells to configure Apache Spark with the open-source spark-xml library and import the needed modules. The spark-xml package allows reading XML files in local or distributed file systems as Spark DataFrames. Although primarily used to convert (portions of) large XML documents into a DataFrame, spark-xml can also parse XML in a string-valued column in an existing DataFrame with the from_xml function, in order to add it as a new column with parsed results as a struct.
  16. In the third cell, we load the data from the Parquet file generated by AWS DMS into a DataFrame, then we extract the attribute that contains the XML code (STUDY_MATERIAL_INFO) and map it to a string variable named payloadSchema.
  17. We can now use the payloadSchema in the from_xml function to convert the field STUDY_MATERIAL_INFO into a struct data type and add it as a column named course_material in a new DataFrame called parsed.
  18. Finally, we drop the original field and write the parsed DataFrame to our curated zone in Amazon S3.
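
If you later want to run the same transformation as a batch job instead of in a notebook, the spark-xml dependency can be supplied on the command line. The package coordinates and version below are an assumption; use the version that matches your Spark and Scala versions:

# Launch an interactive shell with the spark-xml dependency on the classpath
spark-shell --packages com.databricks:spark-xml_2.12:0.14.0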

Due to the structure differences between DataFrame and XML, there are some conversion rules from XML data to DataFrame and from DataFrame to XML data. More details and documentation are available in XML Data Source for Apache Spark.

When we convert from XML to DataFrame, attributes are converted as fields with the heading prefix attributePrefix (underscore (_) is the default). For example, see the following code:

  <book category="undergraduate">
    <title lang="en">Introduction to Biology</title>
    <author>Demo Author 1</author>
    <year>2005</year>
    <price>30.00</price>
  </book>

It produces the following schema:

root
 |-- category: string (nullable = true)
 |-- title: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _lang: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: string (nullable = true)
 |-- price: string (nullable = true)

Next, we have a value in an element that has attributes but no child elements. The value is put in a separate field, valueTag. See the following code:

<title lang="en">Introduction to Biology</title>

It produces the following schema, and the lang attribute is converted into the _lang field inside the DataFrame:

|-- title: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _lang: string (nullable = true)

Copy curated data into Amazon Redshift and query tables seamlessly

Because our semi-structured nested dataset is already written in the S3 bucket as Apache Parquet formatted files, we can use the COPY command with the SERIALIZETOJSON option to ingest data into Amazon Redshift. The Amazon Redshift table structure should match the metadata of the Parquet files. Amazon Redshift can map Parquet columns with complex types, such as structure and array, to SUPER data type columns.

The following code shows an example CREATE TABLE statement that creates a staging table:

create table rs_xml_db.public.stg_edw_course_catalog 
(
course_id bigint,
course_name character varying(5000),
course_material super
);

The following example COPY command loads the data from Parquet format:

COPY rs_xml_db.public.stg_edw_course_catalog FROM 's3://<<your Amazon S3 Bucket for curated data>>/data/target/<<your output parquet file>>' 
IAM_ROLE '<<your IAM role>>' 
FORMAT PARQUET SERIALIZETOJSON; 

By using semistructured data support in Amazon Redshift, you can ingest and store semistructured data in your Amazon Redshift data warehouses. With the SUPER data type and the PartiQL language, Amazon Redshift expands the data warehouse capability to integrate with both SQL and NoSQL data sources. The SUPER data type only supports up to 1 MB of data for an individual SUPER field or object. Note that a JSON object may be stored in a SUPER data type, but reading this data using JSON functions currently has a VARCHAR (65,535 byte) limit. See Limitations for more details.

The following example shows how nested JSON can be easily accessed using SELECT statements:

SELECT DISTINCT bk._category
	,bk.author
	,bk.price
	,bk.year
	,bk.title._lang
FROM rs_xml_db.public.stg_edw_course_catalog main
INNER JOIN main.course_material.book bk ON true;

The following screenshot shows our results.

Clean up

To avoid incurring future charges, first delete the notebook and the related files in the S3 bucket as explained in the EMR documentation, then delete the CloudFormation stack.

Conclusion

This post demonstrated how to use AWS services like AWS DMS, Amazon S3, Amazon EMR, and Amazon Redshift to seamlessly work with complex data types like XML and perform historical migrations when building a cloud data lake house on AWS. We encourage you to try this solution and take advantage of all the benefits of these purpose-built services.

If you have questions or suggestions, please leave a comment.


About the authors

Abhilash Nagilla is a Sr. Specialist Solutions Architect at AWS, helping public sector customers on their cloud journey with a focus on AWS analytics services. Outside of work, Abhilash enjoys learning new technologies, watching movies, and visiting new places.

Avinash Makey is a Specialist Solutions Architect at AWS. He helps customers with data and analytics solutions in AWS. Outside of work he plays cricket, tennis and volleyball in free time.

Fabrizio Napolitano is a Senior Specialist SA for DB and Analytics. He has worked in the analytics space for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Removing complexity to improve business performance: How Bridgewater Associates built a scalable, secure, Spark-based research service on AWS

Post Syndicated from Sergei Dubinin original https://aws.amazon.com/blogs/big-data/removing-complexity-to-improve-business-performance-how-bridgewater-associates-built-a-scalable-secure-spark-based-research-service-on-aws/

This is a guest post co-written by Sergei Dubinin, Oleksandr Ierenkov, Illia Popov and Joel Thompson, from Bridgewater.

Bridgewater’s core mission is to understand how the world works by analyzing the drivers of markets and turning that understanding into high-quality portfolios and investment advice for our clients. Within Bridgewater Technology, we strive to make our researchers as productive as possible at what they do best: building the fundamental understanding of global markets. This means eliminating the need to deal with underlying IT infrastructure, and focusing on building and improving their investment ideas.

In this post, we examine our proprietary service in four dimensions. We talk about our business challenges, how we met our high security bar, how we can scale to meet the demands of the business, and how we do all of this in a cost-effective manner.

Challenge

Our researchers’ demand for compute required to develop and test their investment logic is constantly growing. This consistent and aggressive growth in compute capacity was a driving force behind our initial decision to move to the public cloud.

Utilizing the scale of the AWS Cloud has allowed us to generate investment signals and views of the world that would have been impossible to do on premises. When we first moved this analytical workload to AWS, we built on Amazon Elastic Compute Cloud (Amazon EC2) along with other services such as Elastic Load Balancing, AWS Auto Scaling, and Amazon Simple Storage Service (Amazon S3) to provide core functionality. A short time later, we moved to the AWS Nitro System, completing jobs 20% faster—allowing our research teams to iterate more quickly on their investment ideas.

The next step in our evolution started 2 years ago when we adopted Apache Spark as the underlying compute engine for our investment logic execution service. This helped streamline our analytics pipeline, removing duplication and decoupling many of the plugins we were developing for our researchers. Rather than run Apache Spark ourselves, we chose Amazon EMR as a hosted Spark platform. However, we soon discovered that Amazon EMR on EC2 wasn’t a good fit for the way we wanted to use it. For example, we can’t predict when a researcher will submit a job, so to avoid having our researchers wait for a brand new EMR cluster to be created and bootstrapped, we used long-lived EMR clusters, which forced many different jobs to run on the same cluster. However, because a single EMR cluster can only exist in a single Availability Zone, our cluster was limited to only being able to launch instances in that Availability Zone. At the significant scale that we were operating at, individual Availability Zones started running out of our desired instance capacity to meet our needs. Although we could launch many different clusters across different Availability Zones, that would leave us handling job scheduling at a high level, which was the whole point of using Amazon EMR and Spark. Furthermore, to be as cost-efficient as possible, we wanted to continuously scale the number of nodes in the cluster based on demand, and as a result, we would churn through thousands of nodes a day. This constant churning of nodes caused job failures and additional operational overhead for our teams.

We brought these concerns to AWS, who took the lead in pushing these issues to resolution. AWS partnered closely with us to understand our use cases and the impact of job failures, and tirelessly worked with us to solve these challenges. Working with the Amazon EMR team, we narrowed down the problem to our aggressive scaling patterns, which the service could not handle at that time. Over the course of just a few months, the Amazon EMR team made several service improvements in the scaling mechanism to meet our needs and the needs of many other AWS customers.

While working closely with the Amazon EMR team on these issues, the AWS team informed us of the development of Amazon EMR on EKS, a managed service that would enable us to run Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS). Amazon EKS is a strategic platform for us across various business units at Bridgewater, and after doing a proof of concept of our workload using EMR on EKS, it became clear that this was a better fit for our use case and more aligned with our strategic direction. After migrating to EMR on EKS, we can now take advantage of capacity in multiple Availability Zones and improve our resiliency to EMR cluster issues or broader service events, while still meeting our high security bar.

Security

Another important aspect of our service is ensuring it maintains the appropriate security posture. Among other concerns, Bridgewater strictly compartmentalizes access to different investment ideas, and we must defend against the possibility of a malicious insider attempting to steal our intellectual property or otherwise harm Bridgewater. To balance the trade-offs between speed and security, we designed security controls to defend against potentially malicious jobs, while enabling our researchers to quickly iterate on their code. This is made more complicated by the design of Spark’s Kubernetes backend. The Spark driver, which in our case is running arbitrary and untrusted code, has to be given Kubernetes role-based access control (RBAC) permissions to create Kubernetes Pods. The ability to create Pods is very powerful and can lead to privilege escalation.

Our first layer of isolation is to run each job in its own Kubernetes namespace (and, therefore, in its own EMR on EKS virtual cluster). A namespace and virtual cluster are created when the job is ready to be submitted, and they’re deleted when that job is finished. This prevents one job from interfering directly with another job, but there are still other vectors to defend against. For example, Spark drivers should not be creating Pods with containers that run as root or source their images from unapproved repositories. We first investigated PodSecurityPolicies for this purpose. However, they couldn’t solve all of our use cases (such as restricting where container images can be pulled from), and they are currently being deprecated and will eventually be removed. Instead, we turned to Open Policy Agent (OPA) Gatekeeper, which provides a flexible approach for writing policies in code that can do more complex authorization decisions and allows us to implement our desired suite of controls. We also worked with the AWS Service Team to add further defense in depth, such as ensuring that all Pods created by EMR on EKS dropped all Linux capabilities, which we could then enforce with Gatekeeper.

The following diagram illustrates how we can maintain the required job separation within our research service.

Scaling

One of the largest motivations of our evolution to Spark on Amazon EMR and then on EMR on EKS was improving the efficiency of our resource utilization by aggressively scaling based on demand. Our fundamental cause-and-effect understanding of markets and economies is powered by our systematic, high-performance compute Spark grid. We run simulations at a constantly increasing scale and need an architecture that can scale up and meet our foreseeable business needs for the next several years.

Our platform runs two types of jobs: ad hoc interactive and scheduled batch. Each type of job brings its own scaling complexities, and both benefited from the evolution to EMR on EKS. Ad hoc jobs can be submitted at any time throughout business hours, and the simulation determines how much compute capacity is needed. For example, a particular job may need one EC2 instance or 100 EC2 instances. This can translate to hundreds of EC2 instances needing to be spun up or down within a few minutes. The scheduled batch jobs run periodically throughout the day with predetermined simulations and similarly translates to hundreds of EC2 instances spinning up or down. In total, scaling up and down by many hundreds of EC2 instances in a few minutes is common, and we needed a solution that could meet those business requirements.

For this specific problem, we needed a solution that was able to handle aggressive scaling events on the order of hundreds of EC2 instances per minute. Additionally, when operating at this scale, it’s important to both diversify instance types and spread jobs across Availability Zones. EMR on EKS empowers us to run fully-managed Spark jobs on an EKS cluster that spans multiple Availability Zones and provides the option to choose a heterogeneous set of instance types for Amazon EKS. Spanning a single EKS cluster across Availability Zones enables us to utilize compute capacity across the entire Region, thereby increasing instance diversity and availability for this workload. Because Spark jobs are running within containers on Amazon EKS, we can easily swap out instance types within the EKS cluster or run different instance types within the same cluster. As a result of these capabilities, we’re able to regularly scale our production service to approximately 1,600 EC2 instances totaling 25,000 cores at peak, running 3,000 jobs per day.

Finally, in late 2021, we conducted some scaling tests to see what the realistic limits of our service are. We are happy to share that we were able to scale our service to three times our normal daily size in terms of compute and simulations run. This exercise has validated that we will be able to meet the increase in business demand without committing additional engineering resources to do so.

Cost management

In addition to significantly increasing our ability to scale, we also were able to design the solution to be extremely cost effective. Prior to EMR on EKS, we had two options for Spark jobs: either self-managed on Amazon EC2 or using Amazon EMR on EC2. Self-managing on Amazon EC2 meant that we needed to manage the complexities of scheduling jobs on nodes, manage the Spark clusters themselves, and develop a separate application to provision and stop EC2 instances as Spark jobs ran to scale the workloads. Amazon EMR on EC2 provides a managed service to run Spark workloads on Amazon EC2. However, for customers like us who need to operate in multiple Availability Zones and already have a technology footprint on Kubernetes, EMR on EKS made more sense.

Moving to EMR on EKS enables us to scale dynamically as jobs are submitted, generating huge cost savings. Simulation capacity is right-sized within the range of a few minutes; something that is not possible with another solution. Additionally, our investment in Amazon EC2 Compute Savings Plans provides us with the savings and flexibility to meet our needs; we just need to specify how many compute hours we’re committed to in a particular Region and AWS handles the rest. You can read more about the cost benefits of EMR on EKS in Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads.

The future

Although we’re currently meeting our key users’ needs, we have prioritized several improvements to our service for the future. First, we plan on replacing the Kubernetes Cluster Autoscaler with Karpenter. Given our aggressive and frequent compute scaling, we have found that some jobs can be unexpectedly stopped using the Cluster Autoscaler. We experience this about six times a day. We expect Karpenter will greatly diminish the occurrence of this failure mode. To learn more about Karpenter, check out Introducing Karpenter – An Open-Source High-Performance Kubernetes Cluster Autoscaler.

Second, we’re moving several complementary services that are currently running on EC2 to EKS. This will increase our ability to deploy meaningful improvements for our business and increase resiliency to service events.

Finally, we are making longer term efforts to improve our resiliency to regional service events. We are exploring broadening our operations to other AWS Regions, which would allow us to increase our service availability as well as maintain our burst capacity.

Conclusion

Working closely with AWS teams, we were able to develop a scalable, secure, and cost-optimized service on AWS that allows our researchers to generate larger and more complex investment ideas without worrying about IT infrastructure. Our service runs our Spark-based simulations across multiple Availability Zones at near-full utilization without having to worry about building or maintaining a scheduling platform. Finally, we are able to meet and surpass our security benchmarks by creating job separation using native AWS constructs at scale. This has given us tremendous confidence that our mission-critical data is safe in the AWS Cloud.

Through this close partnership with AWS, Bridgewater is poised to anticipate and meet the rigorous demands of our researchers for years to come; something that was not possible in our old data centers or with our prior architecture. Our President and CTO, Igor Tsyganskiy, recently spoke with AWS at length on this partnership. For the video of this discussion, check out Merging Business and Tech – Bridgewater’s Guide to Drive Agility.

Acknowledgements

  • Igor Tsyganskiy, President and Chief Technology Officer, Bridgewater
  • Aaron Linsky, Sr. Product Manager, Bridgewater
  • Gopinathan Kannan, Sr. Mgr. Engineering, Amazon Web Services
  • Vaibhav Sabharwal, Sr. Customer Solutions Manager, Amazon Web Services
  • Joseph Marques, Senior Principal Engineer, Amazon Web Services
  • David Brown, VP EC2, Amazon Web Services

About the authors

Sergei Dubinin is an Engineering Manager with Bridgewater. He is passionate about building big data processing systems that are suitable for a secure, stable, and performant use in production.

Oleksandr Ierenkov is a Solution Architect for EPAM Systems. He has focused on helping Bridgewater migrate in-house distributed systems to microservices on Kubernetes and various AWS-managed services with a focus on operational efficiency. Oleksandr is basically the same name as Alexander, only Ukrainian.

Anthony Pasquariello is a Senior Solutions Architect at AWS based in New York City. He specializes in modernization and security for our advanced enterprise customers. Anthony enjoys writing and speaking about all things cloud. He’s pursuing an MBA, and received his MS and BS in Electrical & Computer Engineering.

Illia Popov is a Tech Lead for EPAM Systems. Illia has been working with Bridgewater since 2018 and was active in planning and implementing the migration to EMR on EKS. He is excited to continue delivering value to Bridgewater by adapting managed services in close cooperation with AWS.

Peter Sideris is a Sr. Technical Account Manager at AWS. He works with some of our largest and most complex customers to ensure their success in the AWS Cloud. Peter enjoys his family, marine reef keeping, and volunteers his time to the Boy Scouts of America in several capacities.

Joel Thompson is an Architect at Bridgewater Associates, where he has worked in a variety of technology roles over the past 13 years, including building some of the earliest foundations of AWS adoption at Bridgewater. He is passionate about solving complicated problems to securely deliver value to the business. Outside of work, Joel is an avid skier, helped co-found the fwd:cloudsec cloud security conference, and enjoys traveling to spend time with friends and family.

Set up federated access to Amazon Athena for Microsoft AD FS users using AWS Lake Formation and a JDBC client

Post Syndicated from Mostafa Safipour original https://aws.amazon.com/blogs/big-data/set-up-federated-access-to-amazon-athena-for-microsoft-ad-fs-users-using-aws-lake-formation-and-a-jdbc-client/

Tens of thousands of AWS customers choose Amazon Simple Storage Service (Amazon S3) as their data lake to run big data analytics, interactive queries, high-performance computing, and artificial intelligence (AI) and machine learning (ML) applications to gain business insights from their data. On top of these data lakes, you can use AWS Lake Formation to ingest, clean, catalog, transform, and help secure your data and make it available for analysis and ML. After you have set up your data lake, you can use Amazon Athena, an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.

With Lake Formation, you can configure and manage fine-grained access control to new or existing databases, tables, and columns defined in the AWS Glue Data Catalog for data stored in Amazon S3. After you set access permissions using Lake Formation, you can use analytics services such as Amazon Athena, Amazon Redshift, and Amazon EMR without needing to configure policies for each service.

Many of our customers use Microsoft Active Directory Federation Services (AD FS) as their identity provider (IdP) while using cloud-based services. In this post, we provide a step-by-step walkthrough of configuring AD FS as the IdP for SAML-based authentication with Athena to query data stored in Amazon S3, with access permissions defined using Lake Formation. This enables end-users to log in to their SQL client using Active Directory credentials and access data with fine-grained access permissions.

Solution overview

To build the solution, we start by establishing trust between AD FS and your AWS account. With this trust in place, AD users can federate into AWS using their AD credentials and assume permissions of an AWS Identity and Access Management (IAM) role to access AWS resources such as the Athena API.

To create this trust, you add AD FS as a SAML provider into your AWS account and create an IAM role that federated users can assume. On the AD FS side, you add AWS as a relying party and write SAML claim rules to send the right user attributes to AWS (specifically Lake Formation) for authorization purposes.

The steps in this post are structured into the following sections:

  1. Set up an IAM SAML provider and role.
  2. Configure AD FS.
  3. Create Active Directory users and groups.
  4. Create a database and tables in the data lake.
  5. Set up the Lake Formation permission model.
  6. Set up a SQL client with JDBC connection.
  7. Verify access permissions.

The following diagram provides an overview of the solution architecture.

The flow for the federated authentication process is as follows:

  1. The SQL client which has been configured with Active Directory credentials sends an authentication request to AD FS.
  2. AD FS authenticates the user using Active Directory credentials, and returns a SAML assertion.
  3. The client makes a call to Lake Formation, which initiates an internal call with AWS Security Token Service (AWS STS) to assume a role with SAML for the client.
  4. Lake Formation returns temporary AWS credentials with permissions of the defined IAM role to the client.
  5. The client uses the temporary AWS credentials to call the Athena API StartQueryExecution.
  6. Athena retrieves the table and associated metadata from the AWS Glue Data Catalog.
  7. On behalf of the user, Athena requests access to the data from Lake Formation (GetDataAccess). Lake Formation assumes the IAM role associated with the data lake location and returns temporary credentials.
  8. Athena uses the temporary credentials to retrieve data objects from Amazon S3.
  9. Athena returns the results to the client based on the defined access permissions.
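
To make step 5 concrete, the following AWS CLI sketch shows a StartQueryExecution call issued with the temporary credentials returned to the client; the database name and results bucket are placeholders, and the table name comes from the sample dataset used in this post:

aws athena start-query-execution \
  --query-string "SELECT * FROM customer LIMIT 10" \
  --query-execution-context Database=<your-database> \
  --result-configuration OutputLocation=s3://<your-athena-results-bucket>/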

For our use case, we use two sample tables:

  • LINEORDER – A fact table containing orders
  • CUSTOMER – A dimension table containing customer information including Personally Identifiable Information (PII) columns (c_name, c_phone, c_address)

We also have data consumer users who are members of the following teams:

  • CustomerOps – Can see both orders and customer information, including PII attributes of the customer
  • Finance – Can see orders for analytics and aggregation purposes but only non-PII attributes of the customer

To demonstrate this use case, we create two users called CustomerOpsUser and FinanceUser and three AD groups for different access patterns: data-customer (customer information access excluding PII attributes), data-customer-pii (full customer information access including PII attributes), and data-order (order information access). By adding the users to these three groups, we can grant the right level of access to different tables and columns.

Prerequisites

To follow along with this walkthrough, you must meet the following prerequisites:

Set up an IAM SAML provider and role

To set up your SAML provider, complete the following steps:

  1. In the IAM console, choose Identity providers in the navigation pane.
  2. Choose Add provider.
  3. For Provider Type, choose SAML.
  4. For Provider Name, enter adfs-saml-provider.
  5. For Metadata Document, download your AD FS server’s federation XML file by entering the following address in a browser with access to the AD FS server:
    https://<adfs-server-name>/FederationMetadata/2007-06/FederationMetadata.xml

  6. Upload the file to AWS by choosing Choose file.
  7. Choose Add provider to finish.

Now you’re ready to create a new IAM role.

  1. In the navigation pane, choose Roles.
  2. Choose Create role.
  3. For the type of trusted entity, choose SAML 2.0 federation.
  4. For SAML provider, choose the provider you created (adfs-saml-provider).
  5. Choose Allow programmatic and AWS Management Console access.
  6. The Attribute and Value fields should automatically populate with SAML:aud and https://signin.aws.amazon.com/saml.
  7. Choose Next:Permissions.
  8. Add the necessary IAM permissions to this role. For this post, attach AthenaFullAccess.

If the Amazon S3 location for your Athena query results doesn't start with aws-athena-query-results, add another policy to allow users to write query results to your Amazon S3 location. For more information, see Specifying a Query Result Location Using the Athena Console and Writing IAM Policies: How to Grant Access to an Amazon S3 Bucket.

  1. Leave the defaults in the next steps and for Role name, enter adfs-data-access.
  2. Choose Create role.
  3. Take note of the SAML provider and IAM role names to use in later steps when creating the trust between the AWS account and AD FS.
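
If you automate this setup, the SAML provider can also be created with the AWS CLI; the following sketch assumes you have already downloaded the AD FS federation metadata file to FederationMetadata.xml:

# Create the SAML identity provider from the AD FS federation metadata
aws iam create-saml-provider \
  --name adfs-saml-provider \
  --saml-metadata-document file://FederationMetadata.xml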

Configure AD FS

SAML-based federation has two participant parties: the IdP (Active Directory) and the relying party (AWS), which is the service or application that wants to use authentication from the IdP.

To configure AD FS, you first add a relying party trust, then you configure SAML claim rules for the relying party. Claim rules are the way that AD FS forms a SAML assertion sent to a relying party. The SAML assertion states that the information about the AD user is true, and that it has authenticated the user.

Add a relying party trust

To create your relying party in AD FS, complete the following steps:

  1. Log in to the AD FS server.
  2. On the Start menu, open Server Manager.
  3. On the Tools menu, choose the AD FS Management console.
  4. Under Trust Relationships in the navigation pane, choose Relying Party Trusts.
  5. Choose Add Relying Party Trust.
  6. Choose Start.
  7. Select Import data about the relying party published online or on a local network and enter the URL https://signin.aws.amazon.com/static/saml-metadata.xml.

The metadata XML file is a standard SAML metadata document that describes AWS as a relying party.

  1. Choose Next.
  2. For Display name, enter a name for your relying party.
  3. Choose Next.
  4. Select I do not want to configure multi-factor authentication.

For increased security, we recommend that you configure multi-factor authentication to help protect your AWS resources. We don’t enable multi-factor authentication for this post because we’re using a sample dataset.

  1. Choose Next.
  2. Select Permit all users to access this relying party and choose Next.

This allows all users in Active Directory to use AD FS with AWS as a relying party. You should consider your security requirements and adjust this configuration accordingly.

  1. Finish creating your relying party.

Configure SAML claim rules for the relying party

You create two sets of claim rules in this post. The first set (rules 1–4) contains AD FS claim rules that are required to assume an IAM role based on AD group membership. These are the rules that you also create if you want to establish federated access to the AWS Management Console. The second set (rules 5–6) are claim rules that are required for Lake Formation fine-grained access control.

To create AD FS claim rules, complete the following steps:

  1. On the AD FS Management console, find the relying party you created in the previous step.
  2. Right-click the relying party and choose Edit Claim Rules.
  3. Choose Add Rule and create your six new rules.
  4. Create claim rule 1, called NameID:
    1. For Rule template, use Transform an Incoming Claim.
    2. For Incoming claim type, choose Windows account name.
    3. For Outgoing claim type, choose Name ID.
    4. For Outgoing name ID format, choose Persistent Identifier.
    5. Select Pass through all claim values.
  5. Create claim rule 2, called RoleSessionName:
    1. For Rule template, use Send LDAP Attributes as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute E-Mail-Addresses and outgoing claim type https://aws.amazon.com/SAML/Attributes/RoleSessionName.
  6. Create claim rule 3, called Get AD Groups:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname", Issuer == "AD AUTHORITY"]
      => add(store = "Active Directory", types = ("http://temp/variable"), query = ";tokenGroups;{0}", param = c.Value);

  7. Create claim rule 4, called Roles:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code (enter your account number and name of the SAML provider you created earlier):
      c:[Type == "http://temp/variable", Value =~ "(?i)^aws-"]
      => issue(Type = "https://aws.amazon.com/SAML/Attributes/Role", Value = RegExReplace(c.Value, "aws-", "arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/<adfs-saml-provider>,arn:aws:iam::<AWS ACCOUNT NUMBER>:role/"));

Claim rules 5 and 6 allow Lake Formation to make authorization decisions based on user name or the AD group membership of the user.

  1. Create claim rule 5, called LF-UserName, which passes the user name and SAML assertion to Lake Formation:
    1. For Rule template, use Send LDAP Attributes as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute User-Principal-Name and outgoing claim type https://lakeformation.amazon.com/SAML/Attributes/Username.
  2. Create claim rule 6, called LF-Groups, which passes data and analytics-related AD groups that the user is a member of, along with the SAML assertion to Lake Formation:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://temp/variable", Value =~ "(?i)^data-"]
      => issue(Type = "https://lakeformation.amazon.com/SAML/Attributes/Groups", Value = c.Value);

The preceding rule snippet filters AD group names starting with data-. This is an arbitrary naming convention; you can adopt your preferred naming convention for AD groups that are related to data lake access.

Create Active Directory users and groups

In this section, we create two AD users and required AD groups to demonstrate varying levels of access to the data.

Create users

You create two AD users: FinanceUser and CustomerOpsUser. Each user corresponds to an individual who is a member of the Finance or Customer business units. The following table summarizes the details of each user.

 

Attribute        FinanceUser           CustomerOpsUser
First Name       FinanceUser           CustomerOpsUser
User logon name  [email protected]      [email protected]
Email            [email protected]      [email protected]

To create your users, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. In the navigation pane, choose Users.
  3. On the tool bar, choose the Create user icon.
  4. For First name, enter FinanceUser.
  5. For Full name, enter FinanceUser.
  6. For User logon name, enter [email protected].
  7. Choose Next.
  8. Enter a password and deselect User must change password at next logon.

We choose this option for simplicity, but in real-world scenarios, newly created users must change their password for security reasons.

  1. Choose Next.
  2. In Active Directory Users and Computers, choose the user name.
  3. For Email, enter [email protected].

Adding an email is mandatory because it’s used as the RoleSessionName value in the SAML assertion.

  1. Choose OK.
  2. Repeat these steps to create CustomerOpsUser.

Create AD groups to represent data access patterns

Create the following AD groups to represent three different access patterns and also the ability to assume an IAM role:

  • data-customer – Members have access to non-PII columns of the customer table
  • data-customer-pii – Members have access to all columns of the customer table, including PII columns
  • data-order – Members have access to the lineorder table
  • aws-adfs-data-access – Members assume the adfs-data-access IAM role when logging in to AWS

To create the groups, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. On the tool bar, choose the Create new group icon.
  3. For Group name, enter data-customer.
  4. For Group scope, select Global.
  5. For Group type, select Security.
  6. Choose OK.
  7. Repeat these steps to create the remaining groups.

Add users to appropriate groups

Now you add your newly created users to their appropriate groups, as detailed in the following table.

User             Group Membership                                      Description
CustomerOpsUser  data-customer-pii, data-order, aws-adfs-data-access   Sees all customer information, including PII, and their orders
FinanceUser      data-customer, data-order, aws-adfs-data-access       Sees only non-PII customer data and orders

Complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. Choose the user FinanceUser.
  3. On the Member Of tab, choose Add.
  4. Add the appropriate groups.
  5. Repeat these steps for CustomerOpsUser.

Create a database and tables in the data lake

In this step, you copy data files to an S3 bucket in your AWS account by running the following AWS Command Line Interface (AWS CLI) commands. For more information on how to set up the AWS CLI, refer to Configuration Basics.

These commands copy the files that contain data for customer and lineorder tables. Replace <BUCKET NAME> with the name of an S3 bucket in your AWS account.

aws s3 sync s3://awssampledb/load/ s3://<BUCKET NAME>/customer/ \
--exclude "*" --include "customer-fw.tbl-00*" --exclude "*.bak"

aws s3api copy-object --copy-source awssampledb/load/lo/lineorder-single.tbl000.gz \
--key lineorder/lineorder-single.tbl000.gz --bucket <BUCKET NAME> \
--tagging-directive REPLACE

For this post, we use the default settings for storing data and logging access requests within Amazon S3. You can enhance the security of your sensitive data with the following methods:

  • Implement encryption at rest using AWS Key Management Service (AWS KMS) and customer managed encryption keys
  • Use AWS CloudTrail and audit logging
  • Restrict access to AWS resources based on the least privilege principle

Additionally, Lake Formation is integrated with CloudTrail, a service that provides a record of actions taken by a user, role, or AWS service in Lake Formation. CloudTrail captures all Lake Formation API calls as events and is enabled by default when you create a new AWS account. When activity occurs in Lake Formation, that activity is recorded as a CloudTrail event along with other AWS service events in event history. For audit and access monitoring purposes, all federated user logins are logged via CloudTrail under the AssumeRoleWithSAML event name. You can also view specific user activity based on their user name in CloudTrail.
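For example, the following AWS CLI call (a quick spot check, not a full audit solution) lists recent federated sign-in events:

# Look up recent SAML federation sign-ins recorded by CloudTrail
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=EventName,AttributeValue=AssumeRoleWithSAML \
  --max-results 20 \
  --query 'Events[].{Time:EventTime,User:Username}'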

To create a database and tables in the Data Catalog, open the query editor on the Athena console and enter the following DDL statements. Replace <BUCKET NAME> with the name of the S3 bucket in your account.

CREATE DATABASE salesdata;
CREATE EXTERNAL TABLE salesdata.customer
(
    c_custkey VARCHAR(10),
    c_name VARCHAR(25),
    c_address VARCHAR(25),
    c_city VARCHAR(10),
    c_nation VARCHAR(15),
    c_region VARCHAR(12),
    c_phone VARCHAR(15),
    c_mktsegment VARCHAR(10)
)
-- The data files contain fixed-width columns, hence the use of RegexSerDe
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "(.{10})(.{25})(.{25})(.{10})(.{15})(.{12})(.{15})(.{10})"
)
LOCATION 's3://<BUCKET NAME>/customer/';

CREATE EXTERNAL TABLE salesdata.lineorder(
  `lo_orderkey` int, 
  `lo_linenumber` int, 
  `lo_custkey` int, 
  `lo_partkey` int, 
  `lo_suppkey` int, 
  `lo_orderdate` int, 
  `lo_orderpriority` varchar(15), 
  `lo_shippriority` varchar(1), 
  `lo_quantity` int, 
  `lo_extendedprice` int, 
  `lo_ordertotalprice` int, 
  `lo_discount` int, 
  `lo_revenue` int, 
  `lo_supplycost` int, 
  `lo_tax` int, 
  `lo_commitdate` int, 
  `lo_shipmode` varchar(10))
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
LOCATION 's3://<BUCKET NAME>/lineorder/';

Verify that tables are created and you can see the data:

SELECT * FROM "salesdata"."customer" limit 10;
SELECT * FROM "salesdata"."lineorder" limit 10;

Set up the Lake Formation permission model

Lake Formation uses a combination of Lake Formation permissions and IAM permissions to achieve fine-grained access control. The recommended approach includes the following:

  • Coarse-grained IAM permissions – These apply to the IAM role that users assume when running queries in Athena. IAM permissions control access to Lake Formation, AWS Glue, and Athena APIs.
  • Fine-grained Lake Formation grants – These control access to Data Catalog resources, Amazon S3 locations, and the underlying data at those locations. With these grants, you can give access to specific tables or only columns that contain specific data values.

Configure IAM role permissions

Earlier in the walkthrough, you created the IAM role adfs-data-access and attached the AWS managed IAM policy AthenaFullAccess to it. This policy has all the permissions required for the purposes of this post.

For more information, see the Data Analyst Permissions section in Lake Formation Personas and IAM Permissions Reference.

Register an S3 bucket as a data lake location

The mechanism to govern access to an Amazon S3 location using Lake Formation is to register a data lake location. Complete the following steps:

  1. On the Lake Formation console, choose Data lake locations.
  2. Choose Register location.
  3. For Amazon S3 path, choose Browse and locate your bucket.
  4. For IAM role, choose AWSServiceRoleForLakeFormationDataAccess.

In this step, you specify an IAM service-linked role, which Lake Formation assumes when it grants temporary credentials to integrated AWS services that access the data in this location. This role and its permissions are managed by Lake Formation and can’t be changed by IAM principals.

  1. Choose Register location.
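If you prefer to script the registration instead of using the console, a minimal AWS CLI sketch (using the same bucket placeholder as earlier) looks like the following:

# Register the S3 location with Lake Formation using the service-linked role
aws lakeformation register-resource \
  --resource-arn arn:aws:s3:::<BUCKET NAME> \
  --use-service-linked-role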

Configure data permissions

Now that you have registered the Amazon S3 path, you can give AD groups appropriate permissions to access tables and columns in the salesdata database. The following table summarizes the new permissions.

Database and Table   AD Group Name      Table Permissions  Data Permissions
salesdata.customer   data-customer      Select             Column-based access: c_city, c_custkey, c_mktsegment, c_nation, c_region
salesdata.customer   data-customer-pii  Select             All data access
salesdata.lineorder  data-order         Select             All data access

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Filter tables by the salesdata database.
  3. Select the customer table and on the Actions menu, choose View permissions.

You should see the following existing permissions. These entries allow the current data lake administrator to access the table and all its columns.

  1. To add new permissions, select the table and on the Actions menu, choose Grant.
  2. Select SAML user and groups.
  3. For SAML and Amazon QuickSight users and groups, enter arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/adfs-saml-provider:group/data-customer.

To get this value, get the ARN of the SAML provider from the IAM console and append :group/data-customer to the end of it.
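You can also build this value from the AWS CLI, as in the following sketch:

# Fetch the SAML provider ARN and append the AD group name
SAML_PROVIDER_ARN=$(aws iam list-saml-providers \
  --query "SAMLProviderList[?contains(Arn, 'adfs-saml-provider')].Arn" --output text)
echo "${SAML_PROVIDER_ARN}:group/data-customer"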

  1. Select Named data catalog resources.
  2. For Databases, choose the salesdata database.
  3. For Tables, choose the customer table.
  4. For Table permissions, select Select.
  5. For Data permissions, select Column-based access.
  6. For Select columns, add the columns c_city, c_custkey, c_mktsegment, c_nation, and c_region.
  7. Choose Grant.

You have now allowed members of the AD group data-customer to have access to columns of the customer table that don’t include PII.

  1. Repeat these steps for the customer table and data-customer-pii group with all data access.
  2. Repeat these steps for the lineorder table and data-order group with all data access.
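These console grants also have AWS CLI equivalents. The following is a minimal sketch of the column-based grant for the data-customer group; the same pattern, with a plain Table resource instead of TableWithColumns, applies to the all-columns grants for data-customer-pii and data-order.

# Grant SELECT on non-PII columns of salesdata.customer to the data-customer AD group
aws lakeformation grant-permissions \
  --principal DataLakePrincipalIdentifier="arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/adfs-saml-provider:group/data-customer" \
  --permissions SELECT \
  --resource '{
    "TableWithColumns": {
      "DatabaseName": "salesdata",
      "Name": "customer",
      "ColumnNames": ["c_city", "c_custkey", "c_mktsegment", "c_nation", "c_region"]
    }
  }'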

Set up a SQL client with JDBC connection and verify permissions

In this post, we use SQL Workbench to access Athena through AD authentication and verify the Lake Formation permissions you created in the previous section.

Prepare the SQL client

To set up the SQL client, complete the following steps:

  1. Download and extract the Lake Formation-compatible Athena JDBC driver with AWS SDK (2.0.14 or later version) from Using Athena with the JDBC Driver.
  2. Go to the SQL Workbench/J website and download the latest stable package.
  3. Install SQL Workbench/J on your client computer.
  4. In SQL Workbench, on the File menu, choose Manage Drivers.
  5. Choose the New driver icon.
  6. For Name, enter Athena JDBC Driver.
  7. For Library, browse to and choose the Simba Athena JDBC .jar file that you just downloaded.
  8. Choose OK.

You’re now ready to create connections in SQL Workbench for your users.

Create connections in SQL Workbench

To create your connections, complete the following steps:

  1. On the File menu, choose Connect.
  2. Enter the name Athena-FinanceUser.
  3. For Driver, choose the Simba Athena JDBC driver.
  4. For URL, enter the following code (replace the placeholders with actual values from your setup and remove the line breaks to make a single line connection string):
jdbc:awsathena://AwsRegion=<AWS Region Name e.g. ap-southeast-2>;
S3OutputLocation=s3://<Athena Query Result Bucket Name>/jdbc;
plugin_name=com.simba.athena.iamsupport.plugin.AdfsCredentialsProvider;
idp_host=<adfs-server-name e.g. adfs.company.com>;
idp_port=443;
preferred_role=<ARN of the role created in step1 e.g. arn>;
[email protected]<Domain Name e.g. company.com>;
password=<password>;
SSL_Insecure=true;
LakeFormationEnabled=true;

For this post, we used a self-signed certificate with AD FS. Because this certificate isn’t trusted by the client, authentication doesn’t succeed unless certificate verification is relaxed, which is why the SSL_Insecure attribute is set to true. In real-world setups, you would use valid trusted certificates and can remove the SSL_Insecure attribute.

  1. Create a new SQL workbench profile named Athena-CustomerOpsUser and repeat the earlier steps with CustomerOpsUser in the connection URL string.
  2. To test the connections, choose Test for each user, and confirm that the connection succeeds.

Verify access permissions

Now we can verify permissions for FinanceUser. In the SQL Workbench Statement window, run the following SQL SELECT statement:

SELECT * FROM "salesdata"."lineorder" limit 10;
SELECT * FROM "salesdata"."customer" limit 10;

Verify that only non-PII columns are returned from the customer table.

As shown in the preceding results, FinanceUser only has access to non-PII columns of the customer table and full access to all columns of the lineorder table. This allows FinanceUser, for example, to run aggregate and summary queries based on market segment or location of customers without having access to their personal information.

Run a similar query for CustomerOpsUser. You should be able to see all columns, including columns containing PII, in the customer table.

Conclusion

This post demonstrated how to configure your data lake permissions using Lake Formation for AD users and groups. We configured AD FS 3.0 on your Active Directory and used it as an IdP to federate into AWS using SAML. This post also showed how you can integrate your Athena JDBC driver to AD FS and use your AD credentials directly to connect to Athena.

Integrating your Active Directory with the Athena JDBC driver gives you the flexibility to access Athena from business intelligence tools you’re already familiar with to analyze the data in your Amazon S3 data lake. This enables you to have a consistent central permission model that is managed through AD users and their group memberships.


About the Authors

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

Configure Hadoop YARN CapacityScheduler on Amazon EMR on Amazon EC2 for multi-tenant heterogeneous workloads

Post Syndicated from Suvojit Dasgupta original https://aws.amazon.com/blogs/big-data/configure-hadoop-yarn-capacityscheduler-on-amazon-emr-on-amazon-ec2-for-multi-tenant-heterogeneous-workloads/

Apache Hadoop YARN (Yet Another Resource Negotiator) is a cluster resource manager responsible for assigning computational resources (CPU, memory, I/O), and scheduling and monitoring jobs submitted to a Hadoop cluster. This generic framework allows for effective management of cluster resources for distributed data processing frameworks, such as Apache Spark, Apache MapReduce, and Apache Hive. Amazon EMR uses Hadoop YARN by default for the frameworks that support it. Note that not all frameworks offered by Amazon EMR use Hadoop YARN; for example, Trino/Presto and Apache HBase don’t.

In this post, we discuss various components of Hadoop YARN, and understand how components interact with each other to allocate resources, schedule applications, and monitor applications. We dive deep into the specific configurations to customize Hadoop YARN’s CapacityScheduler to increase cluster efficiency by allocating resources in a timely and secure manner in a multi-tenant cluster. We take an opinionated look at the configurations for CapacityScheduler and configure them on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) to solve for the common resource allocation, resource contention, and job scheduling challenges in a multi-tenant cluster.

We dive deep into CapacityScheduler because Amazon EMR uses CapacityScheduler by default, and CapacityScheduler has benefits over other schedulers for running workloads with heterogeneous resource consumption.

Solution overview

Modern data platforms often run applications on Amazon EMR with the following characteristics:

  • Heterogeneous resource consumption patterns by jobs, such as computation-bound jobs, I/O-bound jobs, or memory-bound jobs
  • Multiple teams running jobs with an expectation to receive an agreed-upon share of cluster resources and complete jobs in a timely manner
  • Cluster admins often have to cater to one-time requests for running jobs without impacting scheduled jobs
  • Cluster admins want to ensure users are using their assigned capacity and not encroaching on the capacity assigned to others
  • Cluster admins want to utilize the resources efficiently and allocate all available resources to currently running jobs, but want to retain the ability to reclaim resources automatically should there be a claim for the agreed-upon cluster resources from other jobs

To illustrate these use cases, let’s consider the following scenario:

  • user1 and user2 don’t belong to any team and use cluster resources periodically on an ad hoc basis
  • A data platform and analytics program has two teams:
    • A data_engineering team, containing user3
    • A data_science team, containing user4
  • user5 and user6 (and many other users) sporadically use cluster resources to run jobs

Based on this scenario, the scheduler queue may look like the following diagram. Take note of the common configurations applied to all queues, the overrides, and the user/groups-to-queue mappings.

Capacity Scheduler Queue Setup

In the subsequent sections, we review the high-level components of Hadoop YARN, discuss the types of schedulers available in Hadoop YARN, cover the core concepts of CapacityScheduler, and show how to implement this CapacityScheduler queue setup on Amazon EMR (on Amazon EC2). You can skip to the Code walkthrough section if you are already familiar with Hadoop YARN and CapacityScheduler.

Overview of Hadoop YARN

At a high level, Hadoop YARN consists of three main components:

  • ResourceManager (one per primary node)
  • ApplicationMaster (one per application)
  • NodeManager (one per node)

The following diagram shows the main components and their interaction with each other.

Apache Hadoop Yarn Architecture Diagram1

Before diving further, let’s clarify what Hadoop YARN’s ResourceContainer (or container) is. A ResourceContainer represents a collection of physical computational resources. It’s an abstraction used to bundle resources into distinct, allocatable units.

ResourceManager

The ResourceManager is responsible for resource management and making allocation decisions. It’s the ResourceManager’s responsibility to identify and allocate resources to a job upon submission to Hadoop YARN. The ResourceManager has two main components:

  • ApplicationsManager (not to be confused with ApplicationMaster)
  • Scheduler

ApplicationsManager

The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for running ApplicationMaster, and providing the service for restarting the ApplicationMaster on failure.

Scheduler

The Scheduler is responsible for scheduling allocation of resources to the jobs. The Scheduler performs its scheduling function based on the resource requirements of the jobs. The Scheduler is a pluggable interface. Hadoop YARN currently provides three implementations:

  • CapacityScheduler – A pluggable scheduler for Hadoop that allows for multiple tenants to securely share a cluster such that jobs are allocated resources in a timely manner under constraints of allocated capacities. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler. In this post, we primarily focus on CapacityScheduler, which is the default scheduler on Amazon EMR (on Amazon EC2).
  • FairScheduler – A pluggable scheduler for Hadoop that allows Hadoop YARN applications to share resources in clusters fairly. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
  • FifoScheduler – A pluggable scheduler for Hadoop that allows Hadoop YARN applications to share resources in clusters on a first-in, first-out basis. The implementation is available on GitHub. The Java concrete class is org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler.

ApplicationMaster

Upon negotiating the first container by ApplicationsManager, the per-application ApplicationMaster has the responsibility of negotiating the rest of the appropriate resources from the Scheduler, tracking their status, and monitoring progress.

NodeManager

The NodeManager is responsible for launching and managing containers on a node.

Hadoop YARN on Amazon EMR

By default, Amazon EMR (on Amazon EC2) uses Hadoop YARN for cluster management for the distributed data processing frameworks that support Hadoop YARN as a resource manager, like Apache Spark, Apache MapReduce, and Apache Hive. Amazon EMR provides multiple sensible default settings that work for most scenarios. However, every data platform is different and has specific needs. Amazon EMR provides the ability to customize these settings at cluster creation using configuration classifications. You can also reconfigure Amazon EMR cluster applications and specify additional configuration classifications for each instance group in a running cluster using the AWS Command Line Interface (AWS CLI) or the AWS SDK.
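As an illustration, reconfiguring a single instance group on a running cluster could look like the following sketch; the cluster ID, instance group ID, and the property shown are placeholders for your own values.

# Apply a capacity-scheduler reconfiguration to a running instance group
aws emr modify-instance-groups --cluster-id j-XXXXXXXXXXXXX \
  --instance-groups '[{
    "InstanceGroupId": "ig-XXXXXXXXXXXXX",
    "Configurations": [{
      "Classification": "capacity-scheduler",
      "Properties": {
        "yarn.scheduler.capacity.maximum-am-resource-percent": "0.2"
      }
    }]
  }]'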

CapacityScheduler

CapacityScheduler depends on ResourceCalculator to identify the available resources and calculate the allocation of the resources to ApplicationMaster. The ResourceCalculator is an abstract Java class. Hadoop YARN currently provides two implementations:

  • DefaultResourceCalculator – In DefaultResourceCalculator, resources are calculated based on memory alone.
  • DominantResourceCalculator – DominantResourceCalculator is based on the Dominant Resource Fairness (DRF) model of resource allocation. The paper Dominant Resource Fairness: Fair Allocation of Multiple Resource Types, Ghodsi et al. [2011] describes DRF as follows: “DRF computes the share of each resource allocated to that user. The maximum among all shares of a user is called that user’s dominant share, and the resource corresponding to the dominant share is called the dominant resource. Different users may have different dominant resources. For example, the dominant resource of a user running a computation-bound job is CPU, while the dominant resource of a user running an I/O-bound job is bandwidth. DRF simply applies max-min fairness across users’ dominant shares. That is, DRF seeks to maximize the smallest dominant share in the system, then the second-smallest, and so on.”

Because of DRF, DominantResourceCalculator is a better ResourceCalculator for data processing environments running heterogeneous workloads. By default, Amazon EMR uses DefaultResourceCalculator for CapacityScheduler. This can be verified by checking the value of the yarn.scheduler.capacity.resource-calculator parameter in /etc/hadoop/conf/capacity-scheduler.xml.
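A quick way to confirm this on the primary node:

# Print the configured ResourceCalculator from the CapacityScheduler config
grep -A 1 "yarn.scheduler.capacity.resource-calculator" /etc/hadoop/conf/capacity-scheduler.xml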

Code walkthrough

CapacityScheduler provides multiple parameters to customize the scheduling behavior to meet specific needs. For a list of available parameters, refer to Hadoop: CapacityScheduler.

Refer to the configurations section in cloudformation/templates/emr.yaml to review all the CapacityScheduler parameters set as part of this post. In this example, we use two classifiers of Amazon EMR (on Amazon EC2):

  • yarn-site – The classification to update yarn-site.xml
  • capacity-scheduler – The classification to update capacity-scheduler.xml

For various types of classification available in Amazon EMR, refer to Customizing cluster and application configuration with earlier AMI versions of Amazon EMR.

In the AWS CloudFormation template, we have modified the ResourceCalculator of CapacityScheduler from the default, DefaultResourceCalculator, to DominantResourceCalculator. Data processing environments tend to run different kinds of jobs, for example, computation-bound jobs consuming heavy CPU, I/O-bound jobs consuming heavy bandwidth, and memory-bound jobs consuming heavy memory. As previously stated, DominantResourceCalculator is better suited for such environments due to its Dominant Resource Fairness model of resource allocation. If your data processing environment only runs memory-bound jobs, then modifying this parameter isn’t necessary.
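For reference, the following is a trimmed sketch of what the capacity-scheduler classification looks like when passed at cluster creation with the AWS CLI; the CloudFormation template in this post expresses the same setting in its Configurations block, and the cluster options shown here are illustrative.

# Minimal classification that switches CapacityScheduler to DominantResourceCalculator
aws emr create-cluster \
  --name capacity-scheduler-demo \
  --release-label emr-6.6.0 \
  --applications Name=Hadoop Name=Spark \
  --instance-type m5.xlarge --instance-count 3 \
  --use-default-roles \
  --configurations '[{
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    }
  }]'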

You can find the codebase in the AWS Samples GitHub repository.

Prerequisites

For deploying the solution, you should have the following prerequisites:

Deploy the solution

To deploy the solution, complete the following steps:

  • Download the source code from the AWS Samples GitHub repository:
    git clone [email protected]:aws-samples/amazon-emr-yarn-capacity-scheduler.git

  • Create an Amazon Simple Storage Service (Amazon S3) bucket:
    aws s3api create-bucket --bucket emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION> --region <AWS_REGION>

  • Copy the cloned repository to the Amazon S3 bucket:
    aws s3 cp --recursive amazon-emr-yarn-capacity-scheduler s3://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>/

  • Update the following parameters in amazon-emr-yarn-capacity-scheduler/cloudformation/parameters/parameters.json:
    1. ArtifactsS3Repository – The S3 bucket name that was created in the previous step (emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>).
    2. emrKeyName – An existing EC2 key name. If you don’t have an existing key and want to create a new key, refer to Use an Amazon EC2 key pair for SSH credentials.
    3. clientCIDR – The CIDR range of the client machine for accessing the EMR cluster via SSH. You can run the following command to identify the IP of the client machine: echo "$(curl -s http://checkip.amazonaws.com)/32"
  • Deploy the AWS CloudFormation templates:
    aws cloudformation create-stack \
    --stack-name emr-yarn-capacity-scheduler \
    --template-url https://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION>.s3.amazonaws.com/cloudformation/templates/main.yaml \
    --parameters file://amazon-emr-yarn-capacity-scheduler/cloudformation/parameters/parameters.json \
    --capabilities CAPABILITY_NAMED_IAM \
    --region <AWS_REGION>

  • On the AWS CloudFormation console, check for the successful deployment of the following stacks.

AWS CloudFormation Stack Deployment

  • On the Amazon EMR console, check for the successful creation of the emr-cluster-capacity-scheduler cluster.
  • Choose the cluster and on the Configurations tab, review the properties under the capacity-scheduler and yarn-site classification labels.

AWS EMR Configurations

  • Access the Hadoop YARN resource manager UI on the emr-cluster-capacity-scheduler cluster to review the CapacityScheduler setup. For instructions on how to access the UI on Amazon EMR, refer to View web interfaces hosted on Amazon EMR clusters.

Apache Hadoop YARN UI

  • SSH into the emr-cluster-capacity-scheduler cluster and review the following files. For instructions on how to SSH into the EMR primary node, refer to Connect to the master node using SSH.
    • /etc/hadoop/conf/yarn-site.xml
    • /etc/hadoop/conf/capacity-scheduler.xml

All the parameters set using the yarn-site and capacity-scheduler classifiers are reflected in these files. If an admin wants to update CapacityScheduler configs, they can directly update capacity-scheduler.xml and run the following command to apply the changes without interrupting any running jobs and services:

yarn rmadmin -refreshQueues

Changes to yarn-site.xml require the ResourceManager service to be restarted, which interrupts the running jobs. As a best practice, refrain from manual modifications and use version control for change management.

The CloudFormation template adds a bootstrap action to create test users (user1, user2, user3, user4, user5, and user6) on all the nodes and adds a step script to create HDFS directories for the test users.

Users can SSH into the primary node, switch to different users with sudo, and submit Spark jobs to verify job submission and CapacityScheduler behavior:

[[email protected] ~]$ sudo su - user1
[[email protected] ~]$ spark-submit --master yarn --deploy-mode cluster \
--class org.apache.spark.examples.SparkPi /usr/lib/spark/examples/jars/spark-examples.jar

You can validate the results from the resource manager web UI.

Apache Hadoop YARN Jobs List
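You can also check a queue's configured capacity, current usage, and state from the command line on the primary node; the queue name below assumes a queue named after the data_engineering team in the example setup.

# Show capacity, current usage, and state of a specific CapacityScheduler queue
yarn queue -status data_engineering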

Clean up

To avoid incurring future charges, delete the resources you created.

  • Delete the CloudFormation stack:
    aws cloudformation delete-stack --stack-name emr-yarn-capacity-scheduler

  • Delete the S3 bucket:
    aws s3 rb s3://emr-yarn-capacity-scheduler-<AWS_ACCOUNT_ID>-<AWS_REGION> --force

The command deletes the bucket and all files underneath it. The files may not be recoverable after deletion.

Conclusion

In this post, we discussed Apache Hadoop YARN and its various components, and the types of schedulers available in Hadoop YARN. We dived deep into the specifics of Hadoop YARN CapacityScheduler and the use of Dominant Resource Fairness to efficiently allocate resources to submitted jobs. We also showcased how to implement the discussed concepts using AWS CloudFormation.

We encourage you to use this post as a starting point to implement CapacityScheduler on Amazon EMR (on Amazon EC2) and customize the solution to meet your specific data platform goals.


About the authors

Suvojit Dasgupta is a Sr. Lakehouse Architect at Amazon Web Services. He works with customers to design and build data solutions on AWS.

Bharat Gamini is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust, and secure cloud-based analytical solutions on AWS.

Amazon EMR on EKS gets up to 19% performance boost running on AWS Graviton3 Processors vs. Graviton2

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/amazon-emr-on-eks-gets-up-to-19-performance-boost-running-on-aws-graviton3-processors-vs-graviton2/

Amazon EMR on EKS is a deployment option that enables you to run Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS) easily. It allows you to innovate faster with the latest Apache Spark on Kubernetes architecture while benefiting from the performance-optimized Spark runtime powered by Amazon EMR. This deployment option uses Amazon EKS as the underlying compute to orchestrate containerized Spark applications with better price performance.

AWS continually innovates to provide choice and better price performance for our customers, and the third-generation Graviton processor is the next step in that journey. Amazon EMR on EKS now supports Amazon Elastic Compute Cloud (Amazon EC2) C7g, the latest AWS Graviton3 instance family. On a single EKS cluster, we measured EMR runtime for Apache Spark performance by comparing the C7g with the C6g family across selected instance sizes of 4XL, 8XL, and 12XL. We observed up to a 19% performance gain over the sixth-generation C6g Graviton2 instances, which leads to up to a 15% cost reduction.

In this post, we discuss the performance test results that we observed while running the same EMR Spark runtime on different Graviton-based EC2 instance types.

For some use cases, such as benchmark testing, running a data pipeline that requires a mix of CPU types for granular cost efficiency, or migrating an existing application from Intel to Graviton-based instances, we usually spin up separate clusters that host different processor types, such as x86_64 and arm64. However, Amazon EMR on EKS makes this easier. In this post, we also provide guidance on running Spark with multiple CPU architectures in a common EKS cluster, so that we can save significant time and effort on setting up a separate cluster to isolate the workloads.

Infrastructure innovation

AWS Graviton3 is the latest generation of AWS-designed Arm-based processors, and C7g is the first Graviton3 instance in AWS. The C family is designed for compute-intensive workloads, including batch processing, distributed analytics, data transformations, log analysis, and more. Additionally, C7g instances are the first in the cloud to feature DDR5 memory, which provides 50% higher memory bandwidth compared to DDR4 memory, to enable high-speed access to data in memory. All these innovations are well-suited for big data workloads, especially the in-memory processing framework Apache Spark.

The following table summarizes the technical specifications for the tested instance types:

Instance Name vCPUs Memory (GiB) EBS-Optimized Bandwidth (Gbps) Network Bandwidth (Gbps) On-Demand Hourly Rate
c6g.4xlarge 16 32 4.75 Up to 10 $0.544
c7g.4xlarge 16 32 Up to 10 Up to 15 $0.58
c6g.8xlarge 32 64 9 12 $1.088
c7g.8xlarge 32 64 10 15 $1.16
c6g.12xlarge 48 96 13.5 20 $1.632
c7g.12xlarge 48 96 15 22.5 $1.74

These instances are all built on the AWS Nitro System, a collection of AWS-designed hardware and software innovations. The Nitro System offloads the CPU virtualization, storage, and networking functions to dedicated hardware and software, delivering performance that is nearly indistinguishable from bare metal. Notably, C7g instances include support for Elastic Fabric Adapter (EFA), which is standard on this instance family. EFA allows applications to communicate directly with network interface cards, providing lower and more consistent latency. Additionally, these are all Amazon EBS-optimized instances, and C7g provides higher dedicated bandwidth for EBS volumes, which can result in better I/O performance and quicker read/write operations in Spark.

Performance test results

To quantify performance, we ran TPC-DS benchmark queries for Spark with a 3TB scale. These queries are derived from TPC-DS standard SQL scripts, and the test results are not comparable to other published TPC-DS benchmark outcomes. Apart from the benchmark standards, a single Amazon EMR 6.6 Spark runtime (compatible with Apache Spark version 3.2.0) was used as the data processing engine across six different managed node groups on an EKS cluster: C6g_4, C7g_4, C6g_8, C7g_8, C6g_12, C7g_12. These groups are named after their instance types to distinguish the underlying compute resources. Each group can automatically scale between 1 and 30 nodes within its corresponding instance type. By architecting the EKS cluster this way, we can run and compare our experiments in parallel, each hosted in a single node group, that is, an isolated compute environment on a common EKS cluster. It also makes it possible to run an application with multiple CPU architectures on a single cluster. Check out the sample EKS cluster configuration and benchmark job examples for more details.
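To see which managed node groups and CPU architectures the shared cluster exposes, a quick check could be the following (assuming kubectl is already pointed at the benchmark EKS cluster):

# List nodes with their managed node group and CPU architecture labels
kubectl get nodes -L eks.amazonaws.com/nodegroup -L kubernetes.io/arch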

We measure the Graviton performance and cost improvements using two calculations: total query runtime and geometric mean of the total runtime. The following table shows the results for equivalent sized C6g and C7g instances and the same Spark configurations.

Benchmark Attributes 12 XL 8 XL 4 XL
Task parallelism (spark.executor.cores*spark.executor.instances) 188 cores (4*47) 188 cores (4*47) 188 cores (4*47)
spark.executor.memory 6 GB 6 GB 6 GB
Number of EC2 instances 5 7 16
EBS volume 4 * 128 GB io1 disk 4 * 128 GB io1 disk 4 * 128 GB io1 disk
Provisioned IOPS per volume 6400 6400 6400
Total query runtime on C6g (sec) 2099 2098 2042
Total query runtime on C7g (sec) 1728 1738 1660
Total run time improvement with C7g 18% 17% 19%
Geometric mean query time on C6g (sec) 9.74 9.88 9.77
Geometric mean query time on C7g (sec) 8.40 8.32 8.08
Geometric mean improvement with C7g 13.8% 15.8% 17.3%
EMR on EKS memory usage cost on C6g (per run) $0.28 $0.28 $0.28
EMR on EKS vCPU usage cost on C6g (per run) $1.26 $1.25 $1.24
Total cost per benchmark run on C6g (EC2 + EKS cluster + EMR price) $6.36 $6.02 $6.52
EMR on EKS memory usage cost on C7g (per run) $0.23 $0.23 $0.22
EMR on EKS vCPU usage cost on C7g (per run) $1.04 $1.03 $0.99
Total cost per benchmark run on C7g (EC2 + EKS cluster + EMR price) $5.49 $5.23 $5.54
Estimated cost reduction with C7g 13.7% 13.2% 15%

The total number of cores and memory are identical across all benchmarked instances, and four provisioned IOPS SSD disks were attached to each EBS-optimized instance for the optimal disk I/O performance. To allow for comparison, these configurations were intentionally chosen to match with settings in other EMR on EKS benchmarks. Check out the previous benchmark blog post Amazon EMR on Amazon EKS provides up to 61% lower costs and up to 68% performance improvement for Spark workloads for C5 instances based on x86_64 Intel CPU.

The table indicates C7g instances have consistent performance improvement compared to equivalent C6g Graviton2 instances. Our test results showed 17–19% improvement in total query runtime for selected instance sizes, and 13.8–17.3% improvement in geometric mean. On cost, we observed 13.2–15% cost reduction on C7g performance tests compared to C6g while running the 104 TPC-DS benchmark queries.

Data shuffle in a Spark workload

Generally, big data frameworks schedule computation tasks for different nodes in parallel to achieve optimal performance. To proceed with its computation, a node must have the results of computations from upstream. This requires moving intermediate data from multiple servers to the nodes where the data is required, which is termed shuffling data. In many Spark workloads, data shuffle is an inevitable operation, so it plays an important role in performance assessments. This operation may involve a high rate of disk I/O and network data transmission, and can consume a significant number of CPU cycles.

If your workload is I/O bound or bottlenecked by current data shuffle performance, one recommendation is to benchmark on improved hardware. Overall, C7g offers better EBS and network bandwidth compared to equivalent C6g instance types, which may help you optimize performance. Therefore, in the same benchmark test, we captured the following extra information, which is broken down into per-instance-type network/IO improvements.

Based on the TPC-DS query test results, we captured the percentage increases of data shuffle operations in four categories: maximum disk read and write, and maximum network received and transmitted. In comparison to C6g instances, disk read performance improved between 25–45%, whereas the disk write performance increase was 34–47%. On the network throughput comparison, we observed an increase of 21–36%.

Run an Amazon EMR on EKS job with multiple CPU architectures

If you’re evaluating migrating to Graviton instances for Amazon EMR on EKS workloads, we recommend testing the Spark workloads based on your real-world use cases. If you need to run workloads across multiple processor architectures, for example test the performance for Intel and Arm CPUs, follow the walkthrough in this section to get started with some concrete ideas.

Build a single multi-arch Docker image

To build a single multi-arch Docker image (x86_64 and arm64), complete the following steps:

  1. Get the Docker Buildx CLI extension. Docker Buildx is a CLI plugin that extends the Docker command to support the multi-architecture feature. Upgrade to the latest Docker Desktop or manually download the CLI binary. For more details, check out Working with Buildx.
  2. Validate the version after the installation:
    docker buildx version

  3. Create a new builder that gives access to the new multi-architecture features (you only have to perform this task once):
    docker buildx create --name mybuilder --use

  4. Log in to your own Amazon ECR registry:
    AWS_REGION=us-east-1
    ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
    ECR_URL=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
    aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL

  5. Get the EMR Spark base image from AWS:
    SRC_ECR_URL=755674844232.dkr.ecr.us-east-1.amazonaws.com
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $SRC_ECR_URL
    docker pull $SRC_ECR_URL/spark/emr-6.6.0:latest

  6. Build and push a custom Docker image.

In this case, we build a single Spark benchmark utility docker image on top of Amazon EMR 6.6. It supports both Intel and Arm processor architectures:

  • linux/amd64 – x86_64 (also known as AMD64 or Intel 64)
  • linux/arm64 – Arm
docker buildx build \
--platform linux/amd64,linux/arm64 \
-t $ECR_URL/eks-spark-benchmark:emr6.6 \
-f docker/benchmark-util/Dockerfile \
--build-arg SPARK_BASE_IMAGE=$SRC_ECR_URL/spark/emr-6.6.0:latest \
--push .
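To confirm that both architectures made it into the pushed manifest, you can inspect the image:

# Verify the manifest lists both linux/amd64 and linux/arm64
docker buildx imagetools inspect $ECR_URL/eks-spark-benchmark:emr6.6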

Submit Amazon EMR on EKS jobs with and without Graviton

For our first example, we submit a benchmark job to the Graviton3 node group that spins up c7g.4xlarge instances.

The following is not a complete script. Check out the full version of the example on GitHub.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name emr66-c7-4xl \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.6.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
    "entryPoint": "local:///usr/lib/spark/examples/jars/eks-spark-benchmark-assembly-1.0.jar",
    "entryPointArguments":[.......],
    "sparkSubmitParameters": "........"}}' \
--configuration-overrides '{
"applicationConfiguration": [{
    "classification": "spark-defaults",
    "properties": {
        "spark.kubernetes.container.image": "'$ECR_URL'/eks-spark-benchmark:emr6.6",
        "spark.kubernetes.node.selector.eks.amazonaws.com/nodegroup": “C7g_4”
}}]}'

In the following example, we run the same job on non-Graviton C5 instances with Intel 64 CPU. The full version of the script is available on GitHub.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name emr66-c5-4xl \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.6.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
    "entryPoint": "local:///usr/lib/spark/examples/jars/eks-spark-benchmark-assembly-1.0.jar",
    "entryPointArguments":[.......],
    "sparkSubmitParameters": "........"}}' \    
--configuration-overrides '{
"applicationConfiguration": [{
    "classification": "spark-defaults",
    "properties": {
        "spark.kubernetes.container.image": "'$ECR_URL'/eks-spark-benchmark:emr6.6",
        "spark.kubernetes.node.selector.eks.amazonaws.com/nodegroup”: “C5_4”
}}]}'

Summary

In May 2022, the Graviton3 instance family was made available to Amazon EMR on EKS. After running the performance-optimized EMR Spark runtime on the selected latest Arm-based Graviton3 instances, we observed up to 19% performance increase and up to 15% cost savings compared to C6g Graviton2 instances. Because Amazon EMR on EKS offers 100% API compatibility with open-source Apache Spark, you can quickly step into the evaluation process with no application changes.

If you’re wondering how much performance gain you can achieve with your use case, try out the benchmark solution or the EMR on EKS Workshop. You can also contact your AWS Solutions Architects, who can be of assistance alongside your innovation journey.


About the author

Melody Yang is a Senior Big Data Solution Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

Graviton Fast Start – A New Program to Help Move Your Workloads to AWS Graviton

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/graviton-fast-start-a-new-program-to-help-move-your-workloads-to-aws-graviton/

With the Graviton Challenge last year, we helped customers migrate to Graviton-based EC2 instances and get up to 40 percent price performance benefit in as little as 4 days. Tens of thousands of customers, including 48 of the top 50 Amazon Elastic Compute Cloud (Amazon EC2) customers, use AWS Graviton processors for their workloads. In addition to EC2, many AWS managed services can run their workloads on Graviton. For most customers, adoption is easy, requiring minimal code changes. However, the effort and time required to move workloads to Graviton depends on a few factors including your software development environment and the technology stack on which your application is built.

This year, we want to take it a step further and make it even easier for customers to adopt Graviton not only through EC2, but also through managed services. Today, we are launching AWS Graviton Fast Start, a new program that makes it even easier to move your workloads to AWS Graviton by providing step-by-step directions for EC2 and other managed services that support the Graviton platform:

  • Amazon Elastic Compute Cloud (Amazon EC2) – EC2 provides the most flexible environment for a migration and can support many kinds of workloads, such as web apps, custom databases, or analytics. You have full control over the interpreted or compiled code running in the EC2 instance. You can also use many open-source and commercial software products that support the Arm64 architecture.
  • AWS Lambda – Migrating your serverless functions can be really easy, especially if you use an interpreted runtime such as Node.js or Python. Most of the time, you only have to check the compatibility of your software dependencies. I have shown a few examples in this blog post.
  • AWS Fargate – Fargate works best if your applications are already running in containers or if you are planning to containerize them. By using multi-architecture container images or images that have Arm64 in their image manifest, you get the serverless benefits of Fargate and the price-performance advantages of Graviton.
  • Amazon Aurora – Relational databases are at the core of many applications. If you need a database compatible with PostgreSQL or MySQL, you can use Amazon Aurora to have a highly performant and globally available database powered by Graviton.
  • Amazon Relational Database Service (RDS) – Similarly to Aurora, Amazon RDS engines such as PostgreSQL, MySQL, and MariaDB can provide a fully managed relational database service using Graviton-based instances.
  • Amazon ElastiCache – When your workload requires ultra-low latency and high throughput, you can speed up your applications with ElastiCache and have a fully managed in-memory cache running on Graviton and compatible with Redis or Memcached.
  • Amazon EMR – With Amazon EMR, you can run large-scale distributed data processing jobs, interactive SQL queries, and machine learning applications on Graviton using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Here’s some feedback we got from customers running their workloads on Graviton:

  • Formula 1 racing told us that Graviton2-based C6gn instances provided the best price performance benefits for some of their computational fluid dynamics (CFD) workloads. More recently, they found that Graviton3 C7g instances are 40 percent faster for the same simulations and expect Graviton3-based instances to become the optimal choice to run all of their CFD workloads.
  • Honeycomb has 100 percent of their production workloads running on Graviton using EC2 and Lambda. They have tested the high-throughput telemetry ingestion workload they use for their observability platform against early preview instances of Graviton3 and have seen a 35 percent performance increase for their workload over Graviton2. They were able to run 30 percent fewer instances of C7g than C6g serving the same workload and with 30 percent reduced latency. With these instances in production, they expect over 50 percent price performance improvement over x86 instances.
  • Twitter is working on a multi-year project to leverage Graviton-based EC2 instances to deliver Twitter timelines. As part of their ongoing effort to drive further efficiencies, they tested the new Graviton3-based C7g instances. Across a number of benchmarks representative of their workloads, they found Graviton3-based C7g instances deliver 20-80 percent higher performance compared to Graviton2-based C6g instances, while also reducing tail latencies by as much as 35 percent. They are excited to utilize Graviton3-based instances in the future to realize significant price performance benefits.

With all these options, getting the benefits of running all or part of your workload on AWS Graviton can be easier than you expect. To help you get started, there’s also a free trial on the Graviton-based T4g instances for up to 750 hours per month through December 31st, 2022.

Visit AWS Graviton Fast Start to get step-by-step directions on how to move your workloads to AWS Graviton.

Danilo

AWS Week In Review – July 25, 2022

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-week-in-review-july-25-2022/

A few weeks ago, we hosted the first EMEA AWS Heroes Summit in Milan, Italy. This past week, I had the privilege to join the Americas AWS Heroes Summit in Seattle, Washington, USA. Meeting with our community experts is always inspiring and a great opportunity to learn from each other. During the Summit, AWS Heroes from North America and Latin America shared their thoughts with AWS developer advocates and product teams on topics such as serverless, containers, machine learning, data, and DevTools. You can learn more about the AWS Heroes program here.

AWS Heroes Summit Americas 2022

Last Week’s Launches
Here are some launches that got my attention during the previous week:

Cloudscape Design System – Cloudscape is an open source design system for creating web applications. It was built for and is used by AWS products and services. We created it in 2016 to improve the user experience across web applications owned by AWS services and also to help teams implement those applications faster. If you’ve ever used the AWS Management Console, you’ve seen Cloudscape in action. If you are building a product that extends the AWS Management Console, designing a user interface for a hybrid cloud management system, or setting up an on-premises solution that uses AWS, have a look at Cloudscape Design System.

Cloudscape Design System

AWS re:Post introduces community-generated articles – AWS re:Post gives you access to a vibrant community that helps you become even more successful on AWS. Expert community members can now share technical guidance and knowledge beyond answering questions through the Articles feature. Using this feature, community members can share best practices and troubleshooting processes and address customer needs around AWS technology in greater depth. The Articles feature is unlocked for community members who have achieved Rising Star status on re:Post or subject matter experts who built their reputation in the community based on their contributions and certifications. If you have a Rising Star status on re:Post, start writing articles now! All other members can unlock Rising Star status through community contributions or simply browse available articles today on re:Post.

AWS re:Post

AWS Lambda announces support for attribute-based access control (ABAC) and new IAM condition key – You can now use attribute-based access control (ABAC) with AWS Lambda to control access to functions within AWS Identity and Access Management (IAM) using tags. ABAC is an authorization strategy that defines access permissions based on attributes. In AWS, these attributes are called tags. With ABAC, you can scale an access control strategy by setting granular permissions with tags without requiring permissions updates for every new user or resource as your organization scales. Read this blog post by Julian Wood and Chris McPeek to learn more.

AWS Lambda also announced support for lambda:SourceFunctionArn, a new IAM condition key that can be used for IAM policy conditions that specify the Amazon Resource Name (ARN) of the function from which a request is made. You can use the Condition element in your IAM policy to compare the lambda:SourceFunctionArn condition key in the request context with values that you specify in your policy. This allows you to implement advanced security controls for the AWS API calls taken by your Lambda function code. For more details, have a look at the Lambda Developer Guide.
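As a sketch of how the condition key can be used, the following attaches a hypothetical inline policy that allows S3 reads only when the call originates from one specific function; the role, bucket, and function names are illustrative.

# Hypothetical policy: allow s3:GetObject only for calls made from my-function
aws iam put-role-policy \
  --role-name my-lambda-execution-role \
  --policy-name restrict-to-source-function \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::my-example-bucket/*",
      "Condition": {
        "StringEquals": {
          "lambda:SourceFunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:my-function"
        }
      }
    }]
  }'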

Amazon Fraud Detector launches Account Takeover Insights (ATI) – Amazon Fraud Detector now supports an Account Takeover Insights (ATI) model, a low-latency fraud detection machine learning model specifically designed to detect accounts that have been compromised through stolen credentials, phishing, social engineering, or other forms of account takeover. The ATI model is designed to detect up to four times more ATI fraud than traditional rules-based account takeover solutions while minimizing the level of friction for legitimate users. To learn more, have a look at the Amazon Fraud Detector documentation.

Amazon EMR on EC2 clusters (EMR Clusters) introduces more fine-grained access controls – Previously, all jobs running on an EMR cluster used the IAM role associated with the EMR cluster’s EC2 instances to access resources. This role is called the EMR EC2 instance profile. With the new runtime roles for Amazon EMR Steps, you can now specify a different IAM role for your Apache Spark and Hive jobs, scoping down access at a job level. This simplifies access controls on a single EMR cluster that is shared between multiple tenants, wherein each tenant is isolated using IAM roles. You can now also enforce table and column permissions based on your Amazon EMR runtime role to manage your access to data lakes with AWS Lake Formation. For more details, read the What’s New post.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional news and customer stories you may find interesting:

AWS open-source news and updates – My colleague Ricardo Sueiras writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community. Read edition #121 here.

AI Use Case Explorer – If you are interested in AI use cases, have a look at the new AI Use Case Explorer. You can search over 100 use cases and 400 customer success stories by industry, business function, and the business outcome you want to achieve.

Bayer centralizes and standardizes data from the carbon program using AWS – To help Brazilian farmers adopt climate-smart agricultural practices and reduce carbon emissions in their activities, Bayer created the Carbon Program, which aims to build carbon-neutral agriculture practices. Learn how Bayer uses AWS to centralize and standardize the data received from the different partners involved in the project in this Bayer case study.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

AWS re:Inforce 2022 – The event will be held this week in person on July 26 and 27 in Boston, Massachusetts, USA. You can watch the keynote and leadership sessions online for free. AWS On Air will also stream live from re:Inforce.

AWS Global Summits – AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Registrations are open for the following AWS Summits in August:

IMAGINE 2022 – The IMAGINE 2022 conference will take place on August 3 at the Seattle Convention Center, Washington, USA. It’s a no-cost event that brings together education, state, and local leaders to learn about the latest innovations and best practices in the cloud. You can register here.

I’ll be speaking at Data Con LA on August 13–14 in Los Angeles, California, USA. Feel free to say “Hi!” if you’re around. And if you happen to be at Ray Summit on August 23–24 in San Francisco, California, USA, stop by the AWS booth. I’ll be there to discuss all things Ray on AWS.

That’s all for this week. Check back next Monday for another Week in Review!

Antje

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Enable federated governance using Trino and Apache Ranger on Amazon EMR

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/enable-federated-governance-using-trino-and-apache-ranger-on-amazon-emr/

Managing data through a central data platform simplifies staffing and training challenges and reduces costs. However, it can create scaling, ownership, and accountability challenges, because central teams may not understand the specific needs of a data domain, whether because of data types and storage, security, data catalog requirements, or specific technologies needed for data processing. One of the architecture patterns that has emerged recently to tackle this challenge is the data mesh architecture, which gives ownership and autonomy to the individual teams who own the data. One of the major components of implementing a data mesh architecture lies in enabling federated governance, which includes centralized authorization and audits.

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

Trino, on the other hand, is a highly parallel and distributed query engine, and provides federated access to data by using connectors to multiple backend systems like Hive, Amazon Redshift, and Amazon OpenSearch Service. Trino acts as a single access point to query all data sources.

By combining Trino query federation features with the authorization and audit capability of Apache Ranger, you can enable federated governance. This allows multiple purpose-built data engines to function as one, with a single centralized place to manage data access controls.

This post shares details on how to architect this solution using the new EMR Ranger Trino plugin on Amazon EMR 6.7.

Solution overview

Trino allows you to query data in different sources, using an extensive set of connectors. This feature enables you to have a single point of entry for all data sources that can be queried through SQL.

The following diagram illustrates the high-level overview of the architecture.

This architecture is based on the following major components:

  • Windows AD, which is responsible for providing the identities of users across the system. It's mainly composed of a key distribution center (KDC) that provides Kerberos tickets to AD users to interact with the EMR cluster, and a Lightweight Directory Access Protocol (LDAP) server that defines the organization of users in logical structures.
  • An Apache Ranger server, which runs on an Amazon Elastic Compute Cloud (Amazon EC2) instance whose lifecycle is independent from the one of the EMR cluster. Apache Ranger is composed of a Ranger admin server that stores and retrieves policies in and from a MySQL database running in Amazon Relational Database Service (Amazon RDS), a usersync server that connects to the Windows AD LDAP server to synchronize identities to make them available for policy settings, and an optional Apache Solr server to index and store audits.
  • An Amazon RDS for MySQL database instance used by the Hive metastore to store metadata related to the table schemas, and by the Apache Ranger server to store the access control policies.
  • An EMR cluster with the following configuration updates:
    • Apache Ranger security configuration.
    • A local KDC that establishes a one-way trust with Windows AD in order to have the Kerberized EMR cluster recognize the user identities from the AD.
    • A Hue user interface with LDAP authentication enabled to run SQL queries on the Trino engine.
  • An Amazon CloudWatch log group to store all audit logs for the AWS managed Ranger plugins.
  • (Optional) Trino connectors for other execution engines like Amazon Redshift, Amazon OpenSearch Service, PostgreSQL, and others.

Prerequisites

Before getting started, you must have the following prerequisites. For more information, refer to the Prerequisites and Setting up your resources sections in Introducing Amazon EMR integration with Apache Ranger.

To set up the new Apache Ranger Trino plugin, the following steps are required:

  1. Delete any existing Presto service definitions in the Apache Ranger admin server:
    #Delete Presto Service Definition
    curl -f -u <admin user>:<ranger admin password> -X DELETE -k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef/name/presto'

  2. Download and add new Apache Ranger service definitions for Trino in the Apache Ranger admin server:
     wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-trino.json
    
    curl -u <admin user>:<ranger admin password> -X POST -d @ranger-servicedef-amazon-emr-trino.json \
    -H "Accept: application/json" \
    -H "Content-Type: application/json" \
    -k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

  3. Create a new Amazon EMR security configuration for the Apache Ranger installation that includes the Trino policy repository details. For more information, see Create the EMR security configuration. A CLI sketch for this step appears after these setup steps.
  4. Optionally, if you want to use the Hue UI to run Trino SQL, add the hue user to the Apache Ranger admin server. Run the following command on the Ranger admin server:
    # Note: input parameter Ranger host IP address
     
     set -x
    ranger_server_fqdn=$1
    RANGER_HTTP_URL=https://$ranger_server_fqdn:6182
    
    cat > hueuser.json << EOF
    { 
      "name": "hue1",
      "firstName": "hue",
      "lastName": "",
      "loginId": "hue1",
      "emailAddress" : null,
      "description" : "hue user",
      "password" : "user1pass",
      "groupIdList": [],
      "groupNameList": [],
      "status": 1,
      "isVisible": 1,
      "userRoleList": [ "ROLE_USER" ],
      "userSource": 0
    }
    EOF
    
    #add users 
    curl -u admin:admin -v -i -s -X POST  -d @hueuser.json -H "Accept: application/json" -H "Content-Type: application/json"  -k $RANGER_HTTP_URL/service/xusers/secure/users

After you add the hue user, it’s used to impersonate SQL calls submitted by AD users.

Warning: Use the impersonation feature carefully so that you don't unintentionally grant users elevated privileges.
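
For step 3 earlier, you can create the security configuration from the AWS CLI as well as the console. The following is a minimal sketch; the file name ranger-trino-security-config.json is a placeholder, and its contents (the Ranger-specific authorization settings) should be built as described in Create the EMR security configuration:

# Sketch only: create an EMR security configuration from a JSON definition.
# Build ranger-trino-security-config.json as described in the EMR Ranger documentation.
aws emr create-security-configuration \
  --name ranger-trino-security-config \
  --security-configuration file://ranger-trino-security-config.json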

This post also demonstrates the capabilities of running queries against external databases, such as Amazon Redshift and PostgreSQL, using Trino connectors, while controlling access at the database, table, row, and column level using Apache Ranger policies. This requires you to set up the database engines you want to connect with. The following example code demonstrates using the Amazon Redshift connector. To set up the connector, create the file redshift.properties under /etc/trino/conf.dist/catalog on all Amazon EMR nodes and restart the Trino server. A quick verification sketch follows these steps.

  • Create the redshift.properties property file on all the Amazon EMR nodes with the following code:
    # Create a new (empty) redshift.properties file
    sudo touch /etc/trino/conf.dist/catalog/redshift.properties

  • Update the property file with the Amazon Redshift cluster details:
    connector.name=redshift
    connection-url=jdbc:redshift://XXXXX:5439/dev
    connection-user=XXXX
    connection-password=XXXXX

  • Restart the Trino server:
    # Restart Trino server 
    sudo systemctl stop trino-server.service
    sudo systemctl start trino-server.service

  • In a production environment, you can automate this step using the following inside your EMR Classification:
    {
      "Classification": "trino-connector-redshift",
      "Properties": {
        "connector.name": "redshift",
        "connection-url": "jdbc:redshift://XXXXX:5439/dev",
        "connection-user": "XXXX",
        "connection-password": "XXXX"
      }
    }
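
To quickly verify that the new catalog is registered, you can run a query through trino-cli on an EMR master node. This is a sketch; on a Kerberized cluster with Ranger enabled, you also need to pass the appropriate Kerberos and SSL options for your environment:

# List the catalogs known to Trino; redshift should appear after the restart
trino-cli --execute "SHOW CATALOGS"

# List schemas available through the new Redshift connector
trino-cli --catalog redshift --execute "SHOW SCHEMAS"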

Test your setup

In this section, we go through an example where the data is distributed across Amazon Redshift for dimension tables and Hive for fact tables. We can use Trino to join data between these two engines.

On Amazon Redshift, let’s define a new dimension table called Products and load it with data:

--- Setup products table in Redshift
 > create table public.products 
 (company VARCHAR, link VARCHAR, price FLOAT, product_category VARCHAR, 
 release_date VARCHAR, sku VARCHAR);

--- Copy data from S3

 > COPY public.products
  FROM 's3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/products/'
  IAM_ROLE '<XXXXXXXXX>'
  FORMAT AS PARQUET;

Then use the Hue UI to create the Hive external table Orders:

CREATE EXTERNAL TABLE IF NOT EXISTS default.orders 
(customer_id STRING, order_date STRING, price DOUBLE, sku STRING)
STORED AS PARQUET
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/orders';

Now let’s use Trino to join both datasets:

-- Join the dimension table in Redshift (products) with the fact table in Hive (orders)
-- to get the sum of sales by product_category and sku
SELECT sum(orders.price) total_sales, products.sku, products.product_category
FROM hive.default.orders JOIN redshift.public.products ON orders.sku = products.sku
GROUP BY products.sku, products.product_category LIMIT 10

The following screenshot shows our results.

Row filtering and column masking

Apache Ranger supports policies to allow or deny access based on several attributes, including user, group, and predefined roles, as well as dynamic attributes like IP address and time of access. In addition, the model supports authorization based on the classification of resources such as PII, FINANCE, and SENSITIVE.

Another feature is the ability to allow users to access only a subset of rows in a table, or to restrict users to masked or redacted values of sensitive data. Examples include restricting users to records of customers located in the same country where the user works, or allowing a user who is a doctor to see only the records of patients associated with that doctor.

The following screenshots show how, by using Trino Ranger policies, you can enable row filtering and column masking of data in Amazon Redshift tables.

The example policy masks the firstname column, and applies a filter condition on the city column to restrict users to view rows for a specific city only.

The following screenshot shows our results.

Dynamic row filtering using user session context

The Trino Ranger plugin passes down Trino session data like current_user() that you can use in the policy definition. This can vastly simplify your row filter conditions by removing the need for hardcoded values and using a mapping lookup. For more details on dynamic row filtering, refer to Row-level filtering and column-masking using Apache Ranger policies in Apache Hive.

Known issue with Amazon EMR 6.7

Amazon EMR 6.7 has a known issue when enabling a Kerberos one-way trust with Microsoft Windows AD. Run this bootstrap script by following these instructions as part of the cluster launch.

Limitations

When using this solution, keep in mind the following limitations; further details can be found here:

  • Non-Kerberos clusters are not supported.
  • Audit logs are not visible on the Apache Ranger UI, because these are sent to CloudWatch.
  • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore.
  • The integration between Amazon EMR and Apache Ranger limits the available applications. For a full list, refer to Application support and limitations.

Troubleshooting

If you can't log in to the EMR cluster's node as an AD user and you get the error message Permission denied, please try again, the SSSD service may have stopped on the node you're trying to access.

To fix this, connect to the node using the configured SSH key pair or Session Manager, and run the following command.

sudo service sssd restart

If you're unable to download policies from the Ranger admin server and you get the error message Error getting policies with HTTP status code 400, either the certificate has expired or the Ranger policy definition isn't set up correctly.

To diagnose the issue, check the Ranger admin logs. If they show the following error, it's likely that the certificates have expired.

... msgDesc={Unauthorized access - unable to get client certificate} messageList={[VXMessage={ ...
... rbKey={xa.error.oper_not_allowed_for_state} message={Operation not allowed for entity} objectId={null} fieldName={null} }]} }

Perform the following steps to address the issue:

  • Recreate the certificates using the create-tls-certs.sh script and upload them to Secrets Manager.
  • Update the Ranger admin server configuration with the new certificate, and restart the Ranger admin service.
  • Create a new EMR security configuration using the new certificate, and relaunch the EMR cluster using the new security configuration.
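
Before going through these steps, you can confirm whether the Ranger admin certificate has actually expired by checking its validity dates with openssl (a quick check, assuming the admin server listens on port 6182):

# Print the expiry date of the certificate presented by the Ranger admin server
openssl s_client -connect <RANGER SERVER ADDRESS>:6182 </dev/null 2>/dev/null \
  | openssl x509 -noout -enddate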

The issue can also be caused by a misconfigured Ranger policy definition. The Ranger admin service policy definition should trust the self-signed certificate chain. Make sure the configuration attribute for the service definitions has the correct domain name or pattern to match the domain name used for your EMR cluster nodes.

If the EMR cluster keeps failing with the error message Terminated with errors: An internal error occurred, check the Amazon EMR primary node secret agent logs.

If it shows the following message, the cluster is failing because the specified CloudWatch log group doesn’t exist:

Exception in thread "main" com.amazonaws.services.logs.model.ResourceNotFoundException: The specified log group does not exist. (Service: AWSLogs; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: d9fa2ef1-17cb-4b8f-a07f-8a6aea245173; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)

A query run through trino-cli might fail with the error Unable to obtain password from user. For example:

ERROR   remote-task-callback-42 io.trino.execution.StageStateMachine    Stage 20220422_023236_00005_zn4vb.1 failed
com.google.common.util.concurrent.UncheckedExecutionException: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: javax.security.auth.login.LoginException: Unable to obtain password from user

This issue can occur due to incorrect realm names in the /etc/trino/conf.dist/catalog/hive.properties file. Check the domain or realm name and other Kerberos-related configurations in the /etc/trino/conf.dist/catalog/hive.properties file. Optionally, also check the /etc/trino/conf.dist/trino-env.sh and /etc/trino/conf.dist/config.properties files in case any configuration changes have been made.

Clean up

Clean up the resources created either manually or by the AWS CloudFormation template provided in GitHub repo to avoid unnecessary cost to your AWS account. You can delete the CloudFormation stack by selecting the stack on the AWS CloudFormation console and choosing Delete. This action deletes all the resources it provisioned. If you manually updated a template-provisioned resource, you may encounter some issues during cleanup; you need to clean these up independently.

Conclusion

A data mesh approach encourages the idea of data domains where each domain team owns their data and is responsible for data quality and accuracy. This draws parallels with a microservices architecture. Building federated data governance like we show in this post is at the core of implementing a data mesh architecture. Combining the powerful query federation capabilities of Apache Trino with the centralized authorization and audit capabilities of Apache Ranger provides an end-to-end solution to operate and govern a data mesh platform.

In addition to the already available Ranger integration capabilities for Apache Spark SQL, Amazon S3, and Apache Hive, starting with the 6.7 release, Amazon EMR includes a plugin for the Ranger Trino integration. For more information, refer to EMR Trino Ranger plugin.


About the authors

Varun Rao Bhamidimarri is a Sr. Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers adopt cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, and meditating, and he recently picked up gardening during the lockdown.

Partha Sarathi Sahoo is an Analytics Specialist TAM at AWS based in Sydney, Australia. He brings 15+ years of technology expertise and helps enterprise customers optimize analytics workloads. In his previous roles, he worked extensively on both on-premises and cloud big data workloads, along with various ETL platforms. He also actively conducts proactive operational reviews around analytics services like Amazon EMR, Amazon Redshift, and Amazon OpenSearch Service.

Anis Harfouche is a Data Architect at AWS Professional Services. He helps customers achieve their business outcomes by designing, building, and deploying data solutions based on AWS services.

Disaster recovery considerations with Amazon EMR on Amazon EC2 for Spark workloads

Post Syndicated from Bharat Gamini original https://aws.amazon.com/blogs/big-data/disaster-recovery-considerations-with-amazon-emr-on-amazon-ec2-for-spark-workloads/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR launches all nodes for a given cluster in the same Amazon Elastic Compute Cloud (Amazon EC2) Availability Zone to improve performance. During an Availability Zone failure or due to any unexpected interruption, Amazon EMR may not be accessible, and we need a disaster recovery (DR) strategy to mitigate this problem.

Part of architecting a resilient, highly available Amazon EMR solution is the consideration that failures do occur. These unexpected interruptions can be caused by natural disasters, technical failures, and human interactions resulting in an Availability Zone outage. The EMR cluster could also become unreachable due to failure of critical services running on the EMR master node, network issues, or other issues.

In this post, we show you how to architect your Amazon EMR environment for disaster recovery to maintain business continuity with minimum Recovery Time Objective (RTO) during Availability Zone failure or when your EMR cluster is inoperable.

Although various disaster recovery strategies are available in the cloud, we discuss active-active and active-passive DR strategies for Amazon EMR in this post. We focus on a use case for Spark batch workloads where persistent storage is decoupled from Amazon EMR and the EMR cluster is running with a single master node. If the EMR cluster is used for persistent storage, it requires an additional strategy to replicate data from the EMR cluster, which we will cover in subsequent posts.

Prerequisites

To follow along with this post, you should have a knowledge of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and an understanding of Network Load Balancers.

Solution overview

The following diagram illustrates the solution architecture.

Customers often use Amazon MWAA to submit Spark jobs to an EMR cluster using an Apache Livy REST interface. We can configure Apache Livy to use a Network Load Balancer hostname instead of an Amazon EMR master hostname, so that we don’t need to update Livy connections from Amazon MWAA whenever a new cluster is created or stopped. You can register Network Load Balancer target groups with multiple EMR cluster master nodes for an active-active setup. In the case of an active-passive setup, we can create a new EMR cluster when a failure is detected and register the new EMR master with the Network Load Balancer target group. The Network Load Balancer automatically performs health checks and distributes requests to healthy targets. With this solution, we can maintain business continuity when an EMR cluster isn’t reachable due to Availability Zone failure or when the cluster is unhealthy due to any other reason.
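
Once the Network Load Balancer is in place, a quick way to confirm that Livy is reachable through it is to call the Livy REST API using the load balancer's DNS name (a sketch; replace the hostname with your own):

# List Livy sessions through the Network Load Balancer; a fresh cluster returns an empty list
curl -s http://<nlb-dns-name>:8998/sessions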

Active-active DR strategy

An active-active DR setup focuses on running two EMR clusters with identical configuration in two different Availability Zones. To reduce the running costs of two active EMR clusters, we can launch both clusters with minimum capacity, and managed scaling automatically scales the cluster based on the workload. EMR managed scaling only launches instances when there is demand for resources and stops the unneeded instances when the work is finished. With this strategy, we can reduce our recovery time to near zero with optimal cost. This active-active DR strategy is suitable when businesses want to have near-zero downtime with automatic failover for your analytics workloads.

In the following section, we walk through the steps to implement the solution and provide references to related resources that provide more detailed guidance.

Create EMR clusters

We create two EMR clusters in different Availability Zones within the same Region of your choice. Use the following AWS Command Line Interface (AWS CLI) command and modify or add required configurations as per your needs:

aws emr create-cluster \
  --name "<emr-cluster-az-a>" \
  --release-label emr-6.4.0 \
  --log-uri "s3://<your-log-bucket>" \
  --applications Name=Spark Name=Livy \
  --ec2-attributes "KeyName=<your-key-name>,SubnetId=<private-subnet-in-az-a>" \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
  --use-default-roles

We can create the cluster with EMR managed scaling, which lets you automatically increase or decrease the number of instances or units in your cluster based on workload. Amazon EMR continuously evaluates cluster metrics to make scaling decisions that optimize your clusters for cost and speed.
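
If you didn't enable managed scaling when creating the cluster, you can attach a policy to an existing cluster with the AWS CLI. The following is a minimal sketch; adjust the unit type and capacity limits to match your workload:

# Attach an EMR managed scaling policy to an existing cluster
aws emr put-managed-scaling-policy \
  --cluster-id <your-cluster-id> \
  --managed-scaling-policy ComputeLimits='{UnitType=Instances,MinimumCapacityUnits=1,MaximumCapacityUnits=10,MaximumCoreCapacityUnits=10}'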

Create and configure a Network Load Balancer

You can create a Network Load Balancer using the AWS CLI (see Create a Network Load Balancer using the AWS CLI) or the AWS Management Console (see Create a Network Load Balancer). For this post, we do so on the console; a CLI sketch follows these steps.

  • Create a target group (emr-livy-dr) and register both EMR clusters’ master IP addresses in the target group.

  • Create an internal Network Load Balancer in the same VPC and Region as your EMR clusters, choose two different Availability Zones, and select the private subnets.
    These subnets don’t need to be in the same subnets as the EMR clusters, but the clusters must allow the traffic from the Network Load Balancer, which is discussed in next steps.

  • Create a TCP listener on port 8998 (the default EMR cluster Livy port) to forward requests to the target group you created.

  • Modify the EMR clusters’ master security groups to allow the Network Load Balancer’s private IP addresses to access port 8998.

You can find the Network Load Balancer’s private IP address by searching the elastic network interfaces for the Network Load Balancer’s name. For access control instructions, refer to How do I attach a security group to my Elastic Load Balancer.

When the target groups become healthy, the Network Load Balancer forwards requests to registered targets when it receives requests on Livy port 8998.

  • Get the DNS name of the Network Load Balancer.

We can also use an Amazon Route 53 alias record to use our own domain name to route traffic to the Network Load Balancer DNS name. We use this DNS name in our Amazon MWAA Livy connection.
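
If you prefer to script the preceding setup, the following AWS CLI sketch creates the target group, registers both master node IP addresses, creates an internal Network Load Balancer, and adds the TCP listener on port 8998. The names, IDs, and IP addresses are placeholders:

# Create a TCP target group for Livy and register both EMR master node IPs
aws elbv2 create-target-group --name emr-livy-dr --protocol TCP --port 8998 \
  --vpc-id <vpc-id> --target-type ip
aws elbv2 register-targets --target-group-arn <target-group-arn> \
  --targets Id=<master-ip-az-a> Id=<master-ip-az-b>

# Create an internal Network Load Balancer across two private subnets
aws elbv2 create-load-balancer --name emr-livy-nlb --type network --scheme internal \
  --subnets <private-subnet-az-a> <private-subnet-az-b>

# Forward Livy traffic (port 8998) to the target group
aws elbv2 create-listener --load-balancer-arn <nlb-arn> --protocol TCP --port 8998 \
  --default-actions Type=forward,TargetGroupArn=<target-group-arn>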

Create and configure Amazon MWAA

Complete the following steps:

  • Make sure the execution role you’re using with Amazon MWAA has proper access to EMR clusters and other required services.
  • Update the Amazon MWAA Livy connection (livy_default) host with the Network Load Balancer hostname you created.
  • Create a new Livy connection ID if it’s not already available.

  • Use the following sample DAG to submit a sample Spark application using LivyOperator. We assign the livy_default connection to the livy_conn_id in the DAG code.
  • Enable the DAG and verify if the Spark application is successful on one of the EMR clusters.
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'airflow',
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag_name = "livy_spark_dag"
# Replace S3 bucket name
# You can use sample spark jar from EMR cluster master node
# /usr/lib/spark/examples/jars/spark-examples.jar
s3_bucket = "artifacts-bucket"
jar_location = "s3://{}/spark-examples.jar".format(s3_bucket)

dag = DAG(
    dag_id = dag_name,
    default_args=default_args,
    schedule_interval='@once',
    start_date = days_ago(1),
    catchup=False,
    tags=['emr', 'spark', 'livy']
)

livy_spark = LivyOperator(
    file=jar_location,
    class_name="org.apache.spark.examples.SparkPi",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    task_id="livy_spark",
    conf={
    "spark.submit.deployMode": "cluster",
    "spark.app.name": dag_name
    },
    livy_conn_id="livy_default",
    dag=dag,
)

livy_spark

Test the DR plan

We can test our DR plan by creating scenarios that could be caused by real disasters. Perform the following steps to validate if our DR strategy works automatically during a disaster:

  1. Run the sample DAG multiple times and verify if Spark applications are randomly submitted to the registered EMR clusters.
  2. Stop one of the clusters and verify if jobs are automatically submitted to the other cluster in a different Availability Zone without any issues.

Active-passive DR strategy

Although the active-active DR strategy has benefits of maintaining near-zero recovery time, it’s complex to maintain two environments because both environments require patching and constant monitoring. In cases where Recovery Time Objective (RTO) and Recovery Point Objective (RPO) aren’t critical for your workloads, we can adopt an active-passive strategy. This approach offers a more economical and operationally less complex approach.

In this approach, we use a single EMR cluster as an active cluster and in case of disaster (due to Availability Zone failures or any other reason the EMR cluster is unhealthy), we launch a second EMR cluster in a different Availability Zone and redirect all our workloads to the newly launched cluster. End-users may notice some delay because launching a second EMR cluster takes time.

The high-level architecture of the active-passive DR solution is shown in the following diagram.

Complete the following steps to implement this solution:

  • Create an EMR cluster in a single Availability Zone.
  • Create target groups and register the EMR cluster master node IP address. Create target groups for the ResourceManager (8088), NameNode (9870), and Livy (8998) services, as shown in the CLI sketch after this list. Change the port numbers if the services are running on different ports.

  • Create a Network Load Balancer and add TCP listeners and forward requests to the respective target groups.

  • Create an Amazon MWAA environment with proper access to the EMR cluster in the same Region.
  • Edit the Amazon MWAA Livy connection to use the Network Load Balancer DNS name.
  • Use the updated Livy connection in Amazon MWAA DAGs to submit Spark applications.
  • Validate if we can successfully submit Spark applications via Livy to the EMR cluster.
  • Set up a DAG on Amazon MWAA or similar scheduling tool that continuously monitors the existing EMR cluster health.
  • Monitor the key services running on the Amazon EMR master host (for example, the ResourceManager, NameNode, and Livy) using the REST APIs or commands provided by each service. Add more health checks as required.
  • If the health check process detects a failure of the first EMR cluster, create a new EMR cluster in a different Availability Zone.
  • Automatically register the newly created EMR cluster master IP address to the Network Load Balancer target groups.
  • When the Network Load Balancer health checks are successful with the new EMR cluster master IP, delete the unhealthy EMR cluster master IP address from the target group and stop the old EMR cluster.
  • Validate the DR plan.
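
The following AWS CLI sketch creates the three TCP target groups referenced earlier (ResourceManager, NameNode, and Livy); the target group names and VPC ID are placeholders:

# Create one TCP target group per service port (ResourceManager, NameNode, Livy)
for port in 8088 9870 8998; do
  aws elbv2 create-target-group --name "emr-dr-tg-$port" \
    --protocol TCP --port "$port" --vpc-id <vpc-id> --target-type ip
done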

Follow the steps mentioned in the active-active DR strategy to create the following resources:

  • Amazon EMR
  • Amazon MWAA
  • Network Load Balancer

The following sample script provides the functionality described in this section. Use this as reference and modify it accordingly to fit your use case.

#!/bin/bash

usage() {
	cat <<EOF
   Usage: ./dr_health_check.sh j-2NPQWXK1U4E6G

   This script takes the current EMR cluster ID as an argument, monitors the cluster health, and
   creates a new EMR cluster in a different AZ if the existing cluster is unhealthy or unreachable

EOF
	exit 1
}

[[ $# -lt 1 ]] && {
	echo Specify cluster id as argument to the script
	usage
}

#Set NLB DNS name and region
hostname="emr-ap-ae4ffe5g23fd9245.elb.us-west-2.amazonaws.com"
region="us-west-2"
cluster_id=$1
cluster_status=""

export AWS_DEFAULT_REGION=$region

#Depending on the use case, run the health checks below more than once in a loop, and only perform the remaining steps if the cluster state is still unhealthy
#Ports and SSL properties for curl command may differ depending on how services are set up
rm_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8088/ws/v1/cluster | jq -r .clusterInfo.state)
if [[ $? -ne 0 || "$rm_state" != "STARTED" ]]; then
	echo "ResourceManager port not reachable or service not running"
	cluster_status="unhealthy"
fi

nn_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus | jq -r .beans[0].State)
if [[ $? -ne 0 || "$nn_state" != "active" ]]; then
	echo "NameNode port not reachable or service not running"
	cluster_status="unhealthy"
fi

livy_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8998/sessions)
if [[ $? -ne 0 ]]; then
	echo "Livy port not reachable"
	cluster_status="unhealthy"
fi

cluster_name=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Name")

update_target_groups() {
	new_master_ip=$1
	current_master_ip=$2
	current_az=$3

	nlb_arn=$(aws elbv2 describe-load-balancers --query "LoadBalancers[?DNSName==\`$hostname\`].[LoadBalancerArn]" --output text)
	target_groups=$(aws elbv2 describe-target-groups --load-balancer-arn $nlb_arn --query "TargetGroups[*].TargetGroupArn" --output text)
	IFS=" " read -a tg_array <<<$target_groups
	for tg in "${tg_array[@]}"; do
		echo "Registering new EMR master IP with target group $tg"
		aws elbv2 register-targets --target-group-arn $tg --targets Id=$new_master_ip,AvailabilityZone=all

		echo "De-registering old/unhealthy EMR master IP from target group $tg"
		aws elbv2 deregister-targets --target-group-arn $tg --targets Id=$current_master_ip,AvailabilityZone=all
	done
}

if [[ $cluster_status == "unhealthy" ]]; then
	echo "Cluster status is $cluster_status, creating new EMR cluster"
	current_az=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Ec2InstanceAttributes.Ec2AvailabilityZone")
	new_az=$(aws ec2 describe-availability-zones --output json --filters "Name=region-name,Values=$region" --query "AvailabilityZones[?ZoneName!=\`$current_az\`].ZoneName|[0]" --output=text)
	current_master_ip=$(aws emr list-instances --cluster-id $cluster_id --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "Current/unhealthy cluster id $cluster_id, cluster name $cluster_name,AZ $current_az, Master private ip $current_master_ip"

	echo "Creating new EMR cluster in $new_az"
	emr_op=$(aws emr create-cluster \
		--name "$cluster_name-$new_az" \
		--release-label emr-6.4.0 \
		--applications Name=Spark Name=Livy \
		--ec2-attributes "AvailabilityZone=$new_az" \
		--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
		--use-default-roles \
		--region $region)

	new_cluster_id=$(echo $emr_op | jq -r ".ClusterId")

	#Wait for cluster provisioning to get the master IP address; adjust the wait time as needed because provisioning can take several minutes
	sleep 2m

	new_master_ip=$(aws emr list-instances --cluster-id $new_cluster_id --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "New EMR cluster id $new_cluster_id and Master node IP $new_master_ip"

	echo "Terminating unhealthy cluster $cluster_id/$cluster_name in $current_az"
	aws emr modify-cluster-attributes --cluster-id $cluster_id --no-termination-protected
	aws emr terminate-clusters --cluster-ids $cluster_id

	echo "Register new EMR master IP address with NLB target groups and de-register unhealthy EMR master"
	update_target_groups $new_master_ip $current_master_ip $current_az
else
	echo "Current cluster $cluster_id/$cluster_name is healthy"
fi

Summary

In this post, we shared some solutions and considerations to improve DR implementation using Amazon EMR on Amazon EC2, Network Load Balancer, and Amazon MWAA. Based on your use case, you can determine the type of DR strategy you want to deploy. We have provided the steps required to create the necessary environments and set up a successful DR strategy.

For more details about the systems and processes described in this post, refer to the following:


About the Author

Bharat Gamini is a Data Architect focused on Big Data & Analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust and secure cloud-based analytical solutions on AWS.

Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-high-performance-acid-compliant-evolving-data-lake-using-apache-iceberg-on-amazon-emr/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Apache Iceberg is an open table format for huge analytic datasets. Table formats typically indicate the format and location of individual table files. Iceberg adds functionality on top of that to help manage petabyte-scale datasets as well as newer data lake requirements such as transactions, upsert/merge, time travel, and schema and partition evolution. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table.

Amazon EMR release 6.5.0 and later includes Apache Iceberg so you can reliably work with huge tables with full support for ACID (Atomic, Consistent, Isolated, Durable) transactions in a highly concurrent and performant manner without getting locked into a single file format.

In this post, we discuss the modern data lake requirements and the challenges—including support for ACID transactions and concurrent writers, partition and schema evolution—that come with these. We also discuss how Iceberg solves these challenges. Additionally, we provide a step-by-step guide on how to get started with an Iceberg notebook in Amazon EMR Studio. You can access this sample notebook from the GitHub repo. You can also find this notebook in your EMR Studio workspace under Notebook Examples.

Modern data lake challenges

Amazon EMR integrates with Amazon Simple Storage Service (Amazon S3) natively for persistent data storage, and allows you to independently scale your data in Amazon S3 and compute on your EMR cluster. This enables you to bring in data from multiple sources (for example, transactional data from operational databases, social media feeds, and SaaS data sources) using different tools, and each data source has its own transient EMR cluster to perform transformation and ingestion in parallel. You can now keep one central copy of your data and share it with multiple user groups that run analytics and even make in-place updates on a data lake. We’re increasingly seeing the following requirements (and challenges) emerge as mainstream:

  • Consistent reads and writes across multiple concurrent users – There are two primary concerns:
    • Reader-writer isolation – When a job is updating a huge dataset, another job accessing the same data frequently works on a partially updated dataset, leaving the data in an inconsistent state.
    • Concurrent writes on the same dataset – Table formats relying on coarse-grained locks slow down the system. This limitation is even more telling in real-time streaming workloads.
  • Consistent table updates across multiple files or partitions – With Hive tables, writing to multiple partitions at once isn’t an atomic operation. If you’re overwriting a partition, for instance, you might delete several files across partitions without having any guarantees that you will replace them, potentially resulting in data loss. For huge tables, it’s not practical to use global locks and keep the readers and writers waiting. Common workarounds (such as rewriting all the data in all the partitions that need to be changed at the same time and then pointing to the new locations) cause huge data duplication and redundant extract, transform, and load (ETL) jobs.
  • Continuous schema evolution – Simple DDL commands often render the data unusable. For instance, say a data engineer renames a column and writes some data. The consuming analytics tool now can’t read it because the metastore can’t track former names for columns. That rename operation has effectively dropped a column and added a new column. Now there is data written in both schemas. Historically, schema changes required expensive backfills and redundant ETL processes.
  • Different query patterns on the same data – If you change the partitioning to optimize your query after a year, say from daily to hourly, you have to rewrite the table with the new hour column as the partition. In addition, you have to rewrite queries to use the new partition column in your table.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Existing tools that support these features lock you into specific file formats, complicating interoperability across the analytics ecosystem.
  • Support for mixed file formats – With existing solutions, if you rename a column in one file format (say Parquet, ORC, or Avro), you get a different behavior than if you rename a column in a different file format. There is inconsistency in data types supported by different file formats. These limitations necessitate additional ETL steps.

The problem

When multiple users share the same data, varied requirements ensue. The data platform needs to be transactional to handle concurrent upserts and reads.

Table formats such as Hive track a list of partitions inside the table within a data catalog. However, the underlying files are still not tracked transactionally, because we’re relying on an immutable object storage that is just not designed to be transactional. After the specific partitions to be updated or inserted have been identified, we still need to list all the files in those partitions at the leaf level of the partition hierarchy before we can filter out which of those files are relevant. For huge analytic datasets with thousands of files in each partition, listing all those files each time you run a query slows it down considerably. Furthermore, doing atomic commits—getting thousands of files in the table live in exactly the same moment—becomes impractical.

Apache Iceberg on Amazon EMR

Iceberg development was started by Netflix in December 2017 and was donated to the Apache Software Foundation in November 2018 as an incubator project. In May 2020, it graduated from the incubator.

Iceberg on Amazon EMR comes completely integrated and tested for running in production backed by Enterprise Support. This means you get 24/7 technical support from Amazon EMR experts, tools and technology to automatically manage the health of your environment, and consultative architectural, performance, and troubleshooting guidance on Iceberg issues.

Iceberg has integrations with other AWS services. For example, you can use the AWS Glue Data Catalog as the metastore for Iceberg tables. Iceberg also supports other catalog types such as Hive, Hadoop, Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and other custom implementations. When using AWS Glue as the data catalog, the AWS Glue database serves as your Iceberg namespace. Similarly, the AWS Glue table and AWS Glue TableVersion serve as the Iceberg table and table version, respectively. Your AWS Glue Data Catalog could be in the same or different account or even a different Region, making multi-account, multi-Region pipelines easily deployable. Amazon Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

How Iceberg addresses these challenges

Iceberg tracks individual data files in a table instead of simply maintaining a pointer to high-level table or partition locations. This allows writers to create data files in-place and only adds files to the table in an explicit commit. Every time new datasets are ingested into this table, a new point-in-time snapshot gets created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already has that information pre-populated during the write time. Because of this design, Iceberg solves the problems listed earlier in the following ways:

  • Consistent reads and writes across multiple concurrent users – Iceberg relies on optimistic concurrency to support concurrent reads and writes from multiple user groups. If two operations are running at the same time, only one of them will be successful. The other job will retry, but that retry will be implicit to the user and that will be done at the metadata level. If Iceberg detects that the second update is not in conflict, it will commit it successfully.
  • Consistent table updates across multiple partitions – In Iceberg, the partition of a file isn’t determined by the physical location of the files within directories or prefixes. Instead, Iceberg stores partition information within manifests of the data files. Therefore, updates across multiple partitions entail a simple, atomic metadata change.
  • Continuous schema evolution – Iceberg tracks columns by using unique IDs and not by the column name, which enables easy schema evolution. You can safely add, drop, rename, or even reorder columns. You can also update column data types if the update is safe (such as widening from INT to BIGINT or float to double).
  • Different query patterns on the same data – Iceberg keeps track of the relationship between partitioning values and the column that they came from. Logical data is decoupled from physical layout, which enables easy partition evolution as well. Partition values can be implicitly derived using a transform such as day(timestamp) or hour(timestamp) of an existing column.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Iceberg supports ACID transactions with serializable isolation. Furthermore, Iceberg supports deletes, upserts, change data capture (CDC), time travel (getting the state of the data from a past time regardless of the current state of the data), and compaction (consolidating small files into larger files to reduce metadata overhead and improve query speed). Table changes are atomic, and readers never see partial or uncommitted changes.
  • Support for mixed file formats – Because schema fields are tracked by unique IDs independent of the underlying file format, you can have consistent queries across file formats such as Avro, Parquet, and ORC.

Using Apache Iceberg with Amazon EMR

In this post, we demonstrate creating an Amazon EMR cluster that supports Iceberg using the AWS Command Line Interface (AWS CLI). You can also create the cluster from the Amazon EMR console. We use Amazon EMR Studio to run notebook code on our EMR cluster. To set up an EMR Studio, refer to Set up an EMR Studio. First, we note down the subnets that we specified when we created our EMR Studio. Now we launch our EMR cluster using the AWS CLI:

aws emr create-cluster \
--name iceberg-emr-cluster \
--use-default-roles \
--release-label emr-6.6.0 \
--instance-count 1 \
--instance-type r5.4xlarge \
--applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
--ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
--configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'

We choose emr-6.6.0 as the release label. This release comes with Iceberg version 0.13.1 pre-installed. We launch a single-node EMR cluster with the instance type R5.4xlarge and with the following applications installed: Hadoop, Spark, Livy, and Jupyter Enterprise Gateway. Make sure that you replace <EMR-STUDIO-SUBNET> with a subnet ID from the list of EMR Studio’s subnets you noted earlier. We need to enable Iceberg and the AWS Glue Data Catalog on our cluster. To do this, we use the following configuration classifications:

[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]

Initial setup

Let’s first create an S3 bucket location in the same Region as the EMR cluster to save a sample dataset that we’re going to create and work with. In this post, we use the placeholder bucket name YOUR-BUCKET-NAME. Remember to replace this with a globally unique bucket name when testing this out in your environment. From our EMR Studio workspace, we attach our cluster and use the PySpark kernel.
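
For example, you can create the bucket from the AWS CLI (the bucket name and Region are placeholders; bucket names must be globally unique):

# Create the S3 bucket in the same Region as the EMR cluster
aws s3 mb s3://YOUR-BUCKET-NAME --region <your-region>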

You can upload the sample notebook from the GitHub repo or use the Iceberg example under Notebook Examples in your own EMR Studio workspace and run the cells following the instructions in the notebook.

Configure a Spark session

In this command, we set our AWS Glue Data Catalog name as glue_catalog1. You can replace it with a different name. But if you do so, remember to change the Data Catalog name throughout this example, because we use the fully qualified table name including the Data Catalog name in all of our commands going forward. In the following command, remember to replace YOUR-BUCKET-NAME with your own bucket name:

%%configure -f
{
    "conf": {
        "spark.sql.catalog.glue_catalog1": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog1.warehouse": "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
        "spark.sql.catalog.glue_catalog1.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog1.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog1.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog1.lock.table": "myGlueLockTable",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}

Let’s assume that the name of your catalog is glue_catalog1. The preceding code has the following components:

  • glue_catalog1.warehouse points to the Amazon S3 path where you want to store your data and metadata.
  • To make the catalog an AWS Glue Data Catalog, set glue_catalog1.catalog-impl to org.apache.iceberg.aws.glue.GlueCatalog. This key is required to point to an implementation class for any custom catalog implementation.
  • Use org.apache.iceberg.aws.s3.S3FileIO as the glue_catalog1.io-impl in order to take advantage of Amazon S3 multipart upload for high parallelism.
  • We use an Amazon DynamoDB table for lock implementation. This is optional, and is recommended for high concurrency workloads. To do that, we set lock-impl for our catalog to org.apache.iceberg.aws.glue.DynamoLockManager and we set lock.table to myGlueLockTable as the table name so that for every commit, the Data Catalog first obtains a lock using this table and then tries to safely modify the AWS Glue table. If you choose this option, the table gets created in your own account. Note that you need to have the necessary access permissions to create and use a DynamoDB table. Furthermore, additional DynamoDB charges apply.

Now that you’re all set with your EMR cluster for compute, S3 bucket for data, and AWS Glue Data Catalog for metadata, you can start creating a table and running the DML statements.

For all commands going forward, we use the %%sql cell magic to run Spark SQL commands in our EMR Studio notebook. However, for brevity, we don’t show the cell magic command. But you may need to use that in your Studio notebook for the SQL commands to work.

Create an Iceberg table in the AWS Glue Data Catalog

The default catalog is the AwsDataCatalog. Let’s switch to our AWS Glue catalog glue_catalog1, which has support for Iceberg tables. There are no namespaces as yet. A namespace in Iceberg is the same thing as a database in AWS Glue.

%%sql
use glue_catalog1
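
The examples that follow create the orders table in a salesdb namespace. If that namespace (an AWS Glue database) doesn't already exist in your account, one way to create it is with the AWS Glue CLI, as in this sketch; you can also create it with a CREATE DATABASE statement from the notebook:

# Create the salesdb database (Iceberg namespace) in the AWS Glue Data Catalog
aws glue create-database --database-input '{"Name": "salesdb"}'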

Let’s create a table called orders. The DDL syntax looks the same as creating a Hive table, except that we include USING iceberg:

CREATE TABLE glue_catalog1.salesdb.orders
    (
      order_id              int,
      product_name          string,
      product_category      string,
      qty                   int,
      unit_price            decimal(7,2),
      order_datetime        timestamp
    )
USING iceberg
PARTITIONED BY (days(order_datetime))

Note that we’re also partitioning this table by extracting the day out of the order_datetime column. We don’t have to create a separate column for the partition.

DML statements

We then insert records to our table. Here is an example:

INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        1, 
        'Harry Potter and the Prisoner of Azkaban',
        'Books',
        2,
        7.99,
        current_timestamp()
    )

DML statements result in snapshots getting created. Note the snapshot_id and the timestamp column called committed_at:

SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

We now insert four more records and then query the orders table and confirm that the five records are present:

SELECT * FROM glue_catalog1.salesdb.orders

Querying from Athena

Because Iceberg on Amazon EMR comes pre-integrated with the AWS Glue Data Catalog, we can now query the Iceberg tables from AWS analytics services that support Iceberg. Let’s query the salesdb/orders table from Athena as shown in the following screenshot.
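
If you prefer the AWS CLI to the Athena console, you can submit the query as shown in the following sketch; this assumes your Athena workgroup uses an engine version that supports Iceberg, and the output location is a placeholder:

# Submit a query against the Iceberg table from Athena
aws athena start-query-execution \
  --query-string "SELECT * FROM salesdb.orders LIMIT 10" \
  --query-execution-context Database=salesdb \
  --result-configuration OutputLocation=s3://YOUR-BUCKET-NAME/athena-results/

# Fetch the results once the query completes (use the QueryExecutionId returned above)
aws athena get-query-results --query-execution-id <query-execution-id>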

Upserts

The notebook then gives examples for updates and deletes, and even upserts. We use the MERGE INTO statement for upserts, which uses the source table orders_update with new and updated records:

MERGE INTO glue_catalog1.salesdb.orders target 
USING glue_catalog1.salesdb.orders_update source          
ON target.order_id = source.order_id              
WHEN MATCHED THEN 
    UPDATE SET
        order_id = source.order_id,
        product_name = source.product_name,
        product_category = source.product_category,
        qty = source.qty,
        unit_price = source.unit_price,
        order_datetime = source.order_datetime
WHEN NOT MATCHED THEN
    INSERT *
select * from glue_catalog1.salesdb.orders;

Schema evolution

We then walk through schema evolution using simple ALTER TABLE commands to add, rename, and drop columns. The following example shows how simple it is to rename a column:

ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity
DESC table glue_catalog1.salesdb.orders

Time travel

Iceberg also allows us to travel backward or forward by storing point-in-time snapshots. We can travel using timestamps when the snapshots were created or directly using the snapshot_id. The following is an example of a CALL statement that uses rollback_to_snapshot:

%%sql
CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', 8008410363488501197)

We then travel forward in time by calling set_current_snapshot:

%%sql
CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', 8392090950225782953)

Partition evolution

The notebook ends with an example that shows how partition evolution works in Iceberg. Iceberg stores partition information as part of the metadata. Because there is no separate partition column in the data itself, changing the partitioning scheme to hourly partitions, for example, is just a matter of applying a different partition transform, hours(…), to the existing order_datetime column, as shown in the following example:

%%sql
ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)

You can continue to use the old partition on the old data. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately.

The notebook shows how you can query the table using the new hourly partition:

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=1

You can continue to query your old data using the day() transform. There is only the original order_datetime column in the table.

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=14

You don’t have to store additional columns to accommodate multiple partitioning schemes. The partition definitions are in the metadata, providing the flexibility to evolve and change the partition definitions in the future.

Conclusion

In this post, we introduced Apache Iceberg and explained how Iceberg solves some challenges in modern data lakes. We then walked you through how to run Iceberg on Amazon EMR using the AWS Glue Data Catalog as the metastore, and query the data using Athena. You can also run upserts on this data from Athena. There is no additional cost to using Iceberg with Amazon EMR.

For more information about Iceberg, refer to How Iceberg works. Iceberg on Amazon EMR, with its integration with AWS Analytics services, can simplify the way you process, upsert, and delete data, with full support for ACID transactions in Amazon S3. You can also implement schema evolution, partition evolution, time travel, and compaction of data.


About the Author

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernize their architecture, and generate insights from their data. In his spare time, he likes to work on non-profit projects, especially those focused on underprivileged children's education.

Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/

Most businesses store their critical data in a data lake, where you can bring data from various sources into centralized storage. The data is processed by specialized big data compute engines, such as Amazon Athena for interactive queries, Amazon EMR for Apache Spark applications, Amazon SageMaker for machine learning, and Amazon QuickSight for data visualization.

Apache Iceberg is an open-source table format for data stored in data lakes. It is optimized for data access patterns in Amazon Simple Storage Service (Amazon S3) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to process safe table schema updates as the table data evolves
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volumes change, without relying on physical directories
  • Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)
  • Provide versioned tables and support time travel queries to query historical data and verify changes between updates
  • Roll back tables to prior versions to return tables to a known good state in case of any issues

In 2021, AWS teams contributed the Apache Iceberg integration with the AWS Glue Data Catalog to open source, which enables you to use open-source compute engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support for Iceberg, and Amazon EMR added support for Iceberg starting with version 6.5.0.

In this post, we show you how to use Amazon EMR Spark to create an Iceberg table, load sample books review data, and use Athena to query the data, perform schema evolution, run row-level updates and deletes, and use time travel, all coordinated through the AWS Glue Data Catalog.

Solution overview

We use the Amazon Customer Reviews public dataset as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.

Architecture that shows the flow from Amazon EMR loading data into Amazon S3, and queried by Amazon Athena through AWS Glue Data Catalog.

To set up and test this solution, we complete the following high-level steps:

  1. Create an S3 bucket.
  2. Create an EMR cluster.
  3. Create an EMR notebook.
  4. Configure a Spark session.
  5. Load data into the Iceberg table.
  6. Query the data in Athena.
  7. Perform a row-level update in Athena.
  8. Perform a schema evolution in Athena.
  9. Perform time travel in Athena.
  10. Consume Iceberg data across Amazon EMR and Athena.

Prerequisites

To follow along with this walkthrough, you must have the following:

  • An AWS account with a role that has sufficient access to provision the required resources.

Create an S3 bucket

To create an S3 bucket that holds your Iceberg data, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name (for this post, we enter aws-lake-house-iceberg-blog-demo).

Because S3 bucket names must be globally unique, choose a different name when you create your bucket.

  4. For AWS Region, choose your preferred Region (for this post, we use us-east-1).

Create a new Amazon S3 bucket. Choose us-east-1 as region

  5. Complete the remaining steps to create your bucket.
  6. If this is the first time that you're using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.

Create an EMR cluster

Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose your Amazon EMR release version.

Iceberg requires release 6.5.0 or above.

  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.

Choose Amazon EMR release 6.6.0 and JupyterEnterpriseGateway and Spark. Enter configuration information.

  7. You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.
  8. Choose Next.
  9. For Cluster name, enter Iceberg Spark Cluster.
  10. Leave the remaining settings unchanged and choose Next.

Provide Iceberg Spark Cluster as the Cluster name

  11. You can configure security settings such as adding an EC2 key pair to access your EMR cluster locally. In this demo, we use the default setting.
  12. Choose Create cluster.

You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from Starting to Waiting.

Create an EMR notebook

When the cluster is active and in the Waiting state, we’re ready to run Spark programs in the cluster. For this demo, we use an EMR notebook to run Spark commands.

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. For Notebook name, enter a name (for this post, we enter iceberg-spark-notebook).
  4. For Cluster, select Choose an existing cluster and choose Iceberg Spark Cluster.
  5. For AWS service role, choose Create a new role to create EMR_Notebooks_DefaultRole or choose a different role to access resources in the notebook.
  6. Choose Create notebook.

Create an Amazon EMR notebook. Use EMR_Notebooks_DefaultRole

You’re redirected to the notebook detail page.

  7. Choose Open in JupyterLab next to your notebook.
  8. Choose to create a new notebook.
  9. Under Notebook, choose Spark.

Choose Spark from the options provided in the Launcher

Configure a Spark session

In your notebook, run the following code:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://<your-iceberg-blog-demo-bucket>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path s3://<your-iceberg-blog-demo-bucket>
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)

Load data into the Iceberg table

In our Spark session, run the following commands to load data:

// create a database in AWS Glue named reviews if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews related to books
val book_reviews_location = "s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet"
val book_reviews = spark.read.parquet(book_reviews_location)

// write book reviews data to an Iceberg v2 table
book_reviews.writeTo("demo.reviews.book_reviews").tableProperty("format-version", "2").createOrReplace()

Iceberg format v2 is needed to support row-level updates and deletes. See Format Versioning for more details.

It may take up to 15 minutes for the commands to complete. When they finish, you should be able to see the table on the AWS Glue console, under the reviews database, with the table_type property shown as ICEBERG.

Shows the table properties for book_reviews table
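You can also inspect these properties from the Spark session by running the following Spark SQL statement (through spark.sql in the notebook); because the load step set format-version explicitly, its output should include that property:

-- lists the Iceberg table properties, including format-version set during the load
SHOW TBLPROPERTIES demo.reviews.book_reviews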

The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using Spark SQL, Athena SQL, or Iceberg Java and Python SDKs.
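As a sketch, creating the table up front in Spark SQL with an explicit schema might look like the following (the column list is abbreviated for illustration; the full dataset has more columns):

-- column list abbreviated for illustration; adjust to match the full dataset schema
CREATE TABLE IF NOT EXISTS demo.reviews.book_reviews (
    marketplace string,
    customer_id string,
    review_id string,
    product_id string,
    product_title string,
    star_rating int,
    total_votes int,
    review_date date
)
USING iceberg
TBLPROPERTIES ('format-version' = '2')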

Query in Athena

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure it to use the S3 bucket you created earlier to store the query results.

The table book_reviews is available for querying. Run the following query:

SELECT * FROM reviews.book_reviews LIMIT 5;

The following screenshot shows the first five records from the table being displayed.

Amazon Athena query the first 5 rows and show the results

Perform a row-level update in Athena

In the next few steps, let's focus on a record in the table with review ID RZDVOUQG1GBG7. Currently, it shows zero total votes when we run the following query:

SELECT total_votes FROM reviews.book_reviews 
WHERE review_id = 'RZDVOUQG1GBG7'

Query total_votes for a particular review which shows a value of 0

Let’s update the total_votes value to 2 using the following query:

UPDATE reviews.book_reviews
SET total_votes = 2
WHERE review_id = 'RZDVOUQG1GBG7'

Update query to set the total_votes for the previous review_id to 2

After your update command runs successfully, run the following query and note that the result now shows a total of two votes:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Athena enforces ACID transaction guarantees for all write operations against an Iceberg table. This is done through the Iceberg format's optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.

Concurrent updates causes a failure. This shows the TRANSACTION_CONFLICT error during this scenario.

Delete queries work in a similar way; see DELETE for more details.
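For illustration only, a row-level delete takes the same form as an update; note that running it would remove the record used in the rest of this walkthrough:

-- for illustration only; running this removes the record used later in this post
DELETE FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'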

Perform a schema evolution in Athena

Suppose the review suddenly goes viral and gets 10 billion votes:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Based on the AWS Glue table information, the total_votes is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.

Updating to a very large value greater than maximum allowed integer value results in an error

Iceberg supports most schema evolution features as metadata-only operations, which don't require a table rewrite. This includes adding, dropping, renaming, and reordering columns, and promoting column types. To solve this issue, you can change the integer column total_votes to a BIGINT type by running the following DDL:

ALTER TABLE reviews.book_reviews
CHANGE COLUMN total_votes total_votes BIGINT;

You can now update the value successfully:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Querying the record now gives us the expected result in BIGINT:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Perform time travel in Athena

In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel back to a point in time after the first version was committed:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Query an earlier snapshot using time travel feature
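
Athena can also pin a query to a specific snapshot with FOR VERSION AS OF; the snapshot ID below is purely illustrative, so substitute one from your own table's history:

SELECT total_votes FROM reviews.book_reviews
-- 5310972549110422101 is an illustrative snapshot ID; use one from your table's history
FOR VERSION AS OF 5310972549110422101
WHERE review_id = 'RZDVOUQG1GBG7'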

Consume Iceberg data across Amazon EMR and Athena

One of the most important features of a data lake is that different systems can seamlessly work together through the Iceberg open table format. After all the operations are performed in Athena, let's go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.

First, run the same Spark SQL and see if you get the same result for the review used in the example:

val select_votes = """SELECT total_votes FROM demo.reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'"""

spark.sql(select_votes).show()

Spark shows 10 billion total votes for the review.

Shows the latest value of total_votes when querying using the Amazon EMR notebook

Check the transaction history of the operations performed in Athena by querying Iceberg's history system table from Spark:

val select_history = "SELECT * FROM demo.reviews.book_reviews.history"

spark.sql(select_history).show()

This shows three transactions corresponding to the two updates you ran in Athena.

Shows snapshots corresponding to the two updates you ran in Athena

Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run an expire_snapshots procedure to remove old snapshots and free up storage space in Amazon S3:

import java.util.Calendar
import java.text.SimpleDateFormat

val now = Calendar.getInstance().getTime()
val form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val now_formatted = form.format(now.getTime())
val procedure = s"""CALL demo.system.expire_snapshots(
  table => 'reviews.book_reviews',
  older_than => TIMESTAMP '$now_formatted',
  retain_last => 1)"""

spark.sql(procedure)

Note that, after running this procedure, time travel can no longer be performed against expired snapshots.

Examine the history system table again and notice that it shows you only the most recent snapshot.
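
You can re-run the history query from earlier (again through spark.sql) to confirm this:

-- after expire_snapshots, only the most recent snapshot remains in the history
SELECT * FROM demo.reviews.book_reviews.history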

Running the following query in Athena results in the error "No table snapshot found before timestamp…" because the older snapshots were deleted, and you can no longer time travel to them:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following code in your notebook to drop the AWS Glue table and database:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.book_reviews") 
// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the Amazon EMR console, choose Notebooks in the navigation pane.
  3. Select the notebook iceberg-spark-notebook and choose Delete.
  4. Choose Clusters in the navigation pane.
  5. Select the cluster Iceberg Spark Cluster and choose Terminate.
  6. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.

With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as the SageMaker Athena integration for machine learning or the QuickSight Athena integration for dashboards and reporting. AWS Glue also offers the Iceberg connector, which you can use to author and run Iceberg data pipelines.

In addition, Iceberg supports a variety of other open-source compute engines that you can choose from. For example, you can use Apache Flink on Amazon EMR for streaming and change data capture (CDC) use cases. Iceberg's strong transaction guarantees and efficient row-level updates, deletes, time travel, and schema evolution provide a sound foundation and endless possibilities for users to unlock the power of big data.


About the Authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Mohit Mehta is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Jared Keating is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.