Tag Archives: Amazon EMR

Use Amazon EMR with S3 Access Grants to scale Spark access to Amazon S3

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/use-amazon-emr-with-s3-access-grants-to-scale-spark-access-to-amazon-s3/

Amazon EMR is pleased to announce integration with Amazon Simple Storage Service (Amazon S3) Access Grants that simplifies Amazon S3 permission management and allows you to enforce granular access at scale. With this integration, you can scale job-based Amazon S3 access for Apache Spark jobs across all Amazon EMR deployment options and enforce granular Amazon S3 access for better security posture.

In this post, we’ll walk through a few scenarios of how to use Amazon S3 Access Grants. Before we get started on the Amazon EMR and Amazon S3 Access Grants integration, we’ll set up and configure S3 Access Grants. Then, we’ll use the AWS CloudFormation template below to create an Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) cluster, an EMR Serverless application, and two different job roles.

After the setup, we’ll run a few scenarios of how you can use Amazon EMR with S3 Access Grants. First, we’ll run a batch job on EMR on Amazon EC2 to import CSV data and convert to Parquet. Second, we’ll use Amazon EMR Studio with an interactive EMR Serverless application to analyze the data. Finally, we’ll show how to set up cross-account access for Amazon S3 Access Grants. Many customers use different accounts across their organization and even outside their organization to share data. Amazon S3 Access Grants make it easy to grant cross-account access to your data even when filtering by different prefixes.

Besides this post, you can learn more about Amazon S3 Access Grants from Scaling data access with Amazon S3 Access Grants.

Prerequisites

Before you launch the AWS CloudFormation stack, ensure you have the following:

  • An AWS account that provides access to AWS services
  • The latest version of the AWS Command Line Interface (AWS CLI)
  • An AWS Identity and Access Management (AWS IAM) user with an access key and secret key to configure the AWS CLI, and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation
  • A second AWS account if you wish to test the cross-account functionality

Walkthrough

Create resources with AWS CloudFormation

In order to use Amazon S3 Access Grants, you’ll need a cluster with Amazon EMR 6.15.0 or later. For more information, see the documentation for using Amazon S3 Access Grants with an Amazon EMR cluster, an Amazon EMR on EKS cluster, and an Amazon EMR Serverless application. For the purpose of this post, we’ll assume that you have two different types of data access users in your organization—analytics engineers with read and write access to the data in the bucket and business analysts with read-only access. We’ll utilize two different AWS IAM roles, but you can also connect your own identity provider directly to IAM Identity Center if you like.

Here’s the architecture for this first portion. The AWS CloudFormation stack creates the following AWS resources:

  • A Virtual Private Cloud (VPC) stack with private and public subnets to use with EMR Studio, route tables, and Network Address Translation (NAT) gateway.
  • An Amazon S3 bucket for EMR artifacts like log files, Spark code, and Jupyter notebooks.
  • An Amazon S3 bucket with sample data to use with S3 Access Grants.
  • An Amazon EMR cluster configured to use runtime roles and S3 Access Grants.
  • An Amazon EMR Serverless application configured to use S3 Access Grants.
  • An Amazon EMR Studio where users can log in and create workspace notebooks with the EMR Serverless application.
  • Two AWS IAM roles we’ll use for our EMR job runs: one for Amazon EC2 with write access and another for Serverless with read access.
  • One AWS IAM role that S3 Access Grants uses to access bucket data (that is, the role you specify when registering a location with S3 Access Grants; S3 Access Grants uses this role to create temporary credentials).

To get started, complete the following steps:

  1. Choose Launch Stack:
  2. Accept the defaults and select I acknowledge that this template may create IAM resources.

The AWS CloudFormation stack takes approximately 10–15 minutes to complete. Once the stack is finished, go to the Outputs tab, where you will find the information necessary for the following steps.

Create Amazon S3 Access Grants resources

First, we’re going to create the Amazon S3 Access Grants resources in our account. We create an S3 Access Grants instance; an S3 Access Grants location that refers to the data bucket created by the AWS CloudFormation stack and is only accessible by our data bucket AWS IAM role; and grants with different levels of access for our reader and writer roles.

To create the necessary S3 Access Grants resources, use the following AWS CLI commands as an administrative user and replace any of the fields between the angle brackets (<>) with the output from your CloudFormation stack.

aws s3control create-access-grants-instance \
  --account-id <YOUR_ACCOUNT_ID>

Next, we create a new S3 Access Grants location. What is a Location? Amazon S3 Access Grants works by vending AWS IAM credentials with access scoped to a particular S3 prefix. An S3 Access Grants location will be associated with an AWS IAM Role from which these temporary sessions will be created.

In our case, we’re going to scope the AWS IAM role to the bucket created with our AWS CloudFormation stack and give access to the data bucket role created by the stack. Go to the Outputs tab to find the values to use in the following code snippet:

aws s3control create-access-grants-location \
  --account-id <YOUR_ACCOUNT_ID> \
  --location-scope "s3://<DATA_BUCKET>/" \
  --iam-role-arn <DATA_BUCKET_ROLE>

Note the AccessGrantsLocationId value in the response. We’ll need that for the next steps where we’ll walk through creating the necessary S3 Access Grants to limit read and write access to your bucket.

  • For the read/write user, use s3control create-access-grant to allow READWRITE access to the “output/*” prefix:
    aws s3control create-access-grant \
      --account-id <YOUR_ACCOUNT_ID> \
      --access-grants-location-id <LOCATION_ID_FROM_PREVIOUS_COMMAND> \
      --access-grants-location-configuration S3SubPrefix="output/*" \
      --permission READWRITE \
      --grantee GranteeType=IAM,GranteeIdentifier=<DATA_WRITER_ROLE>

  • For the read user, use s3control create-access-grant again to allow only READ access to the same prefix:
    aws s3control create-access-grant \
      --account-id <YOUR_ACCOUNT_ID> \
      --access-grants-location-id <LOCATION_ID_FROM_PREVIOUS_COMMAND> \
      --access-grants-location-configuration S3SubPrefix="output/*" \
      --permission READ \
      --grantee GranteeType=IAM,GranteeIdentifier=<DATA_READER_ROLE>
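To build intuition for how these two grants scope access, here is a small illustrative model of prefix-scoped permissions. This is a simplification for explanation only, not the actual S3 Access Grants evaluation logic, and the role names are placeholders:

```python
from fnmatch import fnmatch

# Illustrative only: a simplified model of how an access grant scopes a
# grantee to a prefix and permission level. The real evaluation happens in
# the S3 Access Grants service when it vends temporary credentials.
GRANTS = [
    {"grantee": "DATA_WRITER_ROLE", "prefix": "output/*", "permission": "READWRITE"},
    {"grantee": "DATA_READER_ROLE", "prefix": "output/*", "permission": "READ"},
]

def is_allowed(grantee: str, key: str, action: str) -> bool:
    """Return True if any grant permits `action` ('READ' or 'WRITE') on `key`."""
    for grant in GRANTS:
        if grant["grantee"] != grantee or not fnmatch(key, grant["prefix"]):
            continue
        if action == "READ" and grant["permission"] in ("READ", "READWRITE"):
            return True
        if action == "WRITE" and grant["permission"] == "READWRITE":
            return True
    return False
```

With these two grants, the writer role can both read and write under output/, while the reader role can only read, and neither role can touch keys outside the granted prefix.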

Demo Scenario 1: Amazon EMR on EC2 Spark Job to generate Parquet data

Now that we’ve set up our Amazon EMR environments and granted access to our roles via S3 Access Grants, it’s important to note that the two AWS IAM roles for our EMR cluster and EMR Serverless application have an IAM policy that only allows access to our EMR artifacts bucket. They have no IAM access to our S3 data bucket and instead use S3 Access Grants to fetch short-lived credentials scoped to the bucket and prefix. Specifically, the roles are granted s3:GetDataAccess and s3:GetDataAccessGrantsInstanceForPrefix permissions to request access via the specific S3 Access Grants instance created in our Region. This allows you to manage your S3 access in one place in a highly scoped and granular fashion that enhances your security posture.

By combining S3 Access Grants with job roles on EMR on Amazon Elastic Kubernetes Service (Amazon EKS) and EMR Serverless, as well as runtime roles for Amazon EMR steps (available since EMR 6.7.0), you can easily manage access control for individual jobs or queries. S3 Access Grants are available on EMR 6.15.0 and later. Let’s first run a Spark job on EMR on EC2 as our analytics engineer to convert some sample data into Parquet.

For this, use the sample code provided in converter.py. Download the file and copy it to the EMR_ARTIFACTS_BUCKET created by the AWS CloudFormation stack. We’ll submit our job with the ReadWrite AWS IAM role. Note that for the EMR cluster, we configured S3 Access Grants to fall back to the IAM role if access is not provided by S3 Access Grants. The DATA_WRITER_ROLE has read access to the EMR artifacts bucket through an IAM policy so it can read our script. As before, replace all the values between the <> symbols with the values from the Outputs tab of your CloudFormation stack.

aws s3 cp converter.py s3://<EMR_ARTIFACTS_BUCKET>/code/
aws emr add-steps --cluster-id <EMR_CLUSTER_ID> \
    --execution-role-arn <DATA_WRITER_ROLE> \
    --steps '[
        {
            "Type": "CUSTOM_JAR",
            "Name": "converter",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Args": [
                    "spark-submit",
                    "--deploy-mode",
                    "client",
                    "s3://<EMR_ARTIFACTS_BUCKET>/code/converter.py",
                    "s3://<DATA_BUCKET>/output/weather-data/"
            ]
        }
    ]'

Once the job finishes, we should see some Parquet data in s3://<DATA_BUCKET>/output/weather-data/. You can see the status of the job in the Steps tab of the EMR console.
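If you submit steps like this regularly, the step definition can be built programmatically instead of hand-editing JSON. A minimal sketch (the bucket names and script path are the placeholders from this walkthrough):

```python
import json

def spark_submit_step(name: str, script_s3_uri: str, *script_args: str) -> dict:
    """Build a command-runner.jar step definition for `aws emr add-steps`."""
    return {
        "Type": "CUSTOM_JAR",
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--deploy-mode", "client",
                 script_s3_uri, *script_args],
    }

step = spark_submit_step(
    "converter",
    "s3://EMR_ARTIFACTS_BUCKET/code/converter.py",
    "s3://DATA_BUCKET/output/weather-data/",
)
print(json.dumps([step]))  # pass this JSON as the --steps argument
```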

Demo Scenario 2: EMR Studio with an interactive EMR Serverless application to analyze data

Now let’s log in to EMR Studio and connect to your EMR Serverless application with the ReadOnly runtime role to analyze the data from scenario 1. First, we need to enable the interactive endpoint on your Serverless application.

  • Select the EMRStudioURL in the Outputs tab of your AWS CloudFormation stack.
  • Select Applications under the Serverless section on the left-hand side.
  • Select the EMRBlog application, then the Action dropdown, and Configure.
  • Expand the Interactive endpoint section and make sure that Enable interactive endpoint is checked.
  • Scroll down and click Configure application to save your changes.
  • Back on the Applications page, select EMRBlog application, then the Start application button.

Next, create a new workspace in our Studio.

  • Choose Workspaces on the left-hand side, then the Create workspace button.
  • Enter a Workspace name, leave the remaining defaults, and choose Create Workspace.
  • After creating the workspace, it should launch in a new tab in a few seconds.

Now connect your Workspace to your EMR Serverless application.

  • Select the EMR Compute button on the left-hand side.
  • Choose EMR Serverless as the compute type.
  • Choose the EMRBlog application and the runtime role that starts with EMRBlog.
  • Choose Attach. The window will refresh and you can open a new PySpark notebook and follow along below. To execute the code yourself, download the AccessGrantsReadOnly.ipynb notebook and upload it into your workspace using the Upload Files button in the file browser.

Let’s do a quick read of the data.

df = spark.read.parquet(f"s3://{DATA_BUCKET}/output/weather-data/")
df.createOrReplaceTempView("weather")
df.show()

We’ll do a simple count(*):

spark.sql("SELECT year, COUNT(*) FROM weather GROUP BY 1").show()


You can also see that if we try to write data into the output location, we get an Amazon S3 error.

df.write.format("csv").mode("overwrite").save(f"s3://{DATA_BUCKET}/output/weather-data-2/")

While you can also grant similar access via AWS IAM policies, Amazon S3 Access Grants can be useful for situations where your organization has outgrown managing access via IAM, wants to map S3 Access Grants to IAM Identity Center principals or roles, or has previously used EMR File System (EMRFS) role mappings. S3 Access Grants credentials are also temporary, providing more secure access to your data. In addition, as shown below, cross-account access also benefits from the simplicity of S3 Access Grants.

Demo Scenario 3 – Cross-account access

One of the other more common access patterns is accessing data across accounts. This pattern has become increasingly common with the emergence of data mesh, where data producers and consumers are decentralized across different AWS accounts.

Previously, cross-account access required setting up complex cross-account assume role actions and custom credentials providers when configuring your Spark job. With S3 Access Grants, we only need to do the following:

  • Create an Amazon EMR job role and cluster in a second data consumer account
  • The data producer account grants access to the data consumer account with a new instance resource policy
  • The data producer account creates an access grant for the data consumer job role

And that’s it! If you have a second account handy, go ahead and deploy this AWS CloudFormation stack in the data consumer account, to create a new EMR Serverless application and job role. If not, just follow along below. The AWS CloudFormation stack should finish creating in under a minute. Next, let’s go ahead and grant our data consumer access to the S3 Access Grants instance in our data producer account.

  • Replace <DATA_PRODUCER_ACCOUNT_ID> and <DATA_CONSUMER_ACCOUNT_ID> with the relevant 12-digit AWS account IDs.
  • You may also need to change the region in the command and policy.
    aws s3control put-access-grants-instance-resource-policy \
        --account-id <DATA_PRODUCER_ACCOUNT_ID> \
        --region us-east-2 \
        --policy '{
        "Version": "2012-10-17",
        "Id": "S3AccessGrantsPolicy",
        "Statement": [
            {
                "Sid": "AllowAccessToS3AccessGrants",
                "Principal": {
                    "AWS": "<DATA_CONSUMER_ACCOUNT_ID>"
                },
                "Effect": "Allow",
                "Action": [
                    "s3:ListAccessGrants",
                    "s3:ListAccessGrantsLocations",
                    "s3:GetDataAccess"
                ],
                "Resource": "arn:aws:s3:us-east-2:<DATA_PRODUCER_ACCOUNT_ID>:access-grants/default"
            }
        ]
    }'

  • And then grant READ access to the output folder to our EMR Serverless job role in the data consumer account.
    aws s3control create-access-grant \
        --account-id <DATA_PRODUCER_ACCOUNT_ID> \
        --region us-east-2 \
        --access-grants-location-id default \
        --access-grants-location-configuration S3SubPrefix="output/*" \
        --permission READ \
        --grantee GranteeType=IAM,GranteeIdentifier=arn:aws:iam::<DATA_CONSUMER_ACCOUNT_ID>:role/<EMR_SERVERLESS_JOB_ROLE>

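If you grant access to several consumer accounts, generating the instance resource policy in code avoids hand-editing JSON. A minimal sketch (the account IDs and Region below are placeholders):

```python
import json

def access_grants_resource_policy(producer_account: str,
                                  consumer_accounts: list,
                                  region: str) -> str:
    """Render an S3 Access Grants instance resource policy granting
    cross-account access to the listed consumer accounts."""
    return json.dumps({
        "Version": "2012-10-17",
        "Id": "S3AccessGrantsPolicy",
        "Statement": [{
            "Sid": "AllowAccessToS3AccessGrants",
            "Principal": {"AWS": consumer_accounts},
            "Effect": "Allow",
            "Action": [
                "s3:ListAccessGrants",
                "s3:ListAccessGrantsLocations",
                "s3:GetDataAccess",
            ],
            "Resource": f"arn:aws:s3:{region}:{producer_account}:access-grants/default",
        }],
    })

# Placeholder account IDs; pass the rendered policy to
# put-access-grants-instance-resource-policy as the --policy value.
policy = access_grants_resource_policy("111111111111", ["222222222222"], "us-east-2")
```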
Now that we’ve done that, we can read data in the data consumer account from the bucket in the data producer account. We’ll just run a simple COUNT(*) again. Replace the <APPLICATION_ID>, <DATA_CONSUMER_JOB_ROLE>, and <DATA_CONSUMER_LOG_BUCKET> with the values from the Outputs tab on the AWS CloudFormation stack created in your second account.

And replace <DATA_PRODUCER_BUCKET> with the bucket from your first account.

aws emr-serverless start-job-run \
  --application-id <APPLICATION_ID> \
  --execution-role-arn <DATA_CONSUMER_JOB_ROLE> \
  --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://<DATA_CONSUMER_LOG_BUCKET>/logs/"
            }
        }
    }' \
  --job-driver '{
    "sparkSubmit": {
        "entryPoint": "SELECT COUNT(*) FROM parquet.`s3://<DATA_PRODUCER_BUCKET>/output/weather-data/`",
        "sparkSubmitParameters": "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver -e"
    }
  }'

Wait for the job to reach a completed state, and then fetch the stdout log from your bucket, replacing the <APPLICATION_ID>, <JOB_RUN_ID> from the job above, and <DATA_CONSUMER_LOG_BUCKET>.

aws emr-serverless get-job-run --application-id <APPLICATION_ID> --job-run-id <JOB_RUN_ID>
{
    "jobRun": {
        "applicationId": "00feq2s6g89r2n0d",
        "jobRunId": "00feqnp2ih45d80e",
        "state": "SUCCESS",
        ...
}
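Rather than re-running get-job-run by hand, a small helper can classify job run states; SUCCESS, FAILED, and CANCELLED are the terminal states for an EMR Serverless job run:

```python
# Terminal states for an EMR Serverless job run; all other states
# (for example SUBMITTED, PENDING, SCHEDULED, RUNNING) can still change.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}

def is_finished(state: str) -> bool:
    """True once a job run can no longer change state."""
    return state in TERMINAL_STATES

def succeeded(job_run: dict) -> bool:
    """Check a get-job-run response shaped like the one shown above."""
    return job_run["jobRun"]["state"] == "SUCCESS"
```

You could call these from a polling loop around the boto3 emr-serverless client's get_job_run call, sleeping between checks until is_finished returns True.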

If you are on a Unix-based machine and have gunzip installed, you can use the following command as your administrative user.

Note that this command only uses AWS IAM Role Policies, not Amazon S3 Access Grants.

aws s3 cp s3://<DATA_CONSUMER_LOG_BUCKET>/logs/applications/<APPLICATION_ID>/jobs/<JOB_RUN_ID>/SPARK_DRIVER/stdout.gz - | gunzip

Otherwise, you can use the get-dashboard-for-job-run command and open the resulting URL in your browser to view the Driver stdout logs in the Executors tab of the Spark UI.

aws emr-serverless get-dashboard-for-job-run --application-id <APPLICATION_ID> --job-run-id <JOB_RUN_ID>

Cleaning up

In order to avoid incurring future costs for example resources in your AWS accounts, be sure to take the following steps:

  • You must manually delete the Amazon EMR Studio workspace created in the first part of the post
  • Empty the Amazon S3 buckets created by the AWS CloudFormation stacks
  • Delete the Amazon S3 Access Grants, resource policies, S3 Access Grants location, and instance created in the steps above using the delete-access-grant, delete-access-grants-instance-resource-policy, delete-access-grants-location, and delete-access-grants-instance commands
  • Delete the AWS CloudFormation Stacks created in each account

Comparison to AWS IAM Role Mapping

In 2018, EMR introduced EMRFS role mapping as a way to provide storage-level authorization by configuring EMRFS with multiple IAM roles. While effective, role mapping required managing users or groups locally on your EMR cluster in addition to maintaining the mappings between those identities and their corresponding IAM roles. In combination with runtime roles on EMR on EC2 and job roles for EMR on EKS and EMR Serverless, it is now easier to grant access to your data on S3 directly to the relevant principal on a per-job basis.

Conclusion

In this post, we showed you how to set up and use Amazon S3 Access Grants with Amazon EMR in order to easily manage data access for your Amazon EMR workloads. With S3 Access Grants and EMR, you can easily configure access to data on S3 for IAM identities or using your corporate directory in IAM Identity Center as your identity source. S3 Access Grants is supported across EMR on EC2, EMR on EKS, and EMR Serverless starting in EMR release 6.15.0.

To learn more, see the S3 Access Grants and EMR documentation and feel free to ask any questions in the comments!


About the author

Damon Cortesi is a Principal Developer Advocate with Amazon Web Services. He builds tools and content to help make the lives of data engineers easier. When not hard at work, he still builds data pipelines and splits logs in his spare time.

Use generative AI with Amazon EMR, Amazon Bedrock, and English SDK for Apache Spark to unlock insights

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/use-generative-ai-with-amazon-emr-amazon-bedrock-and-english-sdk-for-apache-spark-to-unlock-insights/

In this era of big data, organizations worldwide are constantly searching for innovative ways to extract value and insights from their vast datasets. Apache Spark offers the scalability and speed needed to process large amounts of data efficiently.

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR is the best place to run Apache Spark. You can quickly and effortlessly create managed Spark clusters from the AWS Management Console, AWS Command Line Interface (AWS CLI), or Amazon EMR API. You can also use additional Amazon EMR features, including fast Amazon Simple Storage Service (Amazon S3) connectivity using the Amazon EMR File System (EMRFS), integration with the Amazon EC2 Spot market and the AWS Glue Data Catalog, and EMR Managed Scaling to add or remove instances from your cluster. Amazon EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks, and tools like Spark UI and YARN Timeline Service to simplify debugging.

To unlock the potential hidden within the data troves, it’s essential to go beyond traditional analytics. Enter generative AI, a cutting-edge technology that combines ML with creativity to generate human-like text, art, and even code. Amazon Bedrock is the most straightforward way to build and scale generative AI applications with foundation models (FMs). Amazon Bedrock is a fully managed service that makes FMs from Amazon and leading AI companies available through an API, so you can quickly experiment with a variety of FMs in the playground, and use a single API for inference regardless of the models you choose, giving you the flexibility to use FMs from different providers and keep up to date with the latest model versions with minimal code changes.

In this post, we explore how you can supercharge your data analytics with generative AI using Amazon EMR, Amazon Bedrock, and the pyspark-ai library. The pyspark-ai library is an English SDK for Apache Spark. It takes instructions in English language and compiles them into PySpark objects like DataFrames. This makes it straightforward to work with Spark, allowing you to focus on extracting value from your data.

Solution overview

The following diagram illustrates the architecture for using generative AI with Amazon EMR and Amazon Bedrock.


EMR Studio is a web-based IDE for fully managed Jupyter notebooks that run on EMR clusters. We interact with EMR Studio Workspaces connected to a running EMR cluster and run the notebook provided as part of this post. We use the New York City Taxi data to garner insights into various taxi rides taken by users. We ask questions in natural language on top of the data loaded into a Spark DataFrame. The pyspark-ai library then uses the Amazon Titan Text FM from Amazon Bedrock to create a SQL query based on the natural language question. The pyspark-ai library runs the SQL query using Spark SQL and provides the results back to the user.

In this solution, you can create and configure the required resources in your AWS account with an AWS CloudFormation template. The template creates the AWS Glue database and tables, S3 bucket, VPC, and other AWS Identity and Access Management (IAM) resources that are used in the solution.

The template is designed to demonstrate how to use EMR Studio with the pyspark-ai package and Amazon Bedrock, and is not intended for production use without modification. Additionally, the template uses the us-east-1 Region and may not work in other Regions without modification. The template creates resources that incur costs while they are in use. Follow the cleanup steps at the end of this post to delete the resources and avoid unnecessary charges.

Prerequisites

Before you launch the CloudFormation stack, ensure you have the following:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS CLI, and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation
  • The Titan Text G1 – Express model is currently in preview, so you need to have preview access to use it as part of this post

Create resources with AWS CloudFormation

The CloudFormation template creates the following AWS resources:

  • A VPC stack with private and public subnets to use with EMR Studio, route tables, and NAT gateway.
  • An EMR cluster with Python 3.9 installed. We are using a bootstrap action to install Python 3.9 and other relevant packages like pyspark-ai and Amazon Bedrock dependencies. (For more information, refer to the bootstrap script.)
  • An S3 bucket for the EMR Studio Workspace and notebook storage.
  • IAM roles and policies for EMR Studio setup, Amazon Bedrock access, and running notebooks

To get started, complete the following steps:

  1. Choose Launch Stack:
  2. Select I acknowledge that this template may create IAM resources.

The CloudFormation stack takes approximately 20–30 minutes to complete. You can monitor its progress on the AWS CloudFormation console. When its status reads CREATE_COMPLETE, your AWS account will have the resources necessary to implement this solution.

Create EMR Studio

Now you can create an EMR Studio and Workspace to work with the notebook code. Complete the following steps:

  1. On the EMR Studio console, choose Create Studio.
  2. Enter the Studio Name as GenAI-EMR-Studio and provide a description.
  3. In the Networking and security section, specify the following:
    • For VPC, choose the VPC you created as part of the CloudFormation stack that you deployed. Get the VPC ID using the CloudFormation outputs for the VPCID key.
    • For Subnets, choose all four subnets.
    • For Security and access, select Custom security group.
    • For Cluster/endpoint security group, choose EMRSparkAI-Cluster-Endpoint-SG.
    • For Workspace security group, choose EMRSparkAI-Workspace-SG.
  4. In the Studio service role section, specify the following:
    • For Authentication, select AWS Identity and Access Management (IAM).
    • For AWS IAM service role, choose EMRSparkAI-StudioServiceRole.
  5. In the Workspace storage section, browse and choose the S3 bucket for storage starting with emr-sparkai-<account-id>.
  6. Choose Create Studio.
  7. When the EMR Studio is created, choose the link under Studio Access URL to access the Studio.
  8. When you’re in the Studio, choose Create workspace.
  9. Add emr-genai as the name for the Workspace and choose Create workspace.
  10. When the Workspace is created, choose its name to launch the Workspace (make sure you’ve disabled any pop-up blockers).

Big data analytics using Apache Spark with Amazon EMR and generative AI

Now that we have completed the required setup, we can start performing big data analytics using Apache Spark with Amazon EMR and generative AI.

As a first step, we load a notebook that has the required code and examples for the use case. We use the NY Taxi dataset, which contains details about taxi rides.

  1. Download the notebook file NYTaxi.ipynb and upload it to your Workspace by choosing the upload icon.
  2. After the notebook is imported, open the notebook and choose PySpark as the kernel.

PySpark AI uses OpenAI’s GPT-4 as the default LLM, but you can also plug in models from Amazon Bedrock, Amazon SageMaker JumpStart, and other third-party models. For this post, we show how to integrate the Amazon Bedrock Titan model for SQL query generation and run it with Apache Spark in Amazon EMR.

  1. To get started with the notebook, you need to associate the Workspace to a compute layer. To do so, choose the Compute icon in the navigation pane and choose the EMR cluster created by the CloudFormation stack.
  2. Configure the Python parameters to use the updated Python 3.9 package with Amazon EMR:
    %%configure -f
    {
        "conf": {
            "spark.executorEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9",
            "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9"
        }
    }

  3. Import the necessary libraries:
    from pyspark_ai import SparkAI
    from pyspark.sql import SparkSession
    from langchain.chat_models import ChatOpenAI
    from langchain.llms.bedrock import Bedrock
    import boto3
    import os

  4. After the libraries are imported, you can define the LLM model from Amazon Bedrock. In this case, we use amazon.titan-text-express-v1. You need to enter the Region and Amazon Bedrock endpoint URL based on your preview access for the Titan Text G1 – Express model.
    boto3_bedrock = boto3.client('bedrock-runtime', '<region>', endpoint_url='<bedrock endpoint url>')
    llm = Bedrock(
        model_id="amazon.titan-text-express-v1",
        client=boto3_bedrock,
    )

  5. Connect Spark AI to the Amazon Bedrock LLM model for SQL query generation based on questions in natural language:
    # Connect Spark AI to the Bedrock Titan LLM
    spark_ai = SparkAI(llm=llm, verbose=False)
    spark_ai.activate()

Here, we have initialized Spark AI with verbose=False; you can also set verbose=True to see more details.

Now you can read the NYC Taxi data in a Spark DataFrame and use the power of generative AI in Spark.

  1. For example, you can ask for the count of records in the dataset:
    taxi_records.ai.transform("count the number of records in this dataset").show()

We get the following response:

> Entering new AgentExecutor chain...
Thought: I need to count the number of records in the table.
Action: query_validation
Action Input: SELECT count(*) FROM spark_ai_temp_view_ee3325
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT count(*) FROM spark_ai_temp_view_ee3325
> Finished chain.
+----------+
| count(1)|
+----------+
|2870781820|
+----------+

Spark AI internally uses LangChain and SQL chain, which hide the complexity from end-users working with queries in Spark.
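The agent trace above follows a generate, validate, then execute loop. The following is a conceptual sketch of that flow, not pyspark-ai’s actual implementation; it uses a stubbed LLM and sqlite3 in place of Amazon Bedrock and Spark SQL:

```python
import sqlite3

def stub_llm(question: str) -> str:
    # Stand-in for the Bedrock Titan call: always answers the count question.
    return "SELECT count(*) FROM weather"

def validate(conn: sqlite3.Connection, sql: str) -> bool:
    # Mirror the query_validation step: plan the query without running it.
    try:
        conn.execute(f"EXPLAIN {sql}")
        return True
    except sqlite3.Error:
        return False

def answer(conn: sqlite3.Connection, question: str):
    sql = stub_llm(question)             # Thought -> Action: generate SQL
    if not validate(conn, sql):          # Observation: query_validation
        raise ValueError(f"invalid SQL: {sql}")
    return conn.execute(sql).fetchall()  # Final Answer: run the query

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather (year INTEGER, temp REAL)")
conn.executemany("INSERT INTO weather VALUES (?, ?)", [(2023, 1.0), (2023, 2.0)])
print(answer(conn, "count the number of records in this dataset"))  # [(2,)]
```

The validation step is what lets the agent reject malformed SQL and retry before touching the data, which is the pattern visible in the chain output above.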

The notebook has a few more example scenarios to explore the power of generative AI with Apache Spark and Amazon EMR.

Clean up

Empty the contents of the S3 bucket emr-sparkai-<account-id>, delete the EMR Studio Workspace created as part of this post, and then delete the CloudFormation stack that you deployed.

Conclusion

This post showed how you can supercharge your big data analytics with the help of Apache Spark on Amazon EMR and Amazon Bedrock. The pyspark-ai package allows you to derive meaningful insights from your data. It helps reduce development and analysis time, eliminating the need to write manual queries and allowing you to focus on your business use case.


About the Authors

Saurabh Bhutyani is a Principal Analytics Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running generative AI use cases, scalable analytics solutions and data mesh architectures using AWS services like Amazon Bedrock, Amazon SageMaker, Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Harsh Vardhan is an AWS Senior Solutions Architect, specializing in analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Implement fine-grained access control in Amazon SageMaker Studio and Amazon EMR using Apache Ranger and Microsoft Active Directory

Post Syndicated from Rahul Sarda original https://aws.amazon.com/blogs/big-data/implement-fine-grained-access-control-in-amazon-sagemaker-studio-and-amazon-emr-using-apache-ranger-and-microsoft-active-directory/

Amazon SageMaker Studio is a fully integrated development environment (IDE) for machine learning (ML) that enables data scientists and developers to perform every step of the ML workflow, from preparing data to building, training, tuning, and deploying models. SageMaker Studio comes with built-in integration with Amazon EMR, enabling data scientists to interactively prepare data at petabyte scale using frameworks such as Apache Spark, Hive, and Presto right from SageMaker Studio notebooks. With Amazon SageMaker, developers, data scientists, and SageMaker Studio users can access both raw data stored in Amazon Simple Storage Service (Amazon S3), and cataloged tabular data stored in a Hive metastore easily. SageMaker Studio’s support for Apache Ranger creates a simple mechanism for applying fine-grained access control to the raw and cataloged data with grant and revoke policies administered from a friendly web interface.

In this post, we show how you can authenticate into SageMaker Studio using an existing Active Directory (AD), with authorized access to both Amazon S3 and Hive cataloged data using AD entitlements via Apache Ranger integration and AWS IAM Identity Center (successor to AWS Single Sign-On). With this solution, you can manage access to multiple SageMaker environments and SageMaker Studio notebooks using a single set of credentials. Subsequently, Apache Spark jobs created from SageMaker Studio notebooks will access only the data and resources permitted by Apache Ranger policies attached to the AD credentials, inclusive of table and column-level access.

With this capability, multiple SageMaker Studio users can connect to the same EMR cluster, gaining access only to data granted to their user or group, with audit records captured and visible in Amazon CloudWatch. This multi-tenant environment is possible through user session isolation that prevents users from accessing datasets and cluster resources allocated to other users. Ultimately, organizations can provision fewer clusters, reduce administrative overhead, and increase cluster utilization, saving staff time and cloud costs.

Solution overview

We demonstrate this solution with an end-to-end use case using a sample ecommerce dataset. The dataset is available within provided AWS CloudFormation templates and consists of transactional ecommerce data (products, orders, customers) cataloged in a Hive metastore.

The solution utilizes two data analyst personas, Alex and Tina, each tasked with a different analysis that requires fine-grained limits on dataset access:

  • Tina, a data scientist on the marketing team, is tasked with building a model for customer lifetime value. Data access should only be permitted to non-sensitive customer, product, and orders data.
  • Alex, a data scientist on the sales team, is tasked with generating a product demand forecast, requiring access to product and orders data. No customer data is required.

The following figure illustrates our desired fine-grained access.

The following diagram illustrates the solution architecture.

The architecture is implemented as follows:

  • Microsoft Active Directory – Used to manage user authentication, select AWS application access, and user and group membership for Apache Ranger secured data authorization
  • Apache Ranger – Used to monitor and manage comprehensive data security across the Hadoop and Amazon EMR platform
  • Amazon EMR – Used to retrieve, prepare, and analyze data from the Hive metastore using Spark
  • SageMaker Studio – An integrated IDE with purpose-built tools to build AI/ML models

The following sections walk through the setup of the architectural components for this solution using the CloudFormation stack.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Create resources with AWS CloudFormation

To build the solution within your environment, use the provided CloudFormation templates to create the required AWS resources.

Note that running these CloudFormation templates and the following configuration steps will create AWS resources that may incur charges. Additionally, all the steps should be run in the same Region.

Template 1

This first template creates the following resources and takes approximately 15 minutes to complete:

  • A Multi-AZ, multi-subnet VPC infrastructure, with managed NAT gateways in the public subnet for each Availability Zone
  • S3 VPC endpoints and Elastic Network Interfaces
  • A Windows Active Directory domain controller using Amazon Elastic Compute Cloud (Amazon EC2) with cross-realm trust
  • A Linux Bastion host (Amazon EC2) in an auto scaling group

To deploy this template, complete the following steps:

  1. Sign in to the AWS Management Console.
  2. On the Amazon EC2 console, create an EC2 key pair.
  3. Choose Launch Stack:
  4. Select the target Region.
  5. Verify the stack name and provide the following parameters:
    1. The name of the key pair you created.
    2. Passwords for cross-realm trust, the Windows domain admin, LDAP bind, and default AD user. Be sure to record these passwords to use in future steps.
    3. Select a minimum of three Availability Zones based on the chosen Region.
  6. Review the remaining parameters. No changes are required for the solution, but you may change parameter values if desired.
  7. Choose Next and then choose Next again.
  8. Review the parameters.
  9. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND.
  10. Choose Submit.

Template 2

The second template creates the following resources and takes approximately 30–60 minutes to complete:

To deploy this template, complete the following steps:

  1. Choose Launch Stack:
  2. Select the target Region.
  3. Verify the stack name and provide the following parameters:
    1. Key pair name (created earlier).
    2. LDAPHostPrivateIP address, which can be found in the output section of the Windows AD CloudFormation stack.
    3. Passwords for the Windows domain admin, cross-realm trust, AD domain user, and LDAP bind. Use the same passwords as you did for the first CloudFormation template.
    4. Passwords for the RDS for MySQL database and KDC admin. Record these passwords; they may be needed in future steps.
    5. Log directory for the EMR cluster.
    6. VPC (its name contains the CloudFormation stack name).
    7. Subnet details (align the subnet name with the parameter name).
    8. Set AppsEMR to Hadoop, Spark, Hive, Livy, Hue, and Trino.
    9. Leave RangerAdminPassword as is.
  4. Review the remaining parameters. No changes are required beyond what is mentioned, but you may change parameter values if desired.
  5. Choose Next, then choose Next again.
  6. Review the parameters.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND.
  8. Choose Submit.

Integrate Active Directory with AWS accounts using IAM Identity Center

To enable users to sign in to SageMaker with Active Directory credentials, a connection between IAM Identity Center and Active Directory must be established.

To connect to Microsoft Active Directory, we set up AWS Directory Service using AD Connector.

  1. On the Directory Service console, choose Directories in the navigation pane.
  2. Choose Set up directory.
  3. For Directory types, select AD Connector.
  4. Choose Next.
  5. For Directory size, select the appropriate size for AD Connector. For this post, we select Small.
  6. Choose Next.
  7. Choose the VPC and private subnets where the Windows AD domain controller resides.
  8. Choose Next.
  9. In the Active Directory information section, enter the following details (this information can be retrieved on the Outputs tab of the first CloudFormation template):
    1. For Directory DNS Name, enter awsemr.com.
    2. For Directory NetBIOS name, enter awsemr.
    3. For DNS IP addresses, enter the IPv4 private IP address from AD Controller.
    4. Enter the service account user name and password that you provided during stack creation.
  10. Choose Next.
  11. Review the settings and choose Create directory.

After the directory is created, you will see its status as Active on the Directory Services console.

Set up AWS Organizations

AWS Organizations supports IAM Identity Center in only one Region at a time. To enable IAM Identity Center in this Region, you must first delete the IAM Identity Center configuration if created in another Region. Do not delete an existing IAM Identity Center configuration unless you are sure it will not negatively impact existing workloads.

  1. Navigate to the IAM Identity Center console.
  2. If IAM Identity Center has not been activated previously, choose Enable. If an organization does not exist, an alert appears to create one.
  3. Choose Create AWS organization.
  4. Choose Settings in the navigation pane.
  5. On the Identity source tab, on the Actions menu, choose Change identity source.
  6. For Choose identity source, select Active Directory.
  7. Choose Next.
  8. For Existing Directories, choose AWSEMR.COM.
  9. Choose Next.
  10. To confirm the change, enter ACCEPT in the confirmation input box, then choose Change identity source. Upon completion, you will be redirected to Settings, where you receive the alert Configurable AD sync paused.
  11. Choose Resume sync.
  12. Choose Settings in the navigation pane.
  13. On the Identity source tab, on the Actions menu, choose Manage sync.
  14. Choose Add users and groups to specify the users and groups to sync from Active Directory to IAM Identity Center.
  15. On the Users tab, enter tina and choose Add.
  16. Enter alex and choose Add.
  17. Choose Submit.
  18. On the Groups tab, enter datascience and choose Add.
  19. Choose Submit.

After your users and groups are synced to IAM Identity Center, you can see them by choosing Users or Groups in the navigation pane on the IAM Identity Center console. When they’re available, you can assign them access to AWS accounts and cloud applications. The initial sync may take up to 5 minutes.
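Once the sync completes, the synced users can also be confirmed programmatically via the identitystore API. The following is a minimal sketch; the identity store ID shown is a placeholder (find yours on the IAM Identity Center Settings page), and the response parsing is factored out so it can be demonstrated offline:

```python
def user_names(users):
    """Extract and sort the UserName fields from an identitystore list_users response."""
    return sorted(u["UserName"] for u in users)

# Live usage (requires AWS credentials for the target account; the
# identity store ID below is a placeholder):
#   import boto3
#   resp = boto3.client("identitystore").list_users(IdentityStoreId="d-xxxxxxxxxx")
#   print(user_names(resp["Users"]))

# Offline demonstration with the users synced in this post:
synced = [{"UserName": "tina"}, {"UserName": "alex"}]
print(user_names(synced))  # -> ['alex', 'tina']
```

If tina and alex appear in the output, the Active Directory sync reached IAM Identity Center successfully.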

Set up a SageMaker domain using IAM Identity Center

To set up a SageMaker domain, complete the following steps:

  1. On the SageMaker console, choose Domains in the navigation pane.
  2. Choose Create domain.
  3. Choose Standard setup, then choose Configure.
  4. For Domain Name, enter a unique name for your domain.
  5. For Authentication, choose AWS IAM Identity Center.
  6. Choose Create a new role for the default execution role.
  7. In the Create an IAM Role popup, choose Any S3 bucket.
  8. Choose Create role.
  9. Copy the role details to be used in next section for adding a policy for EMR cluster access.
  10. In the Network and storage section, specify the following:
    1. Choose the VPC that you created using the first CloudFormation template.
    2. Choose a private subnet in an Availability Zone supported by SageMaker.
    3. Use the default security group (sg-XXXX).
    4. Choose VPC only.

Note that there is a public domain called AWSEMR.COM that will conflict with the one created for this solution if Public internet only is selected.

  1. Leave all other options as default and choose Next.
  2. In the Studio settings section, accept the defaults and choose Next.
  3. In the RStudio settings section, accept the defaults and choose Next.
  4. In the Canvas setting section, accept the defaults and choose Submit.

Add a policy to provide SageMaker Studio access to the EMR cluster

Complete the following steps to give SageMaker Studio access to the EMR cluster:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Search for and choose the role you copied earlier (<AmazonSageMaker-ExecutionRole-XXXXXXXXXXXXXXX>).
  3. On the Permissions tab, choose Add permissions and Attach policy.
  4. Search for and choose the policy AmazonEMRFullAccessPolicy_v2.
  5. Choose Add permissions.

Add users and groups to access the domain

Complete the following steps to give users and groups access to the domain:

  1. On the SageMaker console, choose Domains in the navigation pane.
  2. Choose the domain you created earlier.
  3. On the Domain details page, choose Assign users and groups.
  4. On the Users tab, select the users tina and alex.
  5. On the Groups tab, select the group datascience.
  6. Choose Assign users and groups.

Configure Spark data access rights in Apache Ranger

Now that the AWS environment is set up, we configure Hive dataset security using Apache Ranger.

To start, collect the Apache Ranger URL details to access the Ranger admin console:

  1. On the Amazon EC2 console, choose Resources in the navigation pane, then Instances (running).
  2. Choose the Ranger server EC2 instance and copy the private IP DNS name (IPv4 only).
    Next, connect to the Windows domain controller to use the connected VPC to access the Ranger admin console. This is done by logging in to the Windows server and launching a web browser.
  3. Install the Remote Desktop Services client on your computer to connect with Windows Server.
  4. Authorize inbound traffic from your computer to the Windows AD domain controller EC2 instance.
  5. On the Amazon EC2 console, choose Resources in the navigation pane, then Instances (running).
  6. Choose the Windows Domain Controller (DC1) EC2 instance ID and copy the public IP DNS name (IPv4 only).
  7. Use Microsoft Remote Desktop to log in to the Windows domain controller:
    1. Computer – Use the public IP DNS name (IPv4 only).
    2. Username – Enter awsadmin.
    3. Password – Use the password you set during the first CloudFormation template setup.
  8. Disable the Enhanced Security Configuration for Internet Explorer.
  9. Launch Internet Explorer and navigate to the Ranger admin console using the private IP DNS name (IPv4 only) associated with the Ranger server noted earlier and port 6182 (for example, https://<RangerServer Private IP DNS name>:6182).
  10. Choose Continue to this website (not recommended) if you receive a security alert.
  11. Log in using the default user name and password. During the first logon, you should modify your password and store it securely.
  12. In the top Ranger banner, choose Settings and Users/Groups/Roles.
  13. Confirm Tina and Alex are listed as users with a User Source of External.
  14. Confirm the datascience group is listed as a group with Group Source of External.

If the Tina or Alex users aren’t listed, follow the Apache Ranger troubleshooting instructions in the appendix at the end of this post.

Dataset policies

The Apache Ranger access policy model consists of two major components: the specification of the resources a policy applies to (such as files and directories, databases, tables, columns, and services) and the specification of access conditions (permissions) for specific users and groups.

Configure your dataset policy with the following steps:

  1. On the Ranger admin console, choose the Ranger icon in the top banner to return to the main page.
  2. Choose the service name amazonemrspark inside AMAZON-EMR-SPARK.
  3. Choose Add New Policy and add a new policy with the following parameters:
    1. For Policy Name, enter Data Science Policy.
    2. For Database, enter staging and default.
    3. For EMR Spark Table, enter products and orders.
    4. For EMR Spark Column, enter *.
    5. In the Allow Conditions section, for Select User, enter tina and alex, and for Permissions, enter select and read.
  4. Choose Add.
    When using Internet Explorer and adding a new policy, you may receive the error SCRIPT438: Object doesn't support property or method 'assign'. In this case, install and use an alternate browser such as Firefox or Chrome.
  5. Choose Add New Policy and add a new policy for tina:
    1. For Policy Name, enter Customer Demographics Policy.
    2. For Database, enter staging.
    3. For EMR Spark Table, enter Customers.
    4. For EMR Spark Column, choose customer_id, first_name, last_name, region, and state.
    5. In the Allow Conditions section, for Select User, enter tina, and for Permissions, enter select and read.
  6. Choose Add.
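Ranger policies created in the admin console can also be expressed as JSON and created through Ranger's public REST API (POST <ranger-admin>/service/public/v2/api/policy). Below is a sketch of the Data Science Policy above as such a payload; the service name matches this post, but the resource keys follow the generic Spark/Hive service definition and may differ slightly for the EMR plugin, so treat this as illustrative rather than a drop-in request body:

```python
import json

# Sketch: the "Data Science Policy" from the console steps, expressed as a
# Ranger REST API v2 policy payload. Resource keys (database/table/column)
# are assumptions based on the generic Spark service definition.
policy = {
    "service": "amazonemrspark",
    "name": "Data Science Policy",
    "resources": {
        "database": {"values": ["staging", "default"], "isExcludes": False},
        "table": {"values": ["products", "orders"], "isExcludes": False},
        "column": {"values": ["*"], "isExcludes": False},
    },
    "policyItems": [
        {
            "users": ["tina", "alex"],
            "accesses": [
                {"type": "select", "isAllowed": True},
                {"type": "read", "isAllowed": True},
            ],
            "delegateAdmin": False,
        }
    ],
}
print(json.dumps(policy, indent=2))
# To create the policy, POST this JSON to the admin API with Ranger admin
# credentials (for example, with the requests library -- not shown here).
```

Managing policies as JSON like this makes it easier to version-control and replicate them across environments.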

Configure Amazon S3 data access rights in Apache Ranger

Complete the following steps to configure Amazon S3 data access rights:

  1. On the Ranger admin console, choose the Ranger icon in the top banner to return to the main page.
  2. Choose the service name amazonemrs3 inside AMAZON-EMR-EMRFS.
  3. Choose Add New Policy and add a policy for the datascience group as follows:
    1. For Policy Name, enter Data Science S3 Policy.
    2. For S3 resource, enter the following:
      • aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/products
      • aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/orders
    3. In the Allow Conditions section, for Select User, enter tina and alex, and for Permissions, enter GetObject and ListObjects.
  4. Choose Add.
  5. Choose Add New Policy and add a new policy for tina:
    1. For Policy Name, enter Customer Demographics S3 Policy.
    2. For S3 resource, enter aws-bigdata-blog/artifacts/aws-blog-emr-ranger/data/staging/customers.
    3. In the Allow Conditions section, for Select User, enter tina, and for Permissions, enter GetObject and ListObjects.
  6. Choose Add.

Configure Amazon S3 user working folders

While working with data, users often require data storage for interim results. To provide each user with a private working directory, complete the following steps:

  1. On the Ranger admin console, choose the Ranger icon in the top banner to return to the main page.
  2. Choose the service name amazonemrs3 inside AMAZON-EMR-EMRFS.
  3. Choose Add New Policy and add a policy for {USER} as follows:
    1. For Policy Name, enter User Directory S3 Policy.
    2. For S3 resource, enter <Bucket Name>/data/{USER} (use a bucket within the account).
    3. Enable Recursive.
    4. In the Allow Conditions section, for Select User, enter {USER} and for Permissions, enter GetObject, ListObjects, PutObject, and DeleteObject.
  4. Choose Add.
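The {USER} token in both the resource path and the user field is a Ranger macro that is resolved to the requesting user at authorization time, which is what keeps each directory private with a single policy. A rough illustration of that behavior (not Ranger's actual matching logic, which also handles wildcards and recursion; the bucket name is a placeholder):

```python
# Rough illustration of how Ranger's {USER} macro scopes one policy to a
# per-user S3 prefix. "my-example-bucket" is a placeholder bucket name.
POLICY_RESOURCE = "my-example-bucket/data/{USER}"

def resolve(resource, user):
    """Substitute the Ranger {USER} macro for the requesting user."""
    return resource.replace("{USER}", user)

def is_allowed(resource_template, user, requested_path):
    """Allow only paths under the requesting user's own resolved prefix."""
    return requested_path.startswith(resolve(resource_template, user))

print(is_allowed(POLICY_RESOURCE, "tina", "my-example-bucket/data/tina/results/part-0.parquet"))  # True
print(is_allowed(POLICY_RESOURCE, "tina", "my-example-bucket/data/alex/results/part-0.parquet"))  # False
```

Because the macro resolves per request, tina can read and write only under data/tina, and alex only under data/alex, without a separate policy per user.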

Use the user access login URL

Users attempting to access shared AWS applications via IAM Identity Center need to first log in to the AWS environment with a custom link using their Active Directory user name and password. The link needed can be found on the IAM Identity Center console.

  1. On the IAM Identity Center console, choose Settings in the navigation pane.
  2. On the Identity source tab, locate the user login link under AWS access portal URL.

Test role-based data access

To review, data scientist Tina needs to build a customer lifetime value model, which requires access to orders, product, and non-sensitive customer data. Data scientist Alex only needs access to orders and product data to build a product demand model.

In this section, we test the data access levels for each role.

Data scientist Tina

Complete the following steps:

  1. Log in using the URL you located in the previous step.
  2. Enter Microsoft AD user tina@awsemr.com and your password.
  3. Choose the Amazon SageMaker Studio tile.
  4. In the SageMaker Studio UI, start a notebook:
    1. Choose File, New, and Notebook.
    2. For Image, choose SparkMagic.
    3. For Kernel, choose PySpark.
    4. For Instance Type, choose ml.t3.medium.
    5. Choose Select.
  5. When the notebook kernel starts, connect to the EMR cluster by running the following code:
    %load_ext sagemaker_studio_analytics_extension.magics
    %sm_analytics emr connect --cluster-id <EMR Cluster ID> --auth-type Kerberos --language python

The EMR cluster ID details can be found on the Outputs tab of the EMR cluster CloudFormation stack created with the second template.

  1. Enter Microsoft AD tina@AWSEMR.COM and your password. (Note that tina@AWSEMR.COM is case-sensitive.)
  2. Choose Connect.
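The cluster ID used in the connect command can also be read from the stack outputs programmatically instead of copying it from the console. The sketch below assumes boto3 credentials for the account, and the output key EMRClusterId is hypothetical (check the actual key name on your stack's Outputs tab):

```python
# Sketch: read the EMR cluster ID from a CloudFormation stack's outputs.
# The output key "EMRClusterId" is an assumption -- verify it against
# your stack's Outputs tab.

def output_value(outputs, key):
    """Find one output value in a CloudFormation Outputs list."""
    for o in outputs:
        if o["OutputKey"] == key:
            return o["OutputValue"]
    raise KeyError(key)

# Live usage (requires AWS credentials):
#   import boto3
#   stack = boto3.client("cloudformation").describe_stacks(
#       StackName="<emr-cluster-stack-name>")["Stacks"][0]
#   print(output_value(stack["Outputs"], "EMRClusterId"))

# Offline demonstration with a sample Outputs list:
sample = [{"OutputKey": "EMRClusterId", "OutputValue": "j-1ABCDEFGHIJKL"}]
print(output_value(sample, "EMRClusterId"))  # -> j-1ABCDEFGHIJKL
```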

Now we can test Tina’s data access.

  1. In a new cell, enter the following query and run the cell:
    %%sql
    show tables from staging

Returned data will indicate the table objects accessible to Tina.

  1. In a new cell, run the following:
    %%sql
    select * from staging.customers limit 5

Returned data will include columns Tina has been granted access to.

Let’s test Tina’s access to customer data.

  1. In a new cell, run the following:
    %%sql
    select customer_id, education_level, first_name, last_name, marital_status, region, state from staging.customers limit 15

The preceding query will result in an Access Denied error due to the inclusion of sensitive data columns.

During ad hoc analysis and model building, it’s common for users to create temporary datasets that need to be persisted for a short period. Let’s test Tina’s ability to create a working dataset and store results in a private working directory.

  1. In a new cell, run the following:
    join_order_to_customer = spark.sql("select orders.*, first_name, last_name, region, state from staging.orders, staging.customers where orders.customer_id = customers.customer_id")

  2. Before running the following code, update the S3 path variable <bucket name> to correspond to an S3 location within your local account:
    join_order_to_customer.write.mode("overwrite").format("parquet").option("path", "s3://<bucket name>/data/tina/order_and_product/").save()

The preceding query writes the created dataset as Parquet files in the S3 bucket specified.

Data scientist Alex

Complete the following steps:

  1. Log in using the URL you located in the previous step.
  2. Enter Microsoft AD user alex@awsemr.com and your password.
  3. Choose the Amazon SageMaker Studio tile.
  4. In the SageMaker Studio UI, start a notebook:
    1. Choose File, New, and Notebook.
    2. For Image, choose SparkMagic.
    3. For Kernel, choose PySpark.
    4. For Instance Type, choose ml.t3.medium.
    5. Choose Select.
  5. When the notebook kernel starts, connect to the EMR cluster by running the following code:
    %load_ext sagemaker_studio_analytics_extension.magics
    %sm_analytics emr connect --cluster-id <EMR Cluster ID> --auth-type Kerberos --language python

  6. Enter Microsoft AD alex@AWSEMR.COM and your password. (Note that alex@AWSEMR.COM is case-sensitive.)
  7. Choose Connect.

Now we can test Alex's data access.
  8. In a new cell, enter the following query and run the cell:
    %%sql
    show tables from staging

    Returned data will indicate the table objects accessible to Alex. Note that the customers table is missing.

  9. In a new cell, run the following:
    %%sql
    select * from staging.orders limit 5

Returned data will include columns Alex has been granted access to.

Let’s test Alex’s access to customer data.

  1. In a new cell, run the following:
    %%sql
    select * from staging.customers limit 5

The preceding query will result in an Access Denied error because Alex doesn’t have access to customers.

We can verify Ranger is responsible for the denial by looking at the CloudWatch logs.

Now that you can successfully access data, feel free to interactively explore, visualize, prepare, and model the data using the different user personas.

Clean up

When you’re finished experimenting with this solution, clean up your resources:

  1. Shut down and update SageMaker Studio and Studio apps. Ensure that all apps created as part of this post are deleted before deleting the stack.
  2. Change the identity source for IAM Identity Center back to Identity Center Directory.
  3. Delete the directory AWSEMR.COM from Directory Services.
  4. Empty the S3 buckets created by the CloudFormation stacks.
  5. Delete the stacks via the AWS CloudFormation console for the non-nested stacks starting in reverse order.

Conclusion

This post showed how you can implement fine-grained access control in SageMaker Studio and Amazon EMR using Apache Ranger and Microsoft Active Directory. We also demonstrated how multiple SageMaker Studio users can connect to the same EMR cluster and access different tables and columns using Apache Ranger, wherein each user is scoped with permissions matching their individual level of access to data. In addition, we demonstrated how the individual users can access separate S3 folders for storing their intermediate data. We detailed the steps required to set up the integration and provided CloudFormation templates to set up the base infrastructure from end to end.

To learn more about using Amazon EMR with SageMaker Studio, refer to Prepare Data using Amazon EMR. We encourage you to try out this new functionality, and connect with the Machine Learning & AI community if you have any questions or feedback!

Appendix: Apache Ranger troubleshooting

The sync between Active Directory and Apache Ranger is set for every 24 hours. To force a sync, complete the following steps:

  1. Connect to the Apache Ranger server using SSH. This can be done directly, using Session Manager, a capability of AWS Systems Manager, or through AWS Cloud9.
  2. Once connected, issue the following commands:
    sudo /usr/bin/ranger-usersync stop || true
    sudo /usr/bin/ranger-usersync start
    sudo chkconfig ranger-usersync on

  3. To confirm the sync, open the Ranger console as an admin.
  4. Choose Audit in the top banner.
  5. Choose the User Sync tab and confirm the event time.

About the Authors

Rahul Sarda is a Senior Analytics & ML Specialist at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys spending time with his family, staying healthy, running, and road cycling.

Varun Rao Bhamidimarri is a Senior Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers adopt cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, and meditating, and he recently picked up gardening during the lockdown.

Use IAM runtime roles with Amazon EMR Studio Workspaces and AWS Lake Formation for cross-account fine-grained access control

Post Syndicated from Ashley Zhou original https://aws.amazon.com/blogs/big-data/use-iam-runtime-roles-with-amazon-emr-studio-workspaces-and-aws-lake-formation-for-cross-account-fine-grained-access-control/

Amazon EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks and tools such as Spark UI and YARN Timeline Server via EMR Studio Workspaces. You can attach an EMR Studio Workspace to an EMR cluster, and use the compute power of the EMR cluster and run data science jobs on the cluster. Data is often stored in data lakes managed by AWS Lake Formation, enabling you to apply fine-grained access control through a simple grant or revoke mechanism.

We’re happy to introduce runtime roles for EMR Studio Workspaces. You can now define a runtime role and assign it to an EMR cluster when attaching an EMR Studio Workspace. The jobs on the EMR cluster will use this runtime role to access AWS resources. After configuring a runtime role, you can also use Lake Formation and apply fine-grained data access control for the jobs submitted by the EMR Studio Workspace.

Previously, when attaching EMR Studio Workspaces to EMR clusters, all Workspaces had to use the same AWS Identity and Access Management (IAM) role—namely, the cluster’s Amazon Elastic Compute Cloud (Amazon EC2) instance profile. Therefore, all Workspaces attached to the same EMR cluster had the same data access. To control access to data sources, each EMR Studio Workspace had to use a different EMR cluster, and multiple EMR instance profiles were needed.

Starting with the release of Amazon EMR 6.11, you can now choose a runtime role when attaching an EMR Studio Workspace to an EMR cluster. This runtime role scopes down access at the Workspace level. Your Apache Livy and Apache Spark jobs that run from the EMR Studio Workspaces will have permission to access only the data and resources permitted by policies attached to the runtime role. Also, when data is accessed from data lakes managed with Lake Formation, you can enforce fine-grained data access control using Lake Formation permissions. This helps you reduce operational overhead.

In this post, we demonstrate how to configure runtime roles for EMR Studio Workspaces and attach a Workspace to an EMR cluster with runtime roles. Because large enterprises typically use multiple AWS accounts, and many of those accounts might need access to a data lake managed by a single AWS account, our example uses two AWS accounts. We explain how to control access to EMR Studio runtime roles, manage data access across accounts in a data lake via Lake Formation, and enforce table-level and column-level permissions to the EMR runtime roles.

Solution overview

To demonstrate fine-grained access control, we create a sample AWS Glue database named company and manage the database permission in Lake Formation. The database consists of two separate tables:

  • employees – This table stores information about the company’s employees, including employee ID, name, department, and salary
  • products – This table stores information about the products sold by the company, including product ID, name, category, and price

To demonstrate data access control, we consider the following data users:

  • Alice, a data scientist in the sales team – She should have read-only access to all columns in the products table and selected columns, including uID, name, and department in the employees table
  • Bob, a data scientist in the human resources team – He should have read-only access to all columns in employees table and should not have access to the products table

To demonstrate cross-account data sharing, we consider two accounts:

  • Data producer account – We refer to this account as 123456789012 in this post. This account manages the raw data in Amazon Simple Storage Service (Amazon S3) and writes data to the data lake. The company database and tables should be in this account.
  • Data consumer account – We refer to this account as 111122223333 in this post. This account is accessed directly by the users for data analysis and doesn’t have write access to the data. This account should be accessible by Alice and Bob.

The architecture is implemented as follows:

  • The data producer account manages a data lake. Raw data is stored in S3 buckets and catalogued in the AWS Glue Data Catalog.
  • Lake Formation in the data producer account governs the data access via the Data Catalog, and provides cross-account data sharing with the data consumer account.
  • Lake Formation in the data consumer account governs cross-account access to the data lake on table level and fine-grained Lake Formation permissions. For more information, refer to Methods for fine-grained access control.
  • EMR Studio Workspaces in the data consumer account use runtime roles when running jobs on an EMR cluster.
  • The EMR cluster connects to Glue Data Catalog in the data consumer account and queries the data from the data lake through cross-account data sharing.

The following diagram illustrates this architecture.

In the following sections, we go through the steps to share data across accounts via Lake Formation, run an EMR Studio Workspace with runtime roles, and demonstrate fine-grained access control.

Prerequisites

You should have the following prerequisites:

Create the infrastructure in the data producer account

Complete the following steps to create the infrastructure resources:

  1. Log in to the data producer AWS account (123456789012).
  2. Choose Launch Stack to deploy a CloudFormation template to create the necessary resources.
  3. For DataLakeBucketSuffix, enter the suffix for the S3 bucket used by the data lake. The whole S3 bucket name to be created will be {AwsAccountId}-{AwsRegion}-{DataLakeBucketSuffix}.
  4. After the CloudFormation stack is created, navigate to the Outputs tab of the stack and capture the value of DataLakeS3Bucket to use in the next step.
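The bucket naming convention in step 3 can be sketched as a simple format string (the values shown are placeholders, including the account ID used throughout this post):

```python
# Sketch of the data lake bucket naming convention from the template:
# {AwsAccountId}-{AwsRegion}-{DataLakeBucketSuffix}
def data_lake_bucket_name(account_id, region, suffix):
    return f"{account_id}-{region}-{suffix}"

# Placeholder values for illustration:
print(data_lake_bucket_name("123456789012", "us-east-1", "datalake"))
# -> 123456789012-us-east-1-datalake
```

Including the account ID and Region keeps the bucket name globally unique across accounts deploying the same template.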

Create data files and upload them to Amazon S3 in the data producer account

Configure your AWS CLI to use the IAM identity with permission to upload to DataLakeS3BucketName in the data producer AWS account (123456789012), or you can sign in to CloudShell using the AWS Management Console. Complete the following steps:

  1. On your local machine, move to a directory of your choice with the cd command, for example, cd ~.
  2. Run the script with chmod 744 create_sample_data.sh && ./create_sample_data.sh <DataLakeS3BucketName>.

The script will create a subdirectory tmp in your current working directory, create the test data in CSV files, and upload the files to the DataLakeS3BucketName S3 bucket.

Set up Lake Formation in the data producer account

In this section, we walk through the steps to set up Lake Formation in the data producer account.

Set up Lake Formation cross-account data sharing version settings

Lake Formation supports multiple data sharing versions. For this post, we use version 3. To learn more about the differences between data sharing versions, refer to Updating cross-account data sharing version settings. To change the data sharing version, see To enable the new version.

Register the Amazon S3 location as the data lake location

When you register an Amazon S3 location with Lake Formation, you specify an IAM role with read/write permissions on that location. After registering, when EMR clusters request access to this Amazon S3 location, Lake Formation will supply temporary credentials of the provided role to access the data. We already created the role LakeFormationCompanyDatabaseDataAccessRole for this purpose in the previous step. To register the Amazon S3 location as the data lake location, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data producer account (123456789012).
  2. In the navigation pane, choose Data lake locations under Administration.
  3. Choose Register location.
  4. For Amazon S3 path, enter s3://<DataLakeS3BucketName>/company-database.
  5. For IAM role, enter LakeFormationCompanyDatabaseDataAccessRole.
  6. For Permission mode, select Lake Formation.
  7. Choose Register location.

Register data location
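The console steps above can also be scripted. The following sketch is illustrative only: it builds the parameters for the Lake Formation RegisterResource API, using the example bucket and role names from this post (adjust them for your account), and leaves the actual boto3 call commented out because it needs credentials for the data producer account.

```python
# Illustrative sketch: register the data lake location programmatically.
# The bucket and role names below are the example values from this post.

def register_resource_params(bucket: str, prefix: str,
                             account_id: str, role_name: str) -> dict:
    """Build keyword arguments for lakeformation.register_resource()."""
    return {
        "ResourceArn": f"arn:aws:s3:::{bucket}/{prefix}",
        "RoleArn": f"arn:aws:iam::{account_id}:role/{role_name}",
        # We supply our own data-access role instead of the service-linked role.
        "UseServiceLinkedRole": False,
    }

params = register_resource_params(
    "123456789012-us-east-1-datalake", "company-database",
    "123456789012", "LakeFormationCompanyDatabaseDataAccessRole")

# import boto3
# boto3.client("lakeformation").register_resource(**params)
```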

Revoke permissions granted to IAMAllowedPrincipals

The IAMAllowedPrincipals group includes any IAM users and roles that are allowed access to your Data Catalog resources by your IAM policies. To enforce the Lake Formation model, we need to revoke permission from IAMAllowedPrincipals using the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data producer account.
  2. In the navigation pane, choose Data lake permissions under Permissions.
  3. Filter permissions by Database = company and Principal = IAMAllowedPrincipals.
  4. Select all the permissions given to the principal IAMAllowedPrincipals and choose Revoke.

Revoke permissions granted to IAMAllowedPrincipals
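The same revoke maps to the Lake Formation RevokePermissions API, which addresses this group through the special IAM_ALLOWED_PRINCIPALS principal identifier. A minimal sketch, assuming the example company database from this post (the permissions to revoke may differ in your account):

```python
# Illustrative sketch: revoke the default IAMAllowedPrincipals grant.
# "company" is the example database from this walkthrough.

def revoke_iam_allowed_params(database: str, permissions: list) -> dict:
    """Build keyword arguments for lakeformation.revoke_permissions()."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"},
        "Resource": {"Database": {"Name": database}},
        "Permissions": permissions,
    }

params = revoke_iam_allowed_params("company", ["ALL"])
# import boto3
# boto3.client("lakeformation").revoke_permissions(**params)
```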

Set up application integration settings

To enforce permissions for the EMR cluster, you need to register a session tag value with Lake Formation. Lake Formation uses this session tag to authorize callers and provide access to the data lake. We register Amazon EMR as the session tag value. This value will be referenced in the security configuration when creating the EMR cluster.

Set up the session tag using the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data producer account.
  2. Choose Application integration settings under Administration in the navigation pane.
  3. Select Allow external engines to filter data in Amazon S3 locations registered with Lake Formation.
  4. For Session tag values, enter Amazon EMR.
  5. For AWS account IDs, enter the data consumer AWS account ID (111122223333).
  6. Choose Save.

Set up application integration settings in data producer account
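Steps 3-5 above correspond to three fields of the Lake Formation PutDataLakeSettings API. A hedged sketch, with the account ID from this post; a read-modify-write is used so the other settings (data lake administrators, default permissions, and so on) are not overwritten:

```python
# Illustrative sketch: enable external data filtering for Amazon EMR
# on a DataLakeSettings dict before writing it back.

def enable_emr_filtering(settings: dict, allowed_account_id: str) -> dict:
    """Set the external data filtering fields on a DataLakeSettings dict."""
    settings["AllowExternalDataFiltering"] = True
    settings["AuthorizedSessionTagValueList"] = ["Amazon EMR"]
    settings["ExternalDataFilteringAllowList"] = [
        {"DataLakePrincipalIdentifier": allowed_account_id}
    ]
    return settings

# import boto3
# lf = boto3.client("lakeformation")
# current = lf.get_data_lake_settings()["DataLakeSettings"]
# lf.put_data_lake_settings(
#     DataLakeSettings=enable_emr_filtering(current, "111122223333"))
```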

Share the database and tables to the data consumer account

We now grant permissions to the data consumer AWS account, including grantable permissions. This allows the Lake Formation data lake administrator in the data consumer account to control access to the data within the account.

Grant database permissions to the data consumer account

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data producer account.
  2. In the navigation pane, choose Databases.
  3. Select the database company, and on the Actions menu, under Permissions, choose Grant.
  4. In the Principals section, select External accounts and enter the data consumer AWS account ID (111122223333).
  5. In the LF-Tags or catalog resources section, choose company for Databases.
  6. In the Database permissions section, select Describe for both Database permissions and Grantable permissions.

This allows the data lake administrator in the data consumer account to describe the database and grant describe permissions to other principals in the data consumer account.

  7. Choose Grant.

Grant database permissions to the data consumer account
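The cross-account database grant above is a single GrantPermissions call. In this sketch (illustrative, using the example account ID from this post), DESCRIBE appears twice because it is both granted and made grantable, mirroring the console selections:

```python
# Illustrative sketch: grant DESCRIBE on the database to the data
# consumer account, with the grant option.

def grant_database_params(database: str, consumer_account_id: str) -> dict:
    """Build keyword arguments for lakeformation.grant_permissions()."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": consumer_account_id},
        "Resource": {"Database": {"Name": database}},
        "Permissions": ["DESCRIBE"],
        "PermissionsWithGrantOption": ["DESCRIBE"],
    }

params = grant_database_params("company", "111122223333")
# import boto3
# boto3.client("lakeformation").grant_permissions(**params)
```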

Grant table permissions to the data consumer account

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data producer account.
  2. In the navigation pane, choose Tables.
  3. Select the products table, which belongs to the company database, and on the Actions menu, under Permissions, choose Grant.
  4. In the Principals section, select External accounts and enter the data consumer AWS account ID (111122223333).
  5. In the LF-Tags or catalog resources section, select Named data catalog resources and specify the following:
    1. For Databases, choose company.
    2. For Tables, choose products and employees.
  6. In the Table permissions section, choose Select and Describe for both Table permissions and Grantable permissions.

This allows the data lake administrator in the data consumer account to select and describe the tables, and grant select and describe table permissions to other principals in the data consumer account.

  7. In the Data permissions section, select All data access.
  8. Choose Grant.

Grant table permissions to the data consumer account
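The table grants follow the same GrantPermissions shape, repeated for both shared tables. A sketch under the same assumptions as before (example names from this post, live call commented out):

```python
# Illustrative sketch: grant SELECT and DESCRIBE on both shared tables
# to the data consumer account, with the grant option.

def grant_table_params(database: str, table: str, account_id: str) -> dict:
    """Build keyword arguments for lakeformation.grant_permissions()."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": account_id},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": ["SELECT", "DESCRIBE"],
        "PermissionsWithGrantOption": ["SELECT", "DESCRIBE"],
    }

# import boto3
# lf = boto3.client("lakeformation")
for table in ("products", "employees"):
    params = grant_table_params("company", table, "111122223333")
    # lf.grant_permissions(**params)
```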
Now we have finished setting up the data producer account.

Set up the infrastructure in the data consumer account

Complete the following steps to create the infrastructure resources:

  1. Log in to the data consumer account (111122223333).
  2. Choose Launch stack to deploy a CloudFormation template to create the necessary resources.
  3. For Release Label, enter the Amazon EMR release label to use, which must be emr-6.11 or later.
  4. For InstanceType, choose the instance type for EMR cluster, such as r4.4xlarge.
  5. For EMRS3BucketNameSuffix, enter the S3 bucket suffix to store EMR cluster logs and EMR notebook files. The full S3 bucket name to be created will be {AwsAccountId}-{AwsRegion}-{EMRS3BucketNameSuffix}.
  6. For S3PathToInTransitCertificate, enter the S3 path for the .zip file that contains the .pem files used for in-transit encryption.

For instructions on creating the .zip file that contains the .pem files and uploading them to your S3 bucket, refer to Providing certificates for encrypting data in transit with Amazon EMR encryption.

  7. After the CloudFormation stack is created, navigate to the Outputs tab of the stack.
  8. Capture the value of EMRStudioLink to use to sign in to EMR Studio.

Accept the resource share in the data consumer account

To access shared resources, you must accept the invitation first.

  1. Open the AWS RAM console of the data consumer account with the IAM identity that has AWS RAM access.
  2. In the navigation pane, choose Resource shares under Shared with me.

You should see two pending resource shares from the data producer account.

  3. Accept both resource shares.

You should see the company database, employees table, and products table in the Data Catalog.
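Accepting the invitations can also be done through the AWS RAM API. The sketch below is illustrative: the filtering helper is testable on its own, and the commented calls show how it would be wired to boto3 with the example producer account ID from this post.

```python
# Illustrative sketch: accept the pending RAM invitations sent by the
# data producer account (123456789012 in this post).

def pending_invitation_arns(invitations: list, producer_account_id: str) -> list:
    """Filter RAM invitations down to pending ones from the producer."""
    return [
        inv["resourceShareInvitationArn"] for inv in invitations
        if inv["status"] == "PENDING"
        and inv["senderAccountId"] == producer_account_id
    ]

# import boto3
# ram = boto3.client("ram")
# invs = ram.get_resource_share_invitations()["resourceShareInvitations"]
# for arn in pending_invitation_arns(invs, "123456789012"):
#     ram.accept_resource_share_invitation(resourceShareInvitationArn=arn)
```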

Set up Lake Formation in the data consumer account

In this section, we walk through the steps to set up Lake Formation in the data consumer account.

Set up application integration settings

Similar to the setup in the data producer account, you need to register Amazon EMR as a session tag. This value is referenced in the security configuration when creating the EMR cluster in the CloudFormation stack.

To do that, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account (111122223333).
  2. Choose Application integration settings under Administration in the navigation pane.
  3. Select Allow external engines to filter data in Amazon S3 locations registered with Lake Formation.
  4. For Session tag values, enter Amazon EMR.
  5. For AWS account IDs, enter the data consumer AWS account ID (111122223333).
  6. Choose Save.

Set up application integration settings in data consumer account

Grant describe permissions to runtime roles on the default database

If you don’t have a default database in Lake Formation, or your default database still has permissions granted to IAMAllowedPrincipals, you can skip this step.

Amazon EMR will check on the default database by default. If you already have a default database in your Lake Formation, grant the describe permission to the runtime roles on the default database by completing the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator user in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the default database, verify that the owner account ID is the data consumer account (111122223333), and on the Actions menu, choose Grant.
  4. In the Principals section, select IAM users and roles.
  5. For IAM users and roles, choose sales-runtime-role and human-resource-runtime-role.
  6. For LF-Tags or catalog resources, select Named data catalog resources and choose default for Databases.
  7. In the Database permissions section, for Database permissions, choose Describe.
  8. Choose Grant.

Grant describe permissions to runtime roles on the default database

Create a resource link for the shared database

To access the database and table resources that were shared by the data producer AWS account, you need to create a resource link in the data consumer AWS account. A resource link is a Data Catalog object that is a link to a local or shared database or table. After you create a resource link to a database or table, you can use the resource link name wherever you would use the database or table name. In this step, you grant permission on the resource links to the runtime role principals. The runtime roles will then access the data in shared databases and underlying tables through the resource link.

To create a resource link, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the company database, verify that the owner account ID is the data producer account (123456789012), and on the Actions menu, choose Create Resource links.
  4. For Resource link name, enter the name of the resource link (for example, company-shared).
  5. For Shared database’s region, choose the Region of the company database.
  6. For Shared database, choose the company database.
  7. For Shared database’s owner ID, enter the account ID of the data producer account (123456789012).
  8. Choose Create.

Create a resource link for the shared database
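Under the hood, a resource link is created through the AWS Glue CreateDatabase API with a TargetDatabase that points at the shared database in the producer account. A hedged sketch with the example names from this post:

```python
# Illustrative sketch: build the DatabaseInput for a resource link that
# points at the shared "company" database in the producer account.

def resource_link_input(link_name: str, shared_db: str,
                        owner_account_id: str) -> dict:
    """Build the DatabaseInput for glue.create_database()."""
    return {
        "Name": link_name,
        "TargetDatabase": {
            "CatalogId": owner_account_id,
            "DatabaseName": shared_db,
        },
    }

db_input = resource_link_input("company-shared", "company", "123456789012")
# import boto3
# boto3.client("glue").create_database(DatabaseInput=db_input)
```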

Grant permissions on the resource link to the runtime role principals

Grant permissions on the resource link to sales-runtime-role and human-resource-runtime-role using the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (company-shared) and on the Actions menu, choose Grant.
  4. In the Principals section, select IAM users and roles, and choose sales-runtime-role and human-resource-runtime-role.
  5. In the LF-Tags or catalog resources section, for Databases, choose company-shared.
  6. In the Resource link permissions section, select Describe.

This allows the runtime roles to describe the resource link. We don’t make any selections for grantable permissions because runtime roles shouldn’t be able to grant permissions to other principals.

  7. Choose Grant.

Grant permissions on the resource link to the runtime role principals

Grant permission on the tables to the runtime role principals

You need to grant permissions on the tables to sales-runtime-role and human-resource-runtime-role to allow data access:

  • The human-resource-runtime-role should have describe and select permissions on all columns in the employees table, and no permissions on the products table.
  • The sales-runtime-role should have select permissions on the columns uid, name, and department in the employees table, and describe and select permissions on all columns in the products table.

Grant permission on the employees table to human-resource-runtime-role

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (company-shared) and on the Actions menu, choose Grant on Target.
  4. In the Principals section, select IAM users and roles, then choose human-resource-runtime-role.
  5. In the LF-Tags or catalog resources section, select Named data catalog resources and specify the following:
    1. For Databases, choose company.
    2. For Tables, choose employees.
  6. In the Table permissions section, for Table permissions, select Describe and Select.
  7. In the Data permissions section, select All data access.
  8. Choose Grant.

Grant permission on the employees table to human-resource-runtime-role

Grant permission on the employees table to sales-runtime-role

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (company-shared) and on the Actions menu, choose Grant on Target.
  4. In the Principals section, select IAM users and roles, then choose sales-runtime-role.
  5. In the LF-Tags or catalog resources section, select Named data catalog resources and specify the following:
    1. For Databases, choose company.
    2. For Tables, choose employees.
  6. In the Table permissions section, for Table permissions, select Select.
  7. In the Data permissions section, select Column-based access.
  8. Select Include columns and choose the uid, name, and department columns.
  9. Choose Grant.

 Grant permission on the employees table to sales-runtime-role
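The column-limited grant above uses a TableWithColumns resource in the GrantPermissions API. The following is a sketch, not the post's exact automation: the CatalogId is the data producer account because the grant targets the shared table behind the resource link, and all names are the example values from this post.

```python
# Illustrative sketch: column-level SELECT grant for sales-runtime-role
# on the shared employees table (owned by the producer account).

def grant_columns_params(role_arn: str, catalog_id: str, database: str,
                         table: str, columns: list) -> dict:
    """Build keyword arguments for lakeformation.grant_permissions()."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,   # data producer account
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": columns,
            }
        },
        "Permissions": ["SELECT"],
    }

params = grant_columns_params(
    "arn:aws:iam::111122223333:role/sales-runtime-role",
    "123456789012", "company", "employees", ["uid", "name", "department"])
# import boto3
# boto3.client("lakeformation").grant_permissions(**params)
```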

Grant permission on the products table to sales-runtime-role

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the data consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (company-shared) and on the Actions menu, choose Grant on Target.
  4. In the Principals section, select IAM users and roles, then choose sales-runtime-role.
  5. In the LF-Tags or catalog resources section, select Named data catalog resources and specify the following:
    1. For Databases, choose company.
    2. For Tables, choose products.
  6. In the Table permissions section, for Table permissions, select Select and Describe.
  7. In the Data permissions section, select All data access.
  8. Choose Grant.

Grant permission on the products table to sales-runtime-role

Log in to EMR Studio and use the EMR Studio Workspace

Switch your role to alice-role or bob-role on the console using different web browsers to test access. Open the EMRStudioLink URL from the CloudFormation stack output to sign in to the EMR Studio with each role, then complete the following steps:

  1. Choose Workspaces in the navigation pane and choose Create Workspace.
  2. Enter a name and a description for the Workspace.
  3. Choose Create Workspace.

A new tab containing JupyterLab will open automatically when the Workspace is ready. Enable pop-ups in your browser if necessary.

  1. Choose the Compute icon in the navigation pane to attach the EMR Studio Workspace to a compute engine.
  2. Select EMR cluster on EC2 for Compute type.
  3. Choose the EMR cluster ID you created with AWS CloudFormation.
  4. For Runtime role, choose sales-runtime-role if signed in as alice-role. Choose human-resource-runtime-role if signed in as bob-role.
  5. Choose Attach.

attach EMR Studio Workspace to cluster

Run code in the EMR Studio Workspace and verify data access

Run the following code in the EMR Studio Workspace with a PySpark kernel after signing in with alice-role or bob-role:

%%sql -o result -n -1
select * from `company-shared`.products limit 5;

%%sql -o result -n -1
select * from `company-shared`.employees limit 5;

You should see different results when using different roles.

According to our data access configuration in Lake Formation, Alice will have full data access for the products table. She can view all the columns except for salary in the employees table.

Alice (sales) query result

For Bob, according to our data access configuration in Lake Formation, he will have full data access to the employees table, but he has no access to the products table.

Bob (human resource) query result

Clean up

When you’re finished experimenting with this solution, clean up your resources:

  1. Stop and delete the EMR Studio Workspaces created in the data consumer AWS account.
  2. Delete all the content in the S3 bucket EMRS3Bucket in the data consumer AWS account.
  3. Delete the CloudFormation stack in the data consumer AWS account.
  4. Delete all the content in the S3 bucket DataLakeS3Bucket in the data producer AWS account.
  5. Delete the CloudFormation stack in the data producer AWS account.

Conclusion

This post showed how you can use runtime roles to connect to an EMR Studio Workspace with Amazon EMR to apply cross-account fine-grained data access control with Lake Formation. We also demonstrated how multiple EMR Studio users can connect to the same EMR cluster, each using a runtime role scoped with permissions matching their individual level of access to data.

To learn more about using EMR Studio Workspaces with Lake Formation, refer to Run an EMR Studio Workspace with a runtime role. We encourage you to try out this new functionality, and connect with us if you have any questions or feedback!


About the Authors

Ashley Zhou is a Software Development Engineer at AWS. She is interested in data analytics and distributed systems.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building analytics and data mesh solutions on AWS and sharing them with the community.

GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless

Post Syndicated from Mukul Sharma original https://aws.amazon.com/blogs/big-data/godaddy-benchmarking-results-in-up-to-24-better-price-performance-for-their-spark-workloads-with-aws-graviton2-on-amazon-emr-serverless/

This is a guest post co-written with Mukul Sharma, Software Development Engineer, and Ozcan Ilikhan, Director of Engineering from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 22 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps us drive business decisions to delight our customers. At GoDaddy, we embarked on a journey to uncover the efficiency promises of AWS Graviton2 on Amazon EMR Serverless as part of our long-term vision for cost-effective intelligent computing.

In this post, we share the methodology and results of our benchmarking exercise comparing the cost-effectiveness of EMR Serverless on the arm64 (Graviton2) architecture against the traditional x86_64 architecture. EMR Serverless on Graviton2 demonstrated an advantage in cost-effectiveness, resulting in significant savings in total run costs. We achieved 23.85% improvement in price-performance for sample production Spark workloads—an outcome that holds tremendous potential for businesses striving to maximize their computing efficiency.

Solution overview

GoDaddy’s intelligent compute platform envisions simplification of compute operations for all personas, without limiting power users, to ensure out-of-box cost and performance optimization for data and ML workloads. As a part of this vision, GoDaddy’s Data & ML Platform team plans to use EMR Serverless as one of the compute solutions under the hood.

The following diagram shows a high-level illustration of the intelligent compute platform vision.

Benchmarking EMR Serverless for GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

At GoDaddy, we embarked on a comprehensive study to benchmark EMR Serverless using real production workflows at GoDaddy. The purpose of the study was to evaluate the performance and efficiency of EMR Serverless and develop a well-informed adoption plan. The results of the study have been extremely promising, showcasing the potential of EMR Serverless for our workloads.

Having achieved compelling results in favor of EMR Serverless for our workloads, our attention turned to evaluating the utilization of the Graviton2 (arm64) architecture on EMR Serverless. In this post, we focus on comparing the performance of Graviton2 (arm64) with the x86_64 architecture on EMR Serverless. By conducting this apples-to-apples comparative analysis, we aim to gain valuable insights into the benefits and considerations of using Graviton2 for our big data workloads.

By using EMR Serverless and exploring the performance of Graviton2, GoDaddy aims to optimize their big data workflows and make informed decisions regarding the most suitable architecture for their specific needs. The combination of EMR Serverless and Graviton2 presents an exciting opportunity to enhance the data processing capabilities and drive efficiency in our operations.

AWS Graviton2

The Graviton2 processors are specifically designed by AWS, utilizing powerful 64-bit Arm Neoverse cores. This custom-built architecture provides a remarkable boost in price-performance for various cloud workloads.

In terms of cost, Graviton2 offers an appealing advantage. As indicated in the following table, the pricing for Graviton2 is 20% lower compared to the x86 architecture option.

                            x86_64        arm64 (Graviton2)
Per vCPU per hour           $0.052624     $0.042094
Per GB per hour             $0.0057785    $0.004628
Per storage GB per hour*    $0.000111     $0.000111

*Ephemeral storage: 20 GB of ephemeral storage is available for all workers by default—you pay only for any additional storage that you configure per worker.

For specific pricing details and current information, refer to Amazon EMR pricing.
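The roughly 20% discount can be verified directly from the per-unit prices in the table above; a quick sketch of the arithmetic:

```python
# Check the ~20% Graviton2 price advantage from the listed unit prices.
x86_vcpu, arm_vcpu = 0.052624, 0.042094   # $ per vCPU per hour
x86_gb, arm_gb = 0.0057785, 0.004628      # $ per GB per hour

vcpu_discount = (1 - arm_vcpu / x86_vcpu) * 100
mem_discount = (1 - arm_gb / x86_gb) * 100
print(f"vCPU: {vcpu_discount:.1f}% lower, memory: {mem_discount:.1f}% lower")
```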

AWS benchmark

The AWS team performed benchmark tests on Spark workloads with Graviton2 on EMR Serverless using the TPC-DS 3 TB scale performance benchmarks. The summary of their analysis is as follows:

  • Graviton2 on EMR Serverless demonstrated an average improvement of 10% for Spark workloads in terms of runtime. This indicates that the runtime for Spark-based tasks was reduced by approximately 10% when utilizing Graviton2.
  • Although the majority of queries showcased improved performance, a small subset of queries experienced a regression of up to 7% on Graviton2. These specific queries showed a slight decrease in performance compared to the x86 architecture option.
  • In addition to the performance analysis, the AWS team considered the cost factor. Graviton2 is offered at a 20% lower cost than the x86 architecture option. Taking this cost advantage into account, the AWS benchmark set yielded an overall 27% better price-performance for workloads. This means that by using Graviton2, users can achieve a 27% improvement in performance per unit of cost compared to the x86 architecture option.

These findings highlight the significant benefits of using Graviton2 on EMR Serverless for Spark workloads, with improved performance and cost-efficiency. It showcases the potential of Graviton2 in delivering enhanced price-performance ratios, making it an attractive choice for organizations seeking to optimize their big data workloads.

GoDaddy benchmark

During our initial experimentation, we observed that arm64 on EMR Serverless consistently outperformed or performed on par with x86_64. One of the jobs showed a 7.51% increase in resource usage on arm64 compared to x86_64, but due to the lower price of arm64, it still resulted in a 13.48% cost reduction. In another instance, we achieved an impressive 43.7% reduction in run cost, attributed to both the lower price and reduced resource utilization. Overall, our initial tests indicated that arm64 on EMR Serverless delivered superior price-performance compared to x86_64. These promising findings motivated us to conduct a more comprehensive and rigorous study.

Benchmark results

To gain a deeper understanding of the value of Graviton2 on EMR Serverless, we conducted our study using real-life production workloads from GoDaddy, which are scheduled to run at a daily cadence. Without any exceptions, EMR Serverless on arm64 (Graviton2) is significantly more cost-effective compared to the same jobs run on EMR Serverless on the x86_64 architecture. In fact, we recorded an impressive 23.85% improvement in price-performance across the sample GoDaddy jobs using Graviton2.

Like the AWS benchmarks, we observed slight regressions of less than 5% in the total runtime of some jobs. However, given that these jobs will be migrated from Amazon EMR on EC2 to EMR Serverless, the overall total runtime will still be shorter due to the minimal provisioning time in EMR Serverless. Additionally, across all jobs, we observed an average speed up of 2.1% in addition to the cost savings achieved.

These benchmarking results provide compelling evidence of the value and effectiveness of Graviton2 on EMR Serverless. The combination of improved price-performance, shorter runtimes, and overall cost savings makes Graviton2 a highly attractive option for optimizing big data workloads.

Benchmarking methodology

As an extension of a larger benchmarking EMR Serverless for GoDaddy study, where we divided Spark jobs into brackets based on total runtime (quick-run, medium-run, long-run), we measured effect of architecture (arm64 vs. x86_64) on total cost and total runtime. All other parameters were kept the same to achieve an apples-to-apples comparison.

The team followed these steps:

  1. Prepare the data and environment.
  2. Choose two random production jobs from each job bracket.
  3. Make necessary changes to avoid interference with actual production outputs.
  4. Run tests to execute scripts over multiple iterations to collect accurate and consistent data points.
  5. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  6. Gather relevant metrics from the tests.
  7. Analyze results to draw insights and conclusions.

The following table shows the summary of an example Spark job.

Metric                     EMR Serverless (Average) – x86_64   EMR Serverless (Average) – Graviton   x86_64 vs Graviton (% Difference)
Total Run Cost             $2.76                               $1.85                                 32.97%
Total Runtime (hh:mm:ss)   00:41:31                            00:34:32                              16.82%
EMR Release Label          emr-6.9.0
Job Type                   Spark
Spark Version              Spark 3.3.0
Hadoop Distribution        Amazon 3.3.3
Hive/HCatalog Version      Hive 3.1.3, HCatalog 3.1.3

Summary of results

The following table presents a comparison of job performance between EMR Serverless on arm64 (Graviton2) and EMR Serverless on x86_64. For each architecture, every job was run at least three times to obtain the accurate average cost and runtime.

 Job  Average x86_64 Cost Average arm64 Cost Average x86_64 Runtime (hh:mm:ss) Average arm64 Runtime (hh:mm:ss)  Average Cost Savings %  Average Performance Gain % 
1 $1.64 $1.25 00:08:43 00:09:01 23.89% -3.24%
2 $10.00 $8.69 00:27:55 00:28:25 13.07% -1.79%
3 $29.66 $24.15 00:50:49 00:53:17 18.56% -4.85%
4 $34.42 $25.80 01:20:02 01:24:54 25.04% -6.08%
5 $2.76 $1.85 00:41:31 00:34:32 32.97% 16.82%
6 $34.07 $24.00 00:57:58 00:51:09 29.57% 11.76%
Average  23.85% 2.10%

Note that the improvement calculations are based on higher-precision results for more accuracy.
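For reference, the savings and gain columns are simple relative differences between the x86_64 and arm64 averages. A short sketch using job 5 from the table (the one row whose rounded inputs reproduce the published figures exactly):

```python
def improvement_pct(x86: float, arm: float) -> float:
    """Percent improvement of arm64 over x86_64 (positive is better)."""
    return (x86 - arm) / x86 * 100

# Job 5 from the table: cost $2.76 -> $1.85, runtime 00:41:31 -> 00:34:32
cost_savings = improvement_pct(2.76, 1.85)
performance_gain = improvement_pct(41 * 60 + 31, 34 * 60 + 32)
print(f"cost savings {cost_savings:.2f}%, performance gain {performance_gain:.2f}%")
```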

Conclusion

Based on this study, GoDaddy observed a significant 23.85% improvement in price-performance for sample production Spark jobs utilizing the arm64 architecture compared to the x86_64 architecture. These compelling results have led us to strongly recommend internal teams to use arm64 (Graviton2) on EMR Serverless, except in cases where there are compatibility issues with third-party packages and libraries. By adopting an arm64 architecture, organizations can achieve enhanced cost-effectiveness and performance for their workloads, contributing to more efficient data processing and analytics.


About the Authors

Mukul Sharma is a Software Development Engineer on Data & Analytics (DnA) organization at GoDaddy. He is a polyglot programmer with experience in a wide array of technologies to rapidly deliver scalable solutions. He enjoys singing karaoke, playing various board games, and working on personal programming projects in his spare time.

Ozcan Ilikhan is a Director of Engineering on Data & Analytics (DnA) organization at GoDaddy. He is passionate about solving customer problems and increasing efficiency using data and ML/AI. In his spare time, he loves reading, hiking, gardening, and working on DIY projects.

Harsh Vardhan Singh Gaur is an AWS Solutions Architect, specializing in analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Ramesh Kumar Venkatraman is a Senior Solutions Architect at AWS who is passionate about containers and databases. He works with AWS customers to design, deploy, and manage their AWS workloads and architectures. In his spare time, he loves to play with his two kids and follows cricket.

AWS Weekly Roundup – CodeWhisperer, CodeCatalyst, RDS, Route53, and more – October 24, 2023

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-codewhisperer-codecatalyst-rds-route53-and-more-october-23-2023/

The entire AWS News Blog team is fully focused on writing posts to announce the new services and features during our annual customer conference in Las Vegas, AWS re:Invent! And while we prepare content for you to read, our services teams continue to innovate. Here is my summary of last week’s launches.

Last week’s launches
Here are some of the launches that captured my attention:

Amazon CodeCatalyst – You can now add a cron expression to trigger a CI/CD workflow, providing a way to start workflows at set times. CodeCatalyst is a unified development service that integrates a project’s collaboration tools, CI/CD pipelines, and development and deployment environments.

Amazon Route53 – You can now route your customer’s traffic to their closest AWS Local Zones to improve application performance for latency-sensitive workloads. Learn more about geoproximity routing in the Route53 documentation.

Amazon RDS – The root certificates we use to sign your databases’ TLS certificates will expire in 2024. You must generate new certificates for your databases before the expiration date. This blog post details the procedure step by step. The new root certificates we generated are valid for the next 40 years for RSA2048 and 100 years for RSA4096 and ECC384. It is likely this is the last time in your professional career that you are obliged to renew your database certificates for AWS.

Amazon MSK – Replicating Kafka clusters at scale is difficult and often involves managing the infrastructure and the replication solution by yourself. We launched Amazon MSK Replicator, a fully managed replication solution for your Kafka clusters, in the same or across multiple AWS Regions.

Amazon CodeWhisperer – We launched a preview for an upcoming capability of Amazon CodeWhisperer Professional. You can now train CodeWhisperer on your private code base. It allows you to give your organization’s developers more relevant suggestions to better assist them in their day-to-day coding against your organization’s private libraries and frameworks.

Amazon EC2 – The seventh generation of memory-optimized EC2 instances is available (R7i). These instances use the 4th Generation Intel Xeon Scalable Processors (Sapphire Rapids). This family of instances provides up to 192 vCPU and 1,536 GB of memory. They are well-suited for memory-intensive applications such as in-memory databases or caches.

X in Y – We launched existing services and instance types in additional Regions:

Other AWS news
Here are some other blog posts and news items that you might like:

The Community.AWS blog has new posts to teach you how to integrate Amazon Bedrock inside your Java and Go applications, and my colleague Brooke wrote a survival guide for re:Invent first-timers.

The Official AWS Podcast – Listen each week for updates on the latest AWS news and deep dives into exciting use cases. There are also official AWS podcasts in several languages. Check out the ones in French, German, Italian, and Spanish.

Some other great sources of AWS news include:

Upcoming AWS events
Check your calendars and sign up for these AWS events:

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: Jaipur (November 4), Vadodara (November 4), and Brasil (November 4).

AWS Innovate: Every Application Edition – Join our free online conference to explore cutting-edge ways to enhance security and reliability, optimize performance on a budget, speed up application development, and revolutionize your applications with generative AI. Register for AWS Innovate Online Asia Pacific & Japan on October 26.

AWS re:Invent 2023 (November 27 – December 1) – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Browse the session catalog and attendee guides and check out the re:Invent highlights for generative AI.

You can browse all upcoming in-person and virtual events.

And that’s all for me today. I’ll go back to writing my re:Invent blog posts.

Check back next Monday for another Weekly Roundup!

— seb

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Run Apache Hive workloads using Spark SQL with Amazon EMR on EKS

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/run-apache-hive-workloads-using-spark-sql-with-amazon-emr-on-eks/

Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. Using Spark SQL to run Hive workloads provides not only the simplicity of SQL-like queries but also taps into the exceptional speed and performance provided by Spark. Spark SQL is an Apache Spark module for structured data processing. One of its most popular use cases is to read and write Hive tables with connectivity to a persistent Hive metastore, supporting Hive SerDes and user-defined functions.

Starting from version 1.2.0, Apache Spark has supported queries written in HiveQL. HiveQL is a SQL-like language that produces data queries containing business logic, which can be converted to Spark jobs. However, this feature is only supported in YARN or standalone Spark mode. To run HiveQL-based data workloads with Spark on Kubernetes mode, engineers must embed their SQL queries into programmatic code such as PySpark, which requires additional effort to manually rewrite and maintain the code.

Amazon EMR on Amazon EKS provides a deployment option for Amazon EMR that you can use to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS).

Amazon EMR on EKS release 6.7.0 and later include the ability to run Spark SQL through the StartJobRun API. As a result of this enhancement, customers can now supply SQL entry-point files and run HiveQL queries as Spark jobs on EMR on EKS directly. The feature is available in all AWS Regions where EMR on EKS is available.

Use case

FINRA, the Financial Industry Regulatory Authority, is one of the largest Amazon EMR customers running SQL-based workloads using the Hive on Spark approach. FINRA is a private-sector regulator responsible for analyzing equities and options trading activity in the US. To look for fraud, market manipulation, insider trading, and abuse, FINRA’s technology group has developed a robust set of big data tools in the AWS Cloud to support these activities.

FINRA centralizes all its data in Amazon Simple Storage Service (Amazon S3) with a remote Hive metastore on Amazon Relational Database Service (Amazon RDS) to manage its metadata information. It uses various AWS analytics services, such as Amazon EMR, to enable its analysts and data scientists to apply advanced analytics techniques to interactively develop and test new surveillance patterns and improve investor protection. To make these interactions more efficient and productive, FINRA modernized its Hive workloads on Amazon EMR from legacy Hive on MapReduce to Hive on Spark, which resulted in query performance gains between 50 and 80 percent.

Going forward, FINRA wants to further innovate the interactive big data platform by moving from a monolithic design pattern to a job-centric paradigm, so that it can fulfill future capacity requirements as its business grows. The capability of running Hive workloads using SparkSQL directly with EMR on EKS is one of the key enablers that helps FINRA continuously pursue that goal.

Additionally, EMR on EKS offers the following benefits to accelerate adoption:

  • Fine-grained access controls (IRSA) that are job-centric to harden customers’ security posture
  • Minimized adoption effort as it enables direct Hive query submission as a Spark job without code changes
  • Reduced run costs by consolidating multiple software versions for Hive or Spark, unifying artificial intelligence and machine learning (AI/ML) and exchange, transform, and load (ETL) pipelines into a single environment
  • Simplified cluster management through multi-Availability Zone support and highly responsive autoscaling and provisioning
  • Reduced operational overhead by hosting multiple compute and storage types or CPU architectures (x86 & Arm64) in a single configuration
  • Increased application reusability and portability supported by custom docker images, which allows them to encapsulate all necessary dependencies

Running Hive SQL queries on EMR on EKS

Prerequisites

Make sure that you have AWS Command Line Interface (AWS CLI) version 1.25.70 or later installed. If you’re running AWS CLI version 2, you need version 2.7.31 or later. Use the following command to check your AWS CLI version:

aws --version

If necessary, install or update the latest version of the AWS CLI.

Solution Overview

To get started, let’s look at the following diagram. It illustrates a high-level architectural design and the different services that can be used in the Hive workload. To match FINRA’s use case, we chose an Amazon RDS database as the remote Hive metastore. Alternatively, you can use the AWS Glue Data Catalog as the metastore for Hive if needed. For more details, see the aws-samples GitHub project.

The minimum required infrastructure is:

  • An S3 bucket to store a Hive SQL script file
  • An Amazon EKS cluster with EMR on EKS enabled
  • An Amazon RDS for MySQL database in the same virtual private cloud (VPC) as the Amazon EKS cluster
  • A standalone Hive metastore service (HMS) running on the EKS cluster or a small Amazon EMR on EC2 cluster with the Hive application installed

For a quick start, run the sample CloudFormation deployment. The deployment provisions the infrastructure resources described above.

Create a Hive script file

Store a few lines of Hive queries in a single file, then upload the file to your S3 bucket. The bucket name can be found in the AWS Management Console on the AWS CloudFormation Outputs tab; search for the key value of CODEBUCKET as shown in the preceding screenshot. For a quick start, you can skip this step and use the sample file stored in s3://<YOUR_S3BUCKET>/app_code/job/set-of-hive-queries.sql. The following is a code snippet from the sample file:

-- drop the database in case of switching between different Hive metastores

DROP DATABASE IF EXISTS hiveonspark CASCADE;
CREATE DATABASE hiveonspark;
USE hiveonspark;

--create hive managed table
DROP TABLE IF EXISTS testtable purge;
CREATE TABLE IF NOT EXISTS testtable (`key` INT, `value` STRING) using hive;
LOAD DATA LOCAL INPATH '/usr/lib/spark/examples/src/main/resources/kv1.txt' INTO TABLE testtable;
SELECT * FROM testtable WHERE key=238;

-- test1: add column
ALTER TABLE testtable ADD COLUMNS (`arrayCol` Array<int>);
-- test2: insert
INSERT INTO testtable VALUES 
(238,'val_238',array(1,3)),
(238,'val_238',array(2,3));
SELECT * FROM testtable WHERE key=238;
-- test3: UDF
CREATE TEMPORARY FUNCTION hiveUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode';
SELECT `key`,`value`,hiveUDF(arrayCol) FROM testtable WHERE key=238;
-- test4: CTAS table with parameter
DROP TABLE IF EXISTS ctas_testtable purge;
CREATE TABLE ctas_testtable 
STORED AS ORC
AS
SELECT * FROM testtable;
SELECT * FROM ctas_testtable WHERE key=${key_ID};
-- test5: External table mapped to S3
CREATE EXTERNAL TABLE IF NOT EXISTS amazonreview
( 
  marketplace string, 
  customer_id string, 
  review_id  string, 
  product_id  string, 
  product_parent  string, 
  product_title  string, 
  star_rating  integer, 
  helpful_votes  integer, 
  total_votes  integer, 
  vine  string, 
  verified_purchase  string, 
  review_headline  string, 
  review_body  string, 
  review_date  date, 
  year  integer
) 
STORED AS PARQUET 
LOCATION 's3://${S3Bucket}/app_code/data/toy/';
SELECT count(*) FROM amazonreview;

Submit the Hive script to EMR on EKS

First, set up the required environment variables. See the shell script post-deployment.sh:

stack_name='HiveEMRonEKS'
export VIRTUAL_CLUSTER_ID=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='VirtualClusterId'].OutputValue" --output text)
export EMR_ROLE_ARN=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='EMRExecRoleARN'].OutputValue" --output text)
export S3BUCKET=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='CODEBUCKET'].OutputValue" --output text)

Connect to the demo EKS cluster:

echo `aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?starts_with(OutputKey,'eksclusterEKSConfig')].OutputValue" --output text` | bash
kubectl get svc

Ensure the entryPoint path is correct, then submit the set-of-hive-queries.sql to EMR on EKS.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name sparksql-test \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.8.0-latest \
--job-driver '{
  "sparkSqlJobDriver": {
      "entryPoint": "s3://'$S3BUCKET'/app_code/job/set-of-hive-queries.sql",
      "sparkSqlParameters": "-hivevar S3Bucket='$S3BUCKET' -hivevar key_ID=238"}}' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.sql.warehouse.dir": "s3://'$S3BUCKET'/warehouse/",
          "spark.hive.metastore.uris": "thrift://hive-metastore:9083"
        }
      }
    ], 
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {"logUri": "s3://'$S3BUCKET'/elasticmapreduce/emr-containers"}}}'

Note that the shell script references the set-of-hive-queries.sql Hive script file as an entry-point script. It uses the sparkSqlJobDriver attribute, not the usual sparkSubmitJobDriver designed for Spark applications. In the sparkSqlParameters section, we pass two Hive variables, S3Bucket and key_ID, into the Hive script.
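Conceptually, each -hivevar flag defines a variable whose ${name} placeholders are substituted into the script text before the queries run. The following is a rough Python illustration of that substitution (a sketch of the behavior, not how Hive implements it internally):

```python
import re

def substitute_hivevars(script: str, hivevars: dict) -> str:
    """Replace ${name} placeholders with the supplied hivevar values."""
    def repl(match):
        name = match.group(1)
        # Placeholders without a matching hivevar are left untouched
        return str(hivevars.get(name, match.group(0)))
    return re.sub(r"\$\{(\w+)\}", repl, script)

query = "SELECT * FROM ctas_testtable WHERE key=${key_ID};"
print(substitute_hivevars(query, {"key_ID": 238, "S3Bucket": "my-bucket"}))
# SELECT * FROM ctas_testtable WHERE key=238;
```

Because the substitution is name-based, the variable name passed on the command line must match the placeholder spelling (including case) used in the script.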

The property "spark.hive.metastore.uris": "thrift://hive-metastore:9083" sets a connection to a Hive Metastore Service (HMS) called hive-metastore, which is running as a Kubernetes service on the demo EKS cluster as shown in the following screenshot. If you’re running the thrift service on Amazon EMR on EC2, the URI should be thrift://<YOUR_EMR_MASTER_NODE_DNS_NAME>:9083. If you chose the AWS Glue Data Catalog as your Hive metastore, replace the entire property with "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory".
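For example, when using the AWS Glue Data Catalog, the spark-defaults classification in the configuration overrides would look roughly like the following (the bucket name is a placeholder):

```json
{
  "classification": "spark-defaults",
  "properties": {
    "spark.sql.warehouse.dir": "s3://<YOUR_S3BUCKET>/warehouse/",
    "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
  }
}
```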

Finally, check the job status using the kubectl command line tool: kubectl get po -n emr --watch

Expected output

  1. Go to the Amazon EMR console.
  2. Navigate to the side menu Virtual clusters, then select the HiveDemo cluster. You can see an entry for the SparkSQL test job.
  3. Click the Spark UI hyperlink to monitor each query’s duration and status on a web interface.
  4. To query the Amazon RDS based Hive metastore, you need a MySQL client tool installed. To make it easier, the sample CloudFormation template has installed the query tool on the master node of a small Amazon EMR on EC2 cluster.
  5. Find the EMR master node by running the following command:
aws ec2 describe-instances --filter Name=tag:project,Values=$stack_name Name=tag:aws:elasticmapreduce:instance-group-role,Values=MASTER --query 'Reservations[].Instances[].InstanceId[]'

  6. Go to the Amazon EC2 console and connect to the master node through the Session Manager.
  7. Before querying the MySQL RDS database (the Hive metastore), run the following commands on your local machine to get the credentials:
    stack_name='HiveEMRonEKS' 
    export secret_name=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='HiveSecretName'].OutputValue" --output text) 
    export HOST_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.host')
    export PASSWORD=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.password')
    export DB_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.dbname')
    export USER_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.username')
    echo -e "\n host: $HOST_NAME\n DB: $DB_NAME\n password: $PASSWORD\n username: $USER_NAME\n"
    

  8. After connecting through Session Manager, query the Hive metastore from your Amazon EMR master node.
    mysql -u admin -P 3306 -p -h <YOUR_HOST_NAME>
    Enter password:<YOUR_PASSWORD>
    
    # Query the metastore
    MySQL[(none)]> Use HiveEMRonEKS;
    MySQL[HiveEMRonEKS]> select * from DBS;
    MySQL[HiveEMRonEKS]> select * from TBLS;
    MySQL[HiveEMRonEKS]> exit;

  9. Validate the Hive tables (created by set-of-hive-queries.sql) through the interactive Hive CLI tool.

Note: Your query environment must have the Hive client tool installed and a connection to your Hive metastore or AWS Glue Data Catalog. For testing purposes, you can connect to the same Amazon EMR on EC2 master node and query your Hive tables. The EMR cluster has been pre-configured with the required setup.

sudo su
hive
hive> show databases;

Clean up

To avoid incurring future charges, delete the generated resources if you no longer need the solution. Run the following cleanup script.

curl https://raw.githubusercontent.com/aws-samples/hive-emr-on-eks/main/deployment/app_code/delete_all.sh | bash

Go to the CloudFormation console and manually delete the remaining resources if needed.

Conclusion

Amazon EMR on EKS releases 6.7.0 and higher include a Spark SQL job driver so that you can run Spark SQL scripts via the StartJobRun API. Without any modifications to your existing Hive scripts, you can execute them directly as Spark SQL jobs on Amazon EMR on EKS.

FINRA is one of the largest Amazon EMR customers. It runs over 400 Hive clusters for its analysts who need to interactively query multi-petabyte data sets. Modernizing its Hive workloads with SparkSQL gives FINRA a 50 to 80 percent query performance improvement. The support to run Spark SQL through the StartJobRun API in EMR on EKS has further enabled FINRA’s innovation in data analytics.

In this post, we demonstrated how to submit a Hive script to Amazon EMR on EKS and run it as a SparkSQL job. We encourage you to give it a try and are keen to hear your feedback and suggestions.


About the authors

Amit Maindola is a Senior Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Orchestrate Amazon EMR Serverless jobs with AWS Step functions

Post Syndicated from Naveen Balaraman original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-jobs-with-aws-step-functions/

Amazon EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks. You can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide just the right amount of capacity for your application, and you only pay for what you use.

AWS Step Functions is a serverless orchestration service that enables developers to build visual workflows for applications as a series of event-driven steps. Step Functions ensures that the steps in the serverless workflow are followed reliably, that the information is passed between stages, and errors are handled automatically.

The integration between AWS Step Functions and Amazon EMR Serverless makes it easier to manage and orchestrate big data workflows. Before this integration, you had to manually poll for job statuses or implement waiting mechanisms through API calls. Now, with the support for “Run a Job (.sync)” integration, you can more efficiently manage your EMR Serverless jobs. Using .sync allows your Step Functions workflow to wait for the EMR Serverless job to complete before moving on to the next step, effectively making job execution part of your state machine. Similarly, the “Request Response” pattern can be useful for triggering a job and immediately getting a response back, all within the confines of your Step Functions workflow. This integration simplifies your architecture by eliminating the need for additional steps to monitor job status, making the whole system more efficient and easier to manage.
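In Amazon States Language, the .sync pattern is expressed through the task’s Resource ARN. A minimal sketch of such a state might look like the following (the role ARN, account ID, and script path are placeholders, not values from this post):

```json
{
  "Submit PySpark Job": {
    "Type": "Task",
    "Resource": "arn:aws:states:::emrserverless:startJobRun.sync",
    "Parameters": {
      "ApplicationId.$": "$.ApplicationId",
      "ExecutionRoleArn": "arn:aws:iam::111122223333:role/EMR-Serverless-Role",
      "JobDriver": {
        "SparkSubmit": {
          "EntryPoint": "s3://amzn-s3-demo-bucket/scripts/job.py"
        }
      }
    },
    "End": true
  }
}
```

Dropping the .sync suffix gives the Request Response behavior, where the state completes as soon as the StartJobRun call returns.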

In this post, we explain how you can orchestrate a PySpark application using Amazon EMR Serverless and AWS Step Functions. We run a Spark job on EMR Serverless that processes Citi Bike dataset data in an Amazon Simple Storage Service (Amazon S3) bucket and stores the aggregated results in Amazon S3.

Solution Overview

We demonstrate this solution with an example using the Citi Bike dataset. This dataset includes numerous parameters, such as Rideable type, Start station, Started at, End station, Ended at, and various other elements about Citi Bike rides. Our objective is to find the minimum, maximum, and average bike trip duration in a given month.
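The aggregation itself is straightforward. The following is a pure-Python sketch of the same min/max/average logic over a few illustrative rides (the actual job does this with PySpark over the full dataset, and the timestamps below are made up for illustration):

```python
from datetime import datetime

# Illustrative (started_at, ended_at) pairs for three bike trips
rides = [
    ("2013-06-01 10:00:00", "2013-06-01 10:12:00"),
    ("2013-06-01 11:00:00", "2013-06-01 11:45:00"),
    ("2013-06-02 09:30:00", "2013-06-02 09:36:00"),
]

fmt = "%Y-%m-%d %H:%M:%S"
# Trip durations in minutes
durations = [
    (datetime.strptime(end, fmt) - datetime.strptime(start, fmt)).total_seconds() / 60
    for start, end in rides
]

print(min(durations), max(durations), sum(durations) / len(durations))
# 6.0 45.0 21.0
```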

In this solution, the input data is read from the S3 input path, transformations and aggregations are applied with the PySpark code, and the summarized output is written to the S3 output path s3://<bucket-name>/serverlessout/.

The solution is implemented as follows:

  • Creates an EMR Serverless application with Spark runtime. After the application is created, you can submit the data-processing jobs to that application. This API step waits for Application creation to complete.
  • Submits the PySpark job and waits for its completion with the StartJobRun (.sync) API. This allows you to submit a job to an Amazon EMR Serverless application and wait until the job completes.
  • After the PySpark job completes, the summarized output is available in the S3 output directory.
  • If the job encounters an error, the state machine workflow will indicate a failure. You can inspect the specific error within the state machine. For a more detailed analysis, you can also check the EMR job failure logs in the EMR Studio console.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account
  • An IAM user with administrator access
  • An S3 bucket

Solution Architecture

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration and Amazon EMR Serverless for data transformations. The summarized output is then written to an Amazon S3 bucket.

The following diagram illustrates the architecture for this use case.

Deployment steps

Before beginning this tutorial, ensure that the role being used to deploy has all the relevant permissions to create the required resources as part of the solution. The roles with the appropriate permissions will be created through a CloudFormation template using the following steps.

Step 1: Create a Step Functions state machine

You can create a Step Functions state machine workflow in two ways: either through code directly or through the Step Functions Workflow Studio graphical interface. To create a state machine, follow the steps from either option 1 or option 2 below.

Option 1: Create the state machine through code directly

To create a Step Functions state machine along with the necessary IAM roles, complete the following steps:

  1. Launch the CloudFormation stack using this link. On the CloudFormation console, provide a stack name and accept the defaults to create the stack. Once the CloudFormation deployment completes, the following resources are created; in addition, an EMR service-linked role is automatically created by this CloudFormation stack to access EMR Serverless:
    • S3 bucket to upload the PySpark script and write output data from EMR Serverless job. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.
    • State Machine with EMR Serverless steps.

  2. To prepare the S3 bucket with the PySpark script, open AWS CloudShell from the toolbar in the top-right corner of the AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  3. To prepare the S3 bucket with the input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

Option 2: Create the Step Functions state machine through Workflow Studio

Prerequisites

Before creating the state machine through Workflow Studio, ensure that all the relevant roles and resources are created as part of the solution.

  1. To deploy the necessary IAM roles and S3 bucket into your AWS account, launch the CloudFormation stack using this link. Once the CloudFormation deployment completes, the following resources are created:
    • S3 bucket to upload the PySpark script and write output data. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.

  2. To prepare the S3 bucket with the PySpark script, open AWS CloudShell from the toolbar in the top-right corner of the AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  3. To prepare the S3 bucket with the input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

To create a Step Functions state machine, complete the following steps:

  1. On the Step Functions console, choose Create state machine.
  2. Keep the Blank template selected, and click Select.
  3. In the Actions Menu on the left, Step Functions provides a list of AWS services APIs that you can drag and drop into your workflow graph in the design canvas. Type EMR Serverless in the search and drag the Amazon EMR Serverless CreateApplication state to the workflow graph:

  1. In the canvas, select Amazon EMR Serverless CreateApplication state to configure its properties. The Inspector panel on the right shows configuration options. Provide the following Configuration values:
    • Change the State name to Create EMR Serverless Application
    • Provide the following values to the API Parameters. This creates an EMR Serverless Application with Apache Spark based on Amazon EMR release 6.12.0 using default configuration settings.
      {
          "Name": "ServerlessBikeAggr",
          "ReleaseLabel": "emr-6.12.0",
          "Type": "SPARK"
      }

    • Click the Wait for task to complete – optional check box to wait for the EMR Serverless application creation to complete before executing the next state.
    • Under Next state, select the Add new state option from the drop-down.
  2. Drag EMR Serverless StartJobRun state from the left browser to the next state in the workflow.
    • Rename State name to Submit PySpark Job
    • Provide the following values in the API parameters and click Wait for task to complete – optional (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID).
{
    "ApplicationId.$": "$.ApplicationId",
    "ExecutionRoleArn": "arn:aws:iam::<<ACCOUNT-ID>>:role/EMR-Serverless-Role-<<ACCOUNT-ID>>",
    "JobDriver": {
        "SparkSubmit": {
            "EntryPoint": "s3://serverless-<<ACCOUNT-ID>>-blog/scripts/bikeaggregator.py",
            "EntryPointArguments": [
                "s3://serverless-<<ACCOUNT-ID>>-blog/data/",
                "s3://serverless-<<ACCOUNT-ID>>-blog/serverlessout/"
            ],
            "SparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }
}

  1. Select the Config tab for the state machine from the top and change the following configurations:
    • In the Details section, change State machine name to EMRServerless-BikeAggr.
    • In the Permissions section, select StateMachine-Role-<<ACCOUNT-ID>> from the dropdown for Execution role. (Make sure that you replace <<ACCOUNT-ID>> with your AWS Account ID).
  2. Continue to add steps for Check Job Success from the studio as shown in the following diagram.

  1. Click Create to create the Step Functions State Machine for orchestrating the EMR Serverless jobs.

Step 2: Invoke the Step Functions

Now that the state machine is created, we can invoke it by clicking the Start execution button:

When the state machine is invoked, it presents its run flow as shown in the following screenshot. Because we have selected the Wait for task to complete configuration (the .sync API) for this step, the next step does not start until the EMR Serverless application is created (blue represents the Amazon EMR Serverless application being created).

After successfully creating the EMR Serverless Application, we submit a PySpark Job to that Application.

When the EMR Serverless job completes, the Submit PySpark Job step changes to green. This is because we have selected the Wait for task to complete configuration (using the .sync API) for this step.

You can find the EMR Serverless application ID as well as the PySpark job run ID on the Output tab for the Submit PySpark Job step.

Step 3: Validation

To confirm the successful completion of the job, navigate to the EMR Serverless console and find the EMR Serverless application ID. Click the application ID to find the execution details for the PySpark job run submitted from the Step Functions state machine.

To verify the output of the job execution, you can check the S3 bucket where the output will be stored in a .csv file as shown in the following graphic.

Cleanup

Log in to the AWS Management Console and delete any S3 buckets created by this deployment to avoid unwanted charges to your AWS account. For example: s3://serverless-<<ACCOUNT-ID>>-blog/

Then clean up your environment by deleting the CloudFormation stack you created in the solution configuration steps.

Finally, delete the Step Functions state machine you created as part of this solution.

Conclusion

In this post, we explained how to launch an Amazon EMR Serverless Spark job with Step Functions using Workflow Studio to implement a simple ETL pipeline that creates aggregated output from the Citi Bike dataset and generates reports.

We hope this gives you a great starting point for using this solution with your datasets and applying more complex business rules to solve your transient cluster use cases.

Do you have follow-up questions or feedback? Leave a comment. We’d love to hear your thoughts and suggestions.

References


About the Authors

Naveen Balaraman is a Sr Cloud Application Architect at Amazon Web Services. He is passionate about Containers, Serverless, Architecting Microservices and helping customers leverage the power of AWS cloud.

Karthik Prabhakar is a Senior Big Data Solutions Architect for Amazon EMR at AWS. He is an experienced analytics engineer working with AWS customers to provide best practices and technical advice in order to assist their success in their data journey.

Parul Saxena is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where she provides architectural guidance to customers for running complex big data workloads over AWS platform. In her spare time, she enjoys traveling and spending time with her family and friends.

Define per-team resource limits for big data workloads using Amazon EMR Serverless

Post Syndicated from Gaurav Sharma original https://aws.amazon.com/blogs/big-data/define-per-team-resource-limits-for-big-data-workloads-using-amazon-emr-serverless/

Customers face a challenge when distributing cloud resources between different teams running workloads such as development, testing, or production. The resource distribution challenge also occurs when you have different line-of-business users. The objective is not only to ensure that sufficient resources are consistently available to production workloads and critical teams, but also to prevent ad hoc jobs from consuming all the resources and delaying other critical workloads due to misconfigured or non-optimized code. Cost controls and usage tracking across these teams are also critical factors.

In legacy big data and Hadoop clusters, as well as Amazon EMR provisioned clusters, this problem was addressed with YARN resource management by defining YARN queues for different workloads or teams. Another approach was to allocate independent clusters for different teams or workloads.

Amazon EMR Serverless is a serverless option in Amazon EMR that makes it straightforward to run your big data workloads using open-source analytics frameworks such as Apache Spark and Hive without the need to configure, manage, or scale clusters. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run your workloads. You continue to get the benefits of Amazon EMR, such as open-source compatibility, concurrency, and optimized runtime performance for popular big data frameworks. EMR Serverless provides shorter job startup latency, automatic resource management, and effective cost controls.

In this post, we show how to define per-team resource limits for big data workloads using EMR Serverless.

Solution overview

EMR Serverless comes with a concept called an EMR Serverless application, which is an isolated environment with the option to choose one of the open-source analytics applications (Spark or Hive) to submit your workloads to. You can include your own custom libraries, specify your EMR release version, and most importantly define the resource limits for the compute and memory resources. For instance, if your production Spark jobs run on Amazon EMR 6.9.0 and you need to test the same workload on Amazon EMR 6.10.0, you could use EMR Serverless to define EMR 6.10.0 as your version and test your workload using a predefined limit on resources.

The following diagram illustrates our solution architecture. Two different teams, a Prod team and a Dev team, submit their jobs independently to two different EMR Serverless applications (ProdApp and DevApp, respectively), each with dedicated resources.

EMR Serverless provides controls at the account, application, and job levels to limit the use of resources such as CPU, memory, or disk. In the following sections, we discuss some of these controls.

Service quotas at the account level

Amazon EMR Serverless has a default quota of 16 for maximum concurrent vCPUs per account. In other words, a new account can have a maximum of 16 vCPUs running at a given point in time in a particular Region across all EMR Serverless applications. However, this quota is auto-adjustable based on the usage patterns, which are monitored at the account and Region levels.
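You can inspect and raise this quota with the Service Quotas service. The following is a sketch using the AWS CLI; the quota code is a placeholder you would identify from the list output, and the desired value of 64 is illustrative:

```shell
# List the EMR Serverless quotas that apply to your account in the current Region
aws service-quotas list-service-quotas --service-code emr-serverless

# Request an increase once you've identified the quota code from the output above
aws service-quotas request-service-quota-increase \
    --service-code emr-serverless \
    --quota-code <quota-code> \
    --desired-value 64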

Resource limits and runtime configurations at the application level

In addition to quotas at the account level, administrators can limit the use of resources at the application level using a feature known as maximum capacity, which defines the maximum total vCPU, memory, and disk capacity that can be consumed collectively by all the jobs running under the application.
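As a sketch, maximum capacity can be set when creating the application with the AWS CLI; the application name here is illustrative:

```shell
aws emr-serverless create-application \
    --name prod-spark-app \
    --release-label emr-6.9.0 \
    --type SPARK \
    --maximum-capacity '{"cpu": "100 vCPU", "memory": "800 GB", "disk": "1000 GB"}'
```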

You also have an option to specify common runtime and monitoring configurations at the application level, which you would otherwise put in the individual job configurations. This helps create a standardized runtime environment for all the jobs running under an application. This can include settings like common connection settings your jobs need access to, log configurations that all your jobs inherit by default, or Spark resource settings to help balance ad hoc workloads. You can override these configurations at the job level, but defining them at the application level can help reduce the configuration necessary for individual jobs.

For further details, refer to Declaring configurations at application level.

Runtime configurations at the job level

After you have set service and application quotas and runtime configurations at the application level, you also have the option to override or add new configurations at the job level. For example, you can use different Spark job parameters to define the maximum number of executors that a specific job can run. One such parameter is spark.dynamicAllocation.maxExecutors, which defines an upper bound for the number of executors in a job and therefore controls the number of workers in an EMR Serverless application, because each executor runs within a single worker. This parameter is part of the dynamic allocation feature of Apache Spark, which allows you to dynamically scale the number of executors (workers) registered with the job up and down based on the workload. Dynamic allocation is enabled by default on EMR Serverless. For detailed steps, refer to Declaring configurations at application level.
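For example, a job could cap its executors at submission time. The following is a sketch; the application ID, role ARN, and script path are placeholders:

```shell
aws emr-serverless start-job-run \
    --application-id <application-id> \
    --execution-role-arn <job-role-arn> \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/scripts/etl_job.py",
            "sparkSubmitParameters": "--conf spark.dynamicAllocation.maxExecutors=10"
        }
    }'
```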

With these configurations, you can control the resources used across accounts, applications, and jobs. For example, you can create applications with a predefined maximum capacity to constrain costs or configure jobs with resource limits in order to allow multiple ad hoc jobs to run simultaneously without consuming too many resources.

Best practices and considerations

Extending these usage scenarios further, EMR Serverless provides features and capabilities to implement the following design considerations and best practices based on your workload requirements:

  • To make sure that users or teams submit their jobs only to their approved applications, you could use tag-based AWS Identity and Access Management (IAM) policy conditions. For more details, refer to Using tags for access control.
  • You can use custom images for applications belonging to different teams that have distinct use cases and software requirements. Using custom images is possible with Amazon EMR 6.9.0 and onwards. Custom images allow you to package various application dependencies into a single container. Some of the important benefits of using custom images include the ability to use your own JDK and Python versions, apply your organization-specific security policies, and integrate EMR Serverless into your build, test, and deploy pipelines. For more information, refer to Customizing an EMR Serverless image.
  • If you need to estimate how much a Spark job would cost when run on EMR Serverless, you can use the open-source tool EMR Serverless Estimator. This tool analyzes Spark event logs to provide you with a cost estimate. For more details, refer to Amazon EMR Serverless cost estimator.
  • We recommend that you determine your maximum capacity relative to the supported worker sizes by multiplying the number of workers by their size. For example, if you want to limit your application with 50 workers to 2 vCPUs, 16 GB of memory and 20 GB of disk, set the maximum capacity to 100 vCPU, 800 GB of memory, and 1000 GB of disk.
  • You can use tags when you create the EMR Serverless application to help search and filter your resources, or track the AWS costs using AWS Cost Explorer. You can also use tags for controlling who can submit jobs to a particular application or modify its configurations. Refer to Tagging your resources for more details.
  • You can configure the pre-initialized capacity at the time of application creation, which keeps the resources ready to be consumed by the time-sensitive jobs you submit.
  • The number of concurrent jobs you can run depends on important factors like maximum capacity limits, workers required for each job, and available IP addresses if using a VPC.
  • EMR Serverless sets up elastic network interfaces (ENIs) to securely communicate with resources in your VPC. Make sure you have enough IP addresses in your subnet for the job.
  • It’s a best practice to select multiple subnets from multiple Availability Zones. This is because the subnets you select determine the Availability Zones that are available to run the EMR Serverless application. Each worker uses an IP address in the subnet where it is launched. Make sure the configured subnets have enough IP addresses for the number of workers you plan to run.
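The sizing arithmetic from the maximum-capacity recommendation above can be sketched as a small helper (the function name is illustrative):

```python
def fleet_maximum_capacity(workers: int, vcpu: int, memory_gb: int, disk_gb: int) -> dict:
    """Derive an application's maximum capacity from a target worker count
    and a per-worker size (vCPU, memory in GB, disk in GB)."""
    return {
        "cpu": f"{workers * vcpu} vCPU",
        "memory": f"{workers * memory_gb} GB",
        "disk": f"{workers * disk_gb} GB",
    }

# 50 workers of 2 vCPU / 16 GB memory / 20 GB disk each
print(fleet_maximum_capacity(50, 2, 16, 20))
# → {'cpu': '100 vCPU', 'memory': '800 GB', 'disk': '1000 GB'}
```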

Resource usage tracking

EMR Serverless not only allows cloud administrators to limit the resources for each application, it also enables them to monitor the applications and track the usage of resources across these applications. For more details, refer to EMR Serverless usage metrics.

You can also deploy an AWS CloudFormation template to build a sample CloudWatch Dashboard for EMR Serverless which would help visualize various metrics for your applications and jobs. For more information, refer to EMR Serverless CloudWatch Dashboard.

Conclusion

In this post, we discussed how EMR Serverless empowers cloud and data platform administrators to efficiently distribute as well as restrict cloud resources at different levels: for different organizational units, users, and teams, as well as between critical and non-critical workloads. EMR Serverless resource limiting features help keep cloud costs under control and resource usage tracked effectively.

For more information on EMR Serverless applications and resource quotas, please refer to EMR Serverless User Guide and Configuring an application.


About the Authors

Gaurav Sharma is a Specialist Solutions Architect (Analytics) at Amazon Web Services (AWS), supporting US public sector customers on their cloud journey. Outside of work, Gaurav enjoys spending time with his family and reading books.

Damon Cortesi is a Principal Developer Advocate with Amazon Web Services. He builds tools and content to help make the lives of data engineers easier. When not hard at work, he still builds data pipelines and splits logs in his spare time.

Query big data with resilience using Trino in Amazon EMR with Amazon EC2 Spot Instances for less cost

Post Syndicated from Ashwini Kumar original https://aws.amazon.com/blogs/big-data/query-big-data-with-resilience-using-trino-in-amazon-emr-with-amazon-ec2-spot-instances-for-less-cost/

Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. Amazon EMR provides a managed Hadoop framework that makes it straightforward, fast, and cost-effective to process vast amounts of data using EC2 instances. Amazon EMR with Spot Instances allows you to reduce costs for running your big data workloads on AWS. Amazon EC2 can interrupt Spot Instances with a 2-minute notification whenever Amazon EC2 needs to reclaim capacity for On-Demand customers. Spot Instances are best suited for running stateless and fault-tolerant big data applications such as Apache Spark with Amazon EMR, which are resilient against Spot node interruptions.

Trino (formerly PrestoSQL) is an open-source, highly parallel, distributed SQL query engine to run interactive queries as well as batch processing on petabytes of data. It can perform in-place, federated queries on data stored in a multitude of data sources, including relational databases (MySQL, PostgreSQL, and others), distributed data stores (Cassandra, MongoDB, Elasticsearch, and others), and Amazon Simple Storage Service (Amazon S3), without the need for complex and expensive processes of copying the data to a single location.

Before Project Tardigrade, Trino queries failed whenever any of the nodes in Trino clusters failed, and there was no automatic retry mechanism with iterative querying capability. Also, failed queries had to be restarted from scratch. Due to this limitation, the cost of failures of long-running extract, transform, and load (ETL) and batch queries on Trino was high in terms of completion time, compute wastage, and spend. Spot Instances were not appropriate for long-running queries with Trino clusters and only suited for short-lived Trino queries.

In October 2022, Amazon EMR announced a new capability in the Trino engine to detect 2-minute Spot interruption notifications and determine if the existing queries can complete within 2 minutes on those nodes. If the queries can’t finish, Trino will fail them quickly and retry the queries on different nodes. Also, Trino doesn’t schedule new queries on these Spot nodes, which are about to be reclaimed. In November 2022, Amazon EMR added support for Project Tardigrade’s fault-tolerant option in the Trino engine with Amazon EMR 6.8 and above. Enabling this feature mitigates Trino task failures caused by worker node failures due to Spot interruptions or On-Demand node stops. Trino now retries failed tasks using intermediate exchange data checkpointed on Amazon S3 or HDFS.

These new enhancements in Trino with Amazon EMR provide improved resiliency for running ETL and batch workloads on Spot Instances with reduced costs. This post showcases the resilience of Amazon EMR with Trino using fault-tolerant configuration to run long-running queries on Spot Instances to save costs. We simulate Spot interruptions on Trino worker nodes by using AWS Fault Injection Simulator (AWS FIS).

Trino architecture overview

Trino runs a query by breaking up the run into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. This pipelined execution model runs multiple stages in parallel and streams data from one stage to another as the data becomes available. This parallel architecture reduces end-to-end latency and makes Trino a fast tool for ad hoc data exploration and ETL jobs over very large datasets. The following diagram illustrates this architecture.

In a Trino cluster, the coordinator is the server responsible for parsing statements, planning queries, and managing workers. The coordinator is also the node to which a client connects and submits statements to run. Every Trino cluster must have at least one coordinator. The coordinator creates a logical model of a query involving a series of stages, which is then translated into a series of connected tasks running on Trino workers. In Amazon EMR, the Trino coordinator runs on the EMR primary node and workers run on core and task nodes.

Faster insights with lower costs with EC2 Spot

You can save significant costs for your ETL and batch workloads running on EMR Trino clusters with a blend of Spot and On-Demand Instances. You can also reduce time-to-insight with faster query runs at lower cost by running more worker nodes on Spot Instances, taking advantage of Trino’s parallel architecture.

For example, a long-running query on EMR Trino that takes an hour can be finished faster by provisioning more worker nodes on Spot Instances, as shown in the following figure.

Fault-tolerant Trino configuration in Amazon EMR

Fault-tolerant execution in Trino is disabled by default; you can enable it by setting a retry policy in the Amazon EMR configuration. Trino supports two types of retry policies:

  • QUERY – The QUERY retry policy instructs Trino to retry the whole query automatically when an error occurs on a worker node. This policy is only suitable for short-running queries because the whole query is retried from scratch.
  • TASK – The TASK retry policy instructs Trino to retry individual query tasks in the event of failure. This policy is recommended for long-running ETL and batch queries.

With fault-tolerant execution enabled, intermediate exchange data is spooled on an exchange manager so that another worker node can reuse it in the event of a node failure to complete the query run. The exchange manager uses a storage location on Amazon S3 or Hadoop Distributed File System (HDFS) to store and manage spooled data, which is spilled when it exceeds the in-memory buffer capacity of the worker nodes. By default, Amazon EMR release 6.9.0 and later uses HDFS as the exchange manager.

Solution overview

In this post, we create an EMR cluster with the following architecture.

We provision the following resources using Amazon EMR and AWS FIS:

  • An EMR 6.9.0 cluster with the following configuration:
    • Apache Hadoop, Hue, and Trino applications
    • EMR instance fleets with the following:
      • One primary node (On-Demand) as the Trino coordinator
      • Two core nodes (On-Demand) as the Trino workers and exchange manager
      • Four task nodes (Spot Instances) as Trino workers
    • Trino’s fault-tolerant configuration with the following:
      • TPCDS connector
      • The TASK retry policy
      • Exchange manager directory on HDFS
      • Optional recommended settings for query performance optimization
  • An FIS experiment template to target Spot worker nodes in the Trino cluster with interruptions to demonstrate fault-tolerance of EMR Trino with Spot Instances

We use the new Amazon EMR console to create an EMR 6.9.0 cluster. For more information about the new console, refer to Summary of differences.

Create an EMR 6.9.0 cluster

Complete the following steps to create your EMR cluster:

  1. On the Amazon EMR console, create an EMR 6.9.0 cluster named emr-trino-cluster with Hadoop, Hue, and Trino applications using the Custom application bundle.

We need Hue’s web-based interface for submitting SQL queries to the Trino engine and HDFS on core nodes to store intermediate exchange data for Trino’s fault-tolerant runs.

Using multiple Spot capacity pools (each instance type in each Availability Zone is a separate pool) is a best practice to increase your chances of getting large-scale Spot capacity and minimize the impact of a specific instance type being reclaimed in EMR clusters. The Amazon EMR console allows you to configure up to 5 instance types for your core fleet and 15 instance types for your task fleet with the Spot allocation strategy, which allows up to 30 instance types for each fleet from the AWS Command Line Interface (AWS CLI) or Amazon EMR API.
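From the AWS CLI, the task fleet described here could be expressed as an instance fleet configuration similar to the following sketch, passed via `aws emr create-cluster --instance-fleets file://fleets.json`; the instance types and timeout values are illustrative:

```json
{
  "InstanceFleetType": "TASK",
  "TargetSpotCapacity": 16,
  "InstanceTypeConfigs": [
    { "InstanceType": "m5.xlarge",  "WeightedCapacity": 4 },
    { "InstanceType": "m5a.xlarge", "WeightedCapacity": 4 },
    { "InstanceType": "m6i.xlarge", "WeightedCapacity": 4 }
  ],
  "LaunchSpecifications": {
    "SpotSpecification": {
      "AllocationStrategy": "price-capacity-optimized",
      "TimeoutDurationMinutes": 10,
      "TimeoutAction": "SWITCH_TO_ON_DEMAND"
    }
  }
}
```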

  1. Configure the primary, core, and task fleets with primary and core nodes with On-Demand Instances (m5.xlarge) and task nodes with Spot Instances using multiple instance types.

When you use the Amazon EMR console, the number of vCPUs of the EC2 instance type is used as the count toward the total target capacity of a core or task fleet by default. For example, an m5.xlarge instance type with 4 vCPUs is considered as 4 units of capacity by default.

  1. On the Actions menu under Core or Task fleet, choose Edit weighted capacity.

  1. Because each instance type with 4 vCPUs (xlarge size) is 4 units of capacity, let’s set the cluster size with 8 core units (2 nodes) with On-Demand and 16 task units (4 nodes) with Spot.

Unlike core and task fleets, the primary fleet is always one instance, so no sizing configuration is needed or available for the primary node on the Amazon EMR console.

  1. Select Price-capacity optimized as your Spot allocation strategy, which launches the lowest-priced Spot Instances from your most available pools.

  1. Configure Trino’s fault-tolerant settings in the Software settings section:
[
  {
    "Classification": "trino-connector-tpcds",
    "Properties": {
      "connector.name": "tpcds"
    }
  },
  {
    "Classification": "trino-config",
    "Properties": {
      "exchange.compression-enabled": "true",
      "query.low-memory-killer.delay": "0s",
      "query.remote-task.max-error-duration": "1m",
      "retry-policy": "TASK"
    }
  },
  {
    "Classification": "trino-exchange-manager",
    "Properties": {
      "exchange.base-directories": "/exchange",
      "exchange.use-local-hdfs": "true"
    }
  }
]

Alternatively, you can create a JSON config file with the configuration, store it in an S3 bucket, and select the file path from its S3 location by selecting Load JSON from Amazon S3.

Let’s understand some optional settings for query performance optimization that we have configured:

  • "exchange.compression-enabled": "true" – Enables compression to reduce the amount of data spooled on the exchange manager.
  • "query.low-memory-killer.delay": "0s" – Reduces the low-memory killer delay so the Trino engine can unblock nodes running short on memory faster.
  • "query.remote-task.max-error-duration": "1m" – By default, Trino waits up to 5 minutes for a task to recover before considering it lost and rescheduling it. Reducing this timeout allows failed tasks to be retried faster.

For more details of Trino’s fault-tolerant configuration parameters, refer to Fault-tolerant execution.

  1. Let’s also add a tag key called Name with the value MyTrinoCluster to launch EC2 instances with this tag name.

We’ll use this tag to target Spot Instances in the cluster with AWS FIS.

The EMR cluster will take a few minutes to reach the Waiting state.

Configure an FIS experiment template to target Spot Instances with interruptions in the EMR Trino cluster

We now use the AWS FIS console to simulate interruptions of Spot Instances in the EMR Trino cluster and showcase the fault-tolerance of the Trino engine. Complete the following steps:

  1. On the AWS FIS console, create an experiment template.

  1. Under Actions, choose Add action.
  2. Create an AWS FIS action with Action type as aws:ec2:send-spot-instance-interruptions and Duration Before Interruption as 2 minutes.
  3. Choose Save.

This means AWS FIS will interrupt the targeted Spot Instances 2 minutes after the experiment starts.

  1. Under Targets, choose Edit to target all Spot Instances running in the EMR cluster.
  2. For Resource tags, use Name=MyTrinoCluster.
  3. For Resource filters, use State.Name=running.
  4. For Selection mode, set to ALL.
  5. Choose Save.

  1. Create a new AWS Identity and Access Management (IAM) role automatically to provide permissions to AWS FIS.

  1. Choose Create experiment template.

Launch Hue and Trino web interfaces

When your EMR cluster is in the Waiting state, connect to the Hue web interface for Trino queries and the Trino web interface for monitoring. Alternatively, you can submit your Trino queries using trino-cli after connecting via SSH to your EMR cluster’s primary node. In this post, we will use the Hue web interface for running queries on the EMR Trino engine.

  1. To connect to the Hue interface on the primary node from your local computer, navigate to the EMR cluster’s Properties tab, then the Network and security and EC2 security groups (firewall) section.
  2. Edit the primary node security group’s inbound rules to allow SSH (port 22) from your IP address.
  3. Retrieve your EMR cluster’s primary node public DNS from your EMR cluster’s Summary tab.

Refer to View web interfaces hosted on Amazon EMR clusters for details on connecting to web interfaces on the primary node from your local computer. You can set up an SSH tunnel with dynamic port forwarding between your local computer and the EMR primary node. Then you can configure proxy settings for your internet browser by using an add-on such as FoxyProxy for Firefox or SwitchyOmega for Chrome to manage your SOCKS proxy settings.

  1. Connect to Hue by copying the URL (http://<youremrcluster-primary-node-public-dns>:8888/) in your web browser.
  2. Create an account with your choice of user name and password.

After you log in to your account, you can see the query editor on Hue’s web interface.

By default, Amazon EMR configures the Trino web interface on the Trino coordinator (EMR primary node) to use port 8889.

  1. To connect to the Trino web interface, copy the URL (http://<youremrcluster-primary-node-public-dns>:8889/) in your web browser, where you can monitor the Trino cluster and query performance.

In the following screenshot, we can see six active Trino workers (two core and four task nodes of EMR cluster) and no running queries.

  1. Let’s run the Trino query select * from system.runtime.nodes from the Hue query editor to see the coordinator and worker nodes’ status and details.

We can see all cluster nodes are in the active state.

Test fault tolerance on Spot interruptions

To test the fault tolerance on Spot interruptions, complete the following steps:

  1. Run the following Trino query using Hue’s query editor:
with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from tpcds.sf100.inventory
,tpcds.sf100.item
,tpcds.sf100.warehouse
,tpcds.sf100.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov ,inv2.d_moy,inv2.mean, inv2.cov

When you go to the Trino web interface, you can see the query running on six active worker nodes (two core On-Demand and four task nodes on Spot Instances).

  1. On the AWS FIS console, choose Experiment templates in the navigation pane.
  2. Select the experiment template EMR_Trino_Interrupter and choose Start experiment.

After a few seconds, the experiment will be in the Completed state, and it will trigger the interruption of all four Spot Instances (four Trino workers) after 2 minutes.

After some time, we can observe in the Trino web UI that we have lost four Trino workers (task nodes running on Spot Instances) but the query is still running with the two remaining On-Demand worker nodes (core nodes). Without the fault-tolerant configuration in EMR Trino, the whole query would fail with even a single worker node failure.

  1. Run the select * from system.runtime.nodes query again in Hue to check the Trino cluster nodes status.

We can see four Spot worker nodes with the status shutting_down.

Trino starts shutting down the four Spot worker nodes as soon as they receive the 2-minute Spot interruption notification sent by the AWS FIS experiment. It will start retrying any failed tasks of these four Spot workers on the remaining active workers (two core nodes) of the cluster. The Trino engine will also not schedule tasks of any new queries on Spot worker nodes in the shutting_down state.

The Trino query will keep running on the remaining two worker nodes and succeed despite the interruption of the four Spot worker nodes. Soon after the Spot nodes stop, Amazon EMR will replenish the stopped capacity (four task nodes) by launching four replacement Spot nodes.

Achieve faster query performance for lower cost with more Trino workers on Spot

Now let’s increase Trino workers capacity from 6 to 10 nodes by manually resizing EMR task nodes on Spot Instances (from 4 to 8 nodes).

We run the same query on a larger cluster with 10 Trino workers. Let’s compare the query completion time (wall time in the Trino Web UI) with the earlier smaller cluster with six workers. We can see 32% faster query performance (1.57 minutes vs. 2.33 minutes).

You can run more Trino workers on Spot Instances to run queries faster to meet your SLAs or process a larger number of queries. With Spot Instances available at discounts of up to 90% off On-Demand prices, your cluster costs will not increase significantly compared to running the whole compute capacity on On-Demand Instances.

Clean up

To avoid ongoing charges for resources, navigate to the Amazon EMR console and delete the cluster emr-trino-cluster.

Conclusion

In this post, we showed how you can configure and launch EMR clusters with the Trino engine using its fault-tolerant configuration. With the fault tolerant feature, Trino worker nodes can be run as EMR task nodes on Spot Instances with resilience. You can configure a well-diversified task fleet with multiple instance types using the price-capacity optimized allocation strategy. This will make Amazon EMR request and launch task nodes from the most available, lower-priced Spot capacity pools to minimize costs, interruptions, and capacity challenges. We also demonstrated the resilience of EMR Trino against Spot interruptions using an AWS FIS Spot interruption experiment. EMR Trino continues to run queries by retrying failed tasks on remaining available worker nodes in the event of any Spot node interruption. With fault-tolerant EMR Trino and Spot Instances, you can run big data queries with resilience, while saving costs. For your SLA-driven workloads, you can also add more compute on Spot to adhere to or exceed your SLAs for faster query performance with lower costs compared to On-Demand Instances.


About the Authors

Ashwini Kumar is a Senior Specialist Solutions Architect at AWS based in Delhi, India. Ashwini has more than 18 years of industry experience in systems integration, architecture, and software design, with more recent experience in cloud architecture, DevOps, containers, and big data engineering. He helps customers optimize their cloud spend, minimize compute waste, and improve performance at scale on AWS. He focuses on architectural best practices for various workloads with services including EC2 Spot, AWS Graviton, EC2 Auto Scaling, Amazon EKS, Amazon ECS, and AWS Fargate.

Dipayan Sarkar is a Specialist Solutions Architect for Analytics at AWS, where he helps customers modernize their data platform using AWS Analytics services. He works with customers to design and build analytics solutions, enabling businesses to make data-driven decisions.

Apache Iceberg optimization: Solving the small files problem in Amazon EMR

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/apache-iceberg-optimization-solving-the-small-files-problem-in-amazon-emr/

In our previous post Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes, we discussed how you can implement solutions to improve operational efficiencies of your Amazon Simple Storage Service (Amazon S3) data lake that is using the Apache Iceberg open table format and running on the Amazon EMR big data platform. Iceberg tables store metadata in manifest files. As the number of data files increases, the amount of metadata stored in these manifest files also increases, leading to longer query planning time. The query runtime also increases because it’s proportional to the number of data or metadata file read operations. Compaction is the process of combining these small data and metadata files to improve performance and reduce cost. Compaction also gets rid of delete files by applying the deletes and rewriting new files without the deleted records. Currently, Iceberg provides a compaction utility that compacts small files at a table or partition level. But this approach requires you to implement the compaction job using your preferred job scheduler or by manually triggering the compaction job.
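For reference, that existing utility is exposed as Iceberg’s rewrite_data_files Spark procedure, which you would schedule or trigger yourself; the catalog and table names below are illustrative:

```sql
-- Compact small data files in a table toward a target size (~128 MB)
CALL my_catalog.system.rewrite_data_files(
  table => 'db.sensor_events',
  options => map('target-file-size-bytes', '134217728')
);
```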

In this post, we discuss the new Iceberg feature that you can use to automatically compact small files while writing data into Iceberg tables using Spark on Amazon EMR or Amazon Athena.

Use cases for processing small files

Streaming applications are prone to creating a large number of small files, which can negatively impact the performance of subsequent processing. For example, consider a critical Internet of Things (IoT) sensor from a cold storage facility that is continuously sending temperature and health data into an S3 data lake for downstream data processing and triggering actions like emergency maintenance. Systems of this nature generate a huge number of small objects that need to be compacted to a more optimal size for faster reading, such as 128 MB, 256 MB, or 512 MB. In this post, we show you a streaming sensor data use case with a large number of small files and the mitigation steps using the Iceberg open table format. For more information on streaming applications on AWS, refer to Real-time Data Streaming and Analytics.

Streaming Architecture

Solution overview

To compact the small files for improved performance, in this example, Amazon EMR triggers a compaction job after the write commit as a post-commit hook when defined thresholds (for example, number of commits) are met. By default, Amazon EMR waits for 10 commits to trigger the post-commit hook compaction utility.

This Iceberg event-based table management feature lets you monitor table activities during writes to make better decisions about how to manage each table differently based on events. As of this writing, only the optimize-data optimization is supported. To learn more about the available optimize data executors and catalog properties, refer to the README file in the GitHub repo.

To use the feature, you can use the iceberg-aws-event-based-table-management source code and provide the built JAR in the engine’s class-path. The following bootstrap action can place the JAR in the engine’s class-path:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Note that the Iceberg AWS event-based table management feature works with Iceberg v1.2.0 and above (available from Amazon EMR 6.11.0).

In some use cases, you may want to run the event-based compaction jobs in a different EMR cluster to avoid impacting the ETL jobs running in your current EMR cluster. You can get the metadata, including the cluster ID of your current ETL workflows, from the /mnt/var/lib/info/job-flow.json file and then use a different cluster to process the event-based compactions.
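As an illustrative sketch, you can read the cluster ID from that file with a few lines of Python; the `jobFlowId` key name is an assumption based on the typical contents of this metadata file, so verify it on your own cluster nodes:

```python
import json

def cluster_id_from_json(text):
    """Extract the EMR cluster ID (assumed key: jobFlowId) from job-flow.json content."""
    return json.loads(text)["jobFlowId"]

def get_cluster_id(path="/mnt/var/lib/info/job-flow.json"):
    # Only works on an EMR node, where this metadata file exists.
    with open(path) as f:
        return cluster_id_from_json(f.read())
```

On an EMR node, `get_cluster_id()` would return an ID such as `j-1N8J5NZI0KEU3`, which you can then pass to the compaction configuration for a separate cluster.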

The notebook examples shown in the following sections are also available in the aws-samples GitHub repo.

Prerequisite

For this performance comparison exercise between a Spark external table and an Iceberg table and Iceberg with compaction, we generate a significant number of small files in Parquet format and store them in an S3 bucket. We used the Amazon Kinesis Data Generator (KDG) tool to generate sample sensor data information using the following template:

{"sensorId": {{random.number(5000)}},
 "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
  )}},
 "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}",
 "date_ts": "{{date.now("YYYY-MM-DD HH:mm:ss")}}"
}

We configured an Amazon Kinesis Data Firehose delivery stream and sent the generated data into a staging S3 bucket. Then we ran an AWS Glue extract, transform, and load (ETL) job to convert the JSON files into Parquet format. For our testing, we generated 58,176 small objects with a total size of 2 GB.
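If you want to reproduce a similar dataset without the KDG, you can generate records of the same shape locally. This is a hedged sketch that mirrors the template above, not the exact tool output:

```python
import json
import random
from datetime import datetime

def make_sensor_record():
    """Generate one record with the same fields as the KDG template above."""
    return {
        "sensorId": random.randrange(5000),
        "currentTemperature": random.randint(10, 150),
        "status": random.choice(["OK", "FAIL", "WARN"]),
        "date_ts": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

record = make_sensor_record()
print(json.dumps(record))
```

Writing each record as its own small JSON object (as Firehose does with frequent buffer flushes) is what produces the many small files this post sets out to compact.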

For running the Amazon EMR tests, we used Amazon EMR version emr-6.11.0 with Spark 3.3.2, and JupyterEnterpriseGateway 2.6.0. The cluster used had one primary node (r5.2xlarge) and two core nodes (r5.xlarge). We used a bootstrap action during cluster creation to enable event-based table management:

sudo aws s3 cp s3://<path>/iceberg-aws-event-based-table-management-0.1.jar /usr/lib/spark/jars/

Also, refer to our guidance on how to use an Iceberg cluster with Spark, which is a prerequisite for this exercise.

As part of the exercise, new steps are added to the EMR cluster to trigger the compaction jobs. To enable adding new steps to the running cluster, we add the elasticmapreduce:AddJobFlowSteps action to the cluster’s default role, EMR_EC2_DefaultRole, as a prerequisite.

Performance of Iceberg reads with the compaction utility on Amazon EMR

In the following steps, we demonstrate how to use the compaction utility and what performance benefits you can achieve. We use an EMR notebook to demonstrate the benefits of the compaction utility. For instructions to set up an EMR notebook, refer to Amazon EMR Studio overview.

First, you configure your Spark session using the %%configure magic command. We use the Hive catalog for Iceberg tables.

  1. Before you run the following step, create an Amazon S3 bucket in your AWS account called <your-iceberg-storage-blog>. To check how to create an Amazon S3 bucket, follow the instructions given here. Update the your-iceberg-storage-blog bucket name in the following configuration with the actual bucket name you created to test this example:
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/"
        }
    }

  2. Create a new database named db for the Iceberg table in the AWS Glue Data Catalog, and provide the S3 URI specified in the Spark config as s3://<your-iceberg-storage-blog>/iceberg/db. Also, create another database named iceberg_db in AWS Glue for the Parquet tables. Follow the instructions given in Working with databases on the AWS Glue console to create your AWS Glue databases. Then create a new Spark table in Parquet format pointing to the bucket containing small objects in your AWS account. See the following code:
    spark.sql(""" CREATE TABLE iceberg_db.sensor_data_parquet_table (
        sensorid int,
        currenttemperature int,
        status string,
        date_ts timestamp)
    USING parquet
    location 's3://<your-bucket-with-parquet-files>/'
    """)

  3. Run an aggregate SQL to measure the performance of Spark SQL on the Parquet table with 58,176 small objects:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from iceberg_db.sensor_data_parquet_table
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()
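To record the runtimes compared throughout this exercise, you can wrap each query in a small timer. This is a generic sketch, not part of the original notebook; `spark.sql` is assumed to be available in your session:

```python
import time

def timed(label, fn):
    """Run fn(), print its elapsed wall-clock time, and return (result, seconds)."""
    start = time.time()
    result = fn()
    elapsed = time.time() - start
    print(f"{label}: {elapsed:.1f}s")
    return result, elapsed

# Hypothetical usage in a Spark notebook:
# rows, secs = timed("parquet aggregate", lambda: spark.sql(query).collect())
```

Running the same aggregate through this helper before and after compaction gives you the comparable numbers quoted later in this post.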

In the following steps, we create a new Iceberg table from the Spark/Parquet table using CTAS (Create Table As Select). Then we show how the automated compaction job can help improve query performance.

  1. Create a new Iceberg table using CTAS from the earlier AWS Glue table with the small files:
    spark.sql(""" CREATE TABLE dev.db.sensor_data_iceberg_format USING iceberg AS (SELECT * FROM iceberg_db.sensor_data_parquet_table)""")

  2. Validate that a new Iceberg snapshot was created for the new table:
    spark.sql(""" Select * from dev.db.sensor_data_iceberg_format.snapshots limit 5""").show()

Checking the S3 folder that corresponds to the newly created Iceberg table, we see that the CTAS statement added 1,879 objects to the new folder, with a total size of 1.3 GB. We can conclude that Iceberg did some optimization while loading data from the Parquet table.

  1. Now that you have data in the Iceberg table, run the previous aggregation SQL to check the runtime:
    spark.sql(""" select maxtemp, mintemp, avgtemp from
    (select
    max(currenttemperature) as maxtemp,
    min(currenttemperature) as mintemp,
    avg(currenttemperature) as avgtemp
    from dev.db.sensor_data_iceberg_format
    where month(date_ts) between 2 and 10
    order by maxtemp, mintemp, avgtemp)""").show()

The preceding query, run on the Iceberg table with 1,879 objects, completed in 1 minute, 39 seconds. There is already a significant performance improvement from converting the external Parquet table to an Iceberg table.

  1. Now let’s add the configurations needed to apply the automatic compaction of small files in the Iceberg tables. Note the last four newly added configurations in the following statement. The parameter optimize-data.commit-threshold is set to 1 so that compaction takes place after the first successful commit; the default is 10 successful commits.
    %%configure -f
    {
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/",
        "spark.sql.catalog.dev.metrics-reporter-impl":"org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator",
        "spark.sql.catalog.dev.optimize-data.impl":"org.apache.iceberg.aws.manage.EmrOnEc2OptimizeDataExecutor",
        "spark.sql.catalog.dev.optimize-data.emr.cluster-id":"j-1N8J5NZI0KEU3",
        "spark.sql.catalog.dev.optimize-data.commit-threshold":"1"
        }
    }

  2. Run a quick sanity check to confirm that the configurations are working fine with Spark SQL.

  1. To activate the automatic compaction process, add a new record to the existing Iceberg table using a Spark insert:
    spark.sql(""" Insert into dev.db.sensor_data_iceberg_format values(999123, 86, 'PASS', timestamp'2023-07-26 12:50:25') """)

  2. Navigate to the Amazon EMR console to check the cluster steps.

You should see a new step added that goes from Pending to Running and finally the Completed state. Every time the data in the Iceberg table is updated or inserted, based on configuration optimize-data.commit-threshold, the optimize job will automatically trigger to compact the underlying data.

  1. Validate that the record insert was successful.

  1. Check the snapshot table to see that a new snapshot is created for the table with the operation replace.

For every successful run of the background optimize job, a new entry will be added to the snapshot table.

  1. On the Amazon S3 console, navigate to the folder corresponding to the Iceberg table and see that the data files are compacted.

In our case, it was compacted from the previous smaller sizes to approximately 437 MB. The folder will still contain the previous smaller files for time travel unless you issue an expire snapshot command to remove them.
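The expire snapshot command mentioned above can be issued through Spark SQL with Iceberg’s `expire_snapshots` procedure. The following is an illustrative sketch for the table used in this post; the timestamp is a placeholder, so choose a retention window that preserves the time travel history you need:

```python
# Illustrative snapshot expiration; the cutoff timestamp is a placeholder.
# Execute in your Spark session with: spark.sql(expire_sql)
expire_sql = """
CALL dev.system.expire_snapshots(
    table => 'db.sensor_data_iceberg_format',
    older_than => TIMESTAMP '2023-07-01 00:00:00'
)
"""
```

After expiration, the older small files are no longer referenced by any snapshot and are removed, reclaiming the storage but giving up time travel to those snapshots.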

  1. Now you can run the same aggregate query and record the performance after the compaction.

Summary of Amazon EMR testing

The runtime for the preceding aggregation query on the compacted Iceberg table reduced to approximately 59 seconds from the previous runtime of 1 minute, 39 seconds. That is about a 40% improvement. The more small files you have in your source bucket, the bigger performance boost you can achieve with this post-hook compaction implementation. The examples shown in this blog were executed in a small Amazon EMR cluster with only two core nodes (r5.xlarge). To improve the performance of your Spark applications, Amazon EMR provides multiple optimization features that you can implement for your production workloads.

Performance of Iceberg reads with the compaction utility on Athena

To manage the Iceberg table based on events, you can start the Spark 3.3 SQL shell as shown in the following code. Make sure that the athena:StartQueryExecution and athena:GetQueryExecution permission policies are enabled.

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
          --conf spark.sql.catalog.my_catalog.warehouse=<s3-bucket> \
          --conf spark.sql.catalog.my_catalog.metrics-reporter-impl=org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator \
          --conf spark.sql.catalog.my_catalog.optimize-data.impl=org.apache.iceberg.aws.manage.AthenaOptimizeDataExecutor \
          --conf spark.sql.catalog.my_catalog.optimize-data.athena.output-bucket=<s3-bucket>

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

In this post, we showed how Iceberg event-based table management lets you manage each table differently based on events and compact small files to boost application performance. This event-based process significantly reduces the operational overhead of using the Iceberg rewrite_data_files procedure, which needs manual or scheduled operation.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Capacity Management and Amazon EMR Managed Scaling improvements for Amazon EMR on EC2 clusters

Post Syndicated from Sushant Majithia original https://aws.amazon.com/blogs/big-data/capacity-management-and-amazon-emr-managed-scaling-improvements-for-amazon-emr-on-ec2-clusters/

In 2022, we told you about the new enhancements we made in Amazon EMR Managed Scaling, which helped improve cluster utilization as well as reduced cluster costs. In 2023, we are happy to report that the Amazon EMR team has been hard at work. We worked backward from customer requirements and launched multiple new features to enhance your Amazon EMR on EC2 clusters capacity management and scaling experience.

Amazon EMR is the cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Customers asked us for features that would further improve the capacity management and scaling experience of their EMR on EC2 clusters, including their large, long-running clusters. We have been hard at work to meet those needs. The following are some of the key enhancements:

  • Enhanced customer transparency and flexibility with provisioning timeout for Spot Instances
  • Optimized task nodes scale-up for Amazon EMR on EC2 clusters launched with instance groups
  • Improved job resiliency with enhanced protection for Spark Drivers

Let’s dive deeper and discuss the new Amazon EMR on EC2 features in detail.

Enhanced customer transparency and flexibility with provisioning timeout for Spot Instances

Many Amazon EMR customers use EC2 Spot Instances for their EMR on EC2 clusters to reduce costs. Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity offered at discounts of up to 90% compared to On-Demand pricing. Amazon EMR offers you the capability to scale your cluster either manually or by using Automatic Scaling. You can also use the Amazon EMR Managed Scaling feature to automatically resize your cluster based on workload and utilization.

To enhance the customer experience when scaling up using Spot Instances, for EMR on EC2 clusters launched using instance fleets, you can now specify a provisioning timeout for Spot Instances. A provisioning timeout will tell Amazon EMR to stop provisioning Spot Instance capacity if the cluster exceeds a specified time threshold during cluster scaling operations. You can configure the Spot instance provisioning timeout for clusters getting resized manually or using Amazon EMR Managed Scaling and Auto Scaling.

Additionally, to provide better transparency, when the timeout period expires, Amazon EMR will also automatically send events to an Amazon CloudWatch Events stream. With these CloudWatch events, you can create rules that match events according to a specified pattern, and then route the events to targets to take action. To learn more, please refer to Customize a provisioning timeout period for cluster resize in Amazon EMR.
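As a sketch of acting on these events, a CloudWatch Events (Amazon EventBridge) rule could match on the `aws.emr` event source. The detail-type string below is purely illustrative, not taken from EMR documentation, so inspect the events your cluster actually emits before relying on it:

```python
import json

# Hypothetical event pattern for EMR resize-related events.
# "detail-type" here is an assumption for illustration only.
event_pattern = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Instance Fleet Resize"],  # assumption: verify against real events
}

# You could pass this pattern to events.put_rule(EventPattern=...) via boto3.
pattern_json = json.dumps(event_pattern)
```

A rule built on such a pattern can route matching events to targets like Amazon SNS or AWS Lambda for notification or automated remediation.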

The following table summarizes the experience for different scenarios when you configure a provisioning timeout period during a resize of your Amazon EMR on EC2 cluster.

Scenario | Experience
Amazon EMR provisions the desired Spot capacity before the provisioning timeout expires | Amazon EMR automatically scales up the cluster to the desired capacity; no action is needed from the customer.
Amazon EMR can’t provision any Spot capacity, or provisions only partial capacity, before the provisioning timeout expires | Amazon EMR cancels the resize request and stops its attempts to provision additional Spot capacity. It also publishes events to an Amazon CloudWatch Events stream, which customers can use to create rules and take appropriate actions.
Spot Instances in your EMR on EC2 cluster are interrupted because Amazon EC2 needs the capacity back | Amazon EMR automatically triggers a new resize request to rebalance the cluster by replacing instances with any of the available types in the cluster, using the same provisioning timeout configured on the cluster. No action is needed from the customer.

You should consider the criticality of capacity availability when specifying the provisioning timeout value:

  • When your workload capacity availability is critical – To ensure the desired capacity is available, we recommend configuring the resize provisioning timeout based on the application runtime and application SLAs. For example, if the application SLA is 60 minutes and the application takes 30 minutes to complete, you should set the resize provisioning timeout to 30 minutes or less. Amazon EMR will try to provision Spot capacity until the timeout expires (30 minutes or less) and then publish a CloudWatch event so that you can take appropriate actions.
  • When your workload is time flexible and capacity availability is not a factor – To ensure the highest likelihood of getting the desired Spot capacity, you can configure a higher value for the resize provisioning timeout.
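As a rough sketch, the timeout is expressed in minutes on the instance fleet when you resize it. The request shape below follows the EMR `ModifyInstanceFleet` API as we understand it; verify the field names against the current API reference, and note that the cluster and fleet IDs are placeholders:

```python
# Sketch of a ModifyInstanceFleet request with a Spot provisioning timeout.
# Field names are our best understanding of the EMR API shape; IDs are placeholders.
resize_request = {
    "ClusterId": "j-EXAMPLE12345",
    "InstanceFleet": {
        "InstanceFleetId": "if-EXAMPLE12345",
        "TargetSpotCapacity": 10,
        "ResizeSpecifications": {
            "SpotResizeSpecification": {"TimeoutDurationMinutes": 30}
        },
    },
}

# boto3 usage (not executed here):
# boto3.client("emr").modify_instance_fleet(**resize_request)
```

Following the SLA guidance above, a 60-minute SLA with a 30-minute application runtime would map to `TimeoutDurationMinutes` of 30 or less.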

Optimized task nodes scale-up for Amazon EMR on EC2 clusters launched with Instance groups

Instance groups offer a simpler setup to launch EMR on EC2 clusters. Each cluster launched using instance groups can include up to 50 instance groups: one primary instance group that contains one EC2 instance, a core instance group that contains one or more EC2 instances, and up to 48 optional task instance groups. You can scale each instance group by adding and removing EC2 instances manually, or you can set up automatic scaling. You can also use the Amazon EMR Managed Scaling feature to automatically resize your cluster based on workload and utilization.

To enhance the customer experience for instance groups on EMR on EC2 clusters when scaling up task nodes using Amazon EMR Managed Scaling, we have enhanced the managed scaling algorithm to choose the task instance groups that have the highest likelihood of acquiring capacity. Furthermore, when managed scaling is not able to acquire capacity with a single task instance group, to reduce any scale-up delays, Amazon EMR will automatically switch to another task group and fulfill the capacity by using multiple task instance groups. Consequently, the more flexible you are about your instance types, the higher the chances of provisioning capacity. To learn more, refer to Best practices for instance and Availability Zone flexibility.

Improved job resiliency with enhanced protection for Spark Drivers

In 2022, to improve the job resiliency when using Amazon EMR Managed Scaling, we enhanced managed scaling to be Spark shuffle data aware, which prevents scale-down of instances that store intermediate shuffle data for Apache Spark. This helps prevent job reattempts and recomputations, which leads to better performance and lower cost.

To further improve job resiliency when using Amazon EMR Managed Scaling, we have further enhanced managed scaling to be Spark Driver aware, which ensures that during cluster scale-down, Amazon EMR Managed Scaling prioritizes the scale-down of nodes that don’t have an active Spark Driver running on them. This helps minimize job failures and job retries, helping further improve performance and reduce costs. This enhancement is enabled by default for EMR clusters using Amazon EMR versions 5.34.0 and later, and Amazon EMR versions 6.4.0 and later.

To confirm which nodes in your cluster are running Spark Driver, you can visit the Spark History Server and filter for the driver on the Executors tab of your Spark application ID.

Conclusion

In this post, we highlighted the improvements that we made in capacity management and Amazon EMR Managed Scaling for EMR on EC2 clusters. We focused on improving job resiliency, enhanced flexibility and transparency when provisioning Spot Instances, and optimizing the scale-up experience when using managed scaling with instance groups on Amazon EMR on EC2 clusters. Although we have launched multiple features so far in 2023 and the pace of innovation continues to accelerate, it remains day 1 and we look forward to hearing from you on how these features help you unlock more value for your organizations. We invite you to try these new features and get in touch with us through your AWS account team if you have further comments.


About the authors

Sushant Majithia is a Principal Product Manager for EMR at AWS.

Ankur Goyal is a SDM with Amazon EMR Big Data Platform team. He builds large scale distributed applications and cluster optimization algorithms. Ankur is interested in topics of Analytics, Machine Learning and Forecasting.

Matthew Liem is a Senior Solution Architecture Manager at AWS.

Tarun Chanana is an SDM with Amazon EMR Big Data Platform team.

Monitor Apache Spark applications on Amazon EMR with Amazon CloudWatch

Post Syndicated from Le Clue Lubbe original https://aws.amazon.com/blogs/big-data/monitor-apache-spark-applications-on-amazon-emr-with-amazon-cloudwatch/

To improve a Spark application’s efficiency, it’s essential to monitor its performance and behavior. In this post, we demonstrate how to publish detailed Spark metrics from Amazon EMR to Amazon CloudWatch. This will give you the ability to identify bottlenecks while optimizing resource utilization.

CloudWatch provides a robust, scalable, and cost-effective monitoring solution for AWS resources and applications, with powerful customization options and seamless integration with other AWS services. By default, Amazon EMR sends basic metrics to CloudWatch to track the activity and health of a cluster. Spark’s configurable metrics system allows metrics to be collected in a variety of sinks, including HTTP, JMX, and CSV files, but additional configuration is required to enable Spark to publish metrics to CloudWatch.
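For comparison, Spark’s built-in sinks can be enabled entirely through configuration. The following is a minimal sketch that routes all metrics to the built-in CSV sink using Spark’s `spark.metrics.conf.*` property prefix; the output directory is a placeholder:

```python
# Minimal sketch: route all Spark metrics to the built-in CSV sink.
# Property names follow Spark's metrics system configuration;
# /tmp/spark-metrics is a placeholder output directory.
spark_metrics_conf = {
    "spark.metrics.conf.*.sink.csv.class": "org.apache.spark.metrics.sink.CsvSink",
    "spark.metrics.conf.*.sink.csv.period": "30",
    "spark.metrics.conf.*.sink.csv.unit": "seconds",
    "spark.metrics.conf.*.sink.csv.directory": "/tmp/spark-metrics",
}

# Pass these via SparkConf, spark-defaults.conf, or spark-submit --conf flags.
```

The custom CloudWatch sink in this post replaces the sink class with one that hands metrics to the CloudWatch agent instead of writing local files.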

Solution overview

This solution includes Spark configuration to send metrics to a custom sink. The custom sink collects only the metrics defined in a Metricfilter.json file. It utilizes the CloudWatch agent to publish the metrics to a custom CloudWatch namespace. The bootstrap action script included is responsible for installing and configuring the CloudWatch agent and the metrics library on the Amazon Elastic Compute Cloud (Amazon EC2) EMR instances. A CloudWatch dashboard can provide instant insight into the performance of an application.

The following diagram illustrates the solution architecture and workflow.

architectural diagram illustrating the solution overview

The workflow includes the following steps:

  1. Users start a Spark EMR job, creating a step on the EMR cluster. With Apache Spark, the workload is distributed across the different nodes of the EMR cluster.
  2. In each node (EC2 instance) of the cluster, a Spark library captures and pushes metric data to a CloudWatch agent, which aggregates the metric data before pushing them to CloudWatch every 30 seconds.
  3. Users can view the metrics accessing the custom namespace on the CloudWatch console.
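Conceptually, what the agent does in step 2 resembles a CloudWatch `PutMetricData` call against the custom namespace. The payload below is a hedged, boto3-style sketch; the metric name and value are placeholders, while the namespace matches the one used later in this post:

```python
from datetime import datetime, timezone

# Placeholder payload illustrating a custom-namespace metric publish;
# the agent batches and sends data like this roughly every 30 seconds.
metric_payload = {
    "Namespace": "EMRCustomSparkCloudWatchSink",  # custom namespace used in this post
    "MetricData": [
        {
            "MetricName": "jvm.heap.used",  # placeholder metric name
            "Timestamp": datetime.now(timezone.utc),
            "Value": 512.0,  # placeholder value
            "Unit": "Megabytes",
        }
    ],
}

# boto3 usage (not executed here):
# boto3.client("cloudwatch").put_metric_data(**metric_payload)
```

Batching through the agent rather than calling `PutMetricData` per metric keeps API call volume, and therefore cost, down.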

We provide an AWS CloudFormation template in this post as a general guide. The template demonstrates how to configure a CloudWatch agent on Amazon EMR to push Spark metrics to CloudWatch. You can review and customize it as needed to include your Amazon EMR security configurations. As a best practice, we recommend including your Amazon EMR security configurations in the template to encrypt data in transit.

You should also be aware that some of the resources deployed by this stack incur costs when they remain in use. Additionally, EMR metrics don’t incur CloudWatch costs. However, custom metrics incur charges based on CloudWatch metrics pricing. For more information, see Amazon CloudWatch Pricing.

In the next sections, we go through the following steps:

  1. Create and upload the metrics library, installation script, and filter definition to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use the CloudFormation template to create the following resources:
  3. Monitor the Spark metrics on the CloudWatch console.

Prerequisites

This post assumes that you have the following:

  • An AWS account.
  • An S3 bucket for storing the bootstrap script, library, and metric filter definition.
  • A VPC created in Amazon Virtual Private Cloud (Amazon VPC), where your EMR cluster will be launched.
  • Default IAM service roles for Amazon EMR permissions to AWS services and resources. You can create these roles with the aws emr create-default-roles command in the AWS Command Line Interface (AWS CLI).
  • An optional EC2 key pair, if you plan to connect to your cluster through SSH rather than Session Manager, a capability of AWS Systems Manager.

Define the required metrics

To avoid sending unnecessary data to CloudWatch, our solution implements a metric filter. Review the Spark documentation to get acquainted with the namespaces and their associated metrics. Determine which metrics are relevant to your specific application and performance goals. Different applications may require different metrics to monitor, depending on the workload, data processing requirements, and optimization objectives. The metric names you’d like to monitor should be defined in the Metricfilter.json file, along with their associated namespaces.
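The filtering idea can be sketched generically: keep only metric names whose leading namespace appears in an allow-list. This is illustrative Python, not the library’s actual implementation:

```python
def filter_metrics(metric_names, allowed_namespaces):
    """Keep metrics whose leading namespace is in the allow-list.

    Metric names are assumed to be dot-separated, e.g. "jvm.heap.used".
    """
    allowed = set(allowed_namespaces)
    return [m for m in metric_names if m.split(".", 1)[0] in allowed]

metrics = ["jvm.heap.used", "executor.cpuTime", "shuffle.bytesWritten"]
print(filter_metrics(metrics, ["jvm", "executor"]))
# → ["jvm.heap.used", "executor.cpuTime"]
```

Dropping unmatched namespaces before publishing is what keeps the custom-metric volume, and the associated CloudWatch cost, proportional to what you actually monitor.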

We have created an example Metricfilter.json definition, which includes capturing metrics related to data I/O, garbage collection, memory and CPU pressure, and Spark job, stage, and task metrics.

Note that certain metrics are not available in all Spark release versions (for example, appStatus was introduced in Spark 3.0).

Create and upload the required files to an S3 bucket

For more information, see Uploading objects and Installing and running the CloudWatch agent on your servers.

To create and upload the bootstrap script and supporting files, complete the following steps:

  1. On the Amazon S3 console, choose your S3 bucket.
  2. On the Objects tab, choose Upload.
  3. Choose Add files, then choose the Metricfilter.json, installer.sh, and examplejob.sh files.
  4. Additionally, upload the emr-custom-cw-sink-0.0.1.jar metrics library file that corresponds to the Amazon EMR release version you will be using:
    1. EMR-6.x.x
    2. EMR-5.x.x
  5. Choose Upload, and take note of the S3 URIs for the files.

Provision resources with the CloudFormation template

Choose Launch Stack to launch a CloudFormation stack in your account and deploy the template:

launch stack 1

This template creates an IAM role, IAM instance profile, EMR cluster, and CloudWatch dashboard. The cluster starts a basic Spark example application. You will be billed for the AWS resources used if you create a stack from this template.

The CloudFormation wizard will ask you to modify or provide these parameters:

  • InstanceType – The type of instance for all instance groups. The default is m5.2xlarge.
  • InstanceCountCore – The number of instances in the core instance group. The default is 4.
  • EMRReleaseLabel – The Amazon EMR release label you want to use. The default is emr-6.9.0.
  • BootstrapScriptPath – The S3 path of the installer.sh installation bootstrap script that you copied earlier.
  • MetricFilterPath – The S3 path of your Metricfilter.json definition that you copied earlier.
  • MetricsLibraryPath – The S3 path of your CloudWatch emr-custom-cw-sink-0.0.1.jar library that you copied earlier.
  • CloudWatchNamespace – The name of the custom CloudWatch namespace to be used.
  • SparkDemoApplicationPath – The S3 path of your examplejob.sh script that you copied earlier.
  • Subnet – The EC2 subnet where the cluster launches. You must provide this parameter.
  • EC2KeyPairName – An optional EC2 key pair for connecting to cluster nodes, as an alternative to Session Manager.

View the metrics

After the CloudFormation stack deploys successfully, the example job starts automatically and takes approximately 15 minutes to complete. On the CloudWatch console, choose Dashboards in the navigation pane. Then filter the list by the prefix SparkMonitoring.

The example dashboard includes information on the cluster and an overview of the Spark jobs, stages, and tasks. Metrics are also available under a custom namespace starting with EMRCustomSparkCloudWatchSink.

CloudWatch dashboard summary section

Memory, CPU, I/O, and additional task distribution metrics are also included.

CloudWatch dashboard executors

Finally, detailed Java garbage collection metrics are available per executor.

CloudWatch dashboard garbage-collection

Clean up

To avoid future charges in your account, delete the resources you created in this walkthrough. The EMR cluster will incur charges as long as the cluster is active, so stop it when you’re done. Complete the following steps:

  1. On the CloudFormation console, in the navigation pane, choose Stacks.
  2. Choose the stack you launched (EMR-CloudWatch-Demo), then choose Delete.
  3. Empty the S3 bucket you created.
  4. Delete the S3 bucket you created.

Conclusion

Now that you have completed the steps in this walkthrough, the CloudWatch agent is running on your cluster hosts and configured to push Spark metrics to CloudWatch. With this feature, you can effectively monitor the health and performance of your Spark jobs running on Amazon EMR, detecting critical issues in real time and identifying root causes quickly.

You can package and deploy this solution through a CloudFormation template like this example template, which creates the IAM instance profile role, CloudWatch dashboard, and EMR cluster. The source code for the library is available on GitHub for customization.

To take this further, consider using these metrics in CloudWatch alarms. You could collect them with other alarms into a composite alarm or configure alarm actions such as sending Amazon Simple Notification Service (Amazon SNS) notifications to trigger event-driven processes such as AWS Lambda functions.


About the Author

author portraitLe Clue Lubbe is a Principal Engineer at AWS. He works with our largest enterprise customers to solve some of their most complex technical problems. He drives broad solutions through innovation to impact and improve the life of our customers.

AWS Weekly Roundup – Amazon MWAA, EMR Studio, Generative AI, and More – August 14, 2023

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-amazon-mwaa-emr-studio-generative-ai-and-more-august-14-2023/

While I enjoyed a few days off in California to get a dose of vitamin sea, a lot has happened in the AWS universe. Let’s take a look together!

Last Week’s Launches
Here are some launches that got my attention:

Amazon MWAA now supports Apache Airflow version 2.6 – Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud. Apache Airflow version 2.6 introduces important security updates and bug fixes that enhance the security and reliability of your workflows. If you’re currently running Apache Airflow version 2.x, you can now seamlessly upgrade to version 2.6.3. Check out this AWS Big Data Blog post to learn more.

Amazon EMR Studio adds support for AWS Lake Formation fine-grained access control – Amazon EMR Studio is a web-based integrated development environment (IDE) for fully managed Jupyter notebooks that run on Amazon EMR clusters. When you connect to EMR clusters from EMR Studio workspaces, you can now choose the AWS Identity and Access Management (IAM) role that you want to connect with. Apache Spark interactive notebooks will access only the data and resources permitted by policies attached to this runtime IAM role. When data is accessed from data lakes managed with AWS Lake Formation, you can enforce table and column-level access using policies attached to this runtime role. For more details, have a look at the Amazon EMR documentation.

AWS Security Hub launches 12 new security controls – AWS Security Hub is a cloud security posture management (CSPM) service that performs security best practice checks, aggregates alerts, and enables automated remediation. With the newly released controls, Security Hub now supports three additional AWS services: Amazon Athena, Amazon DocumentDB (with MongoDB compatibility), and Amazon Neptune. Security Hub has also added an additional control for Amazon Relational Database Service (Amazon RDS). AWS Security Hub now offers 276 controls. You can find more information in the AWS Security Hub documentation.

Additional AWS services available in the AWS Israel (Tel Aviv) Region – The AWS Israel (Tel Aviv) Region opened on August 1, 2023. This past week, AWS Service Catalog, Amazon SageMaker, Amazon EFS, and Amazon Kinesis Data Analytics were added to the list of available services in the Israel (Tel Aviv) Region. Check the AWS Regional Services List for the most up-to-date availability information.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional blog posts and news items that you might find interesting:

AWS recognized as a Leader in 2023 Gartner Magic Quadrant for Contact Center as a Service with Amazon Connect – AWS was named a Leader for the first time since Amazon Connect, our flexible, AI-powered cloud contact center, was launched in 2017. Read the full story here. 

Generate creative advertising using generative AI – This AWS Machine Learning Blog post shows how to generate captivating and innovative advertisements at scale using generative AI. It discusses how the technique of inpainting can be used to seamlessly create image backgrounds, produce visually stunning and engaging content, and reduce unwanted image artifacts.

AWS open-source news and updates – My colleague Ricardo writes this weekly open-source newsletter in which he highlights new open-source projects, tools, and demos from the AWS Community.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

Build On Generative AI – Your favorite weekly Twitch show about all things generative AI is back for season 2 today! Every Monday, 9:00 US PT, my colleagues Emily and Darko look at new technical and scientific patterns on AWS, inviting guest speakers to demo their work and show us how they built something new to improve the state of generative AI.

In today’s episode, Emily and Darko discussed the latest models, Llama 2 and Falcon, and explored how they fit into retrieval-augmented generation design patterns. You can watch the video here. Check out show notes and the full list of episodes on community.aws.

AWS NLP Conference 2023 – Join this in-person event on September 13–14 in London to hear about the latest trends, ground-breaking research, and innovative applications that leverage natural language processing (NLP) capabilities on AWS. This year, the conference will primarily focus on large language models (LLMs), as they form the backbone of many generative AI applications and use cases. Register here.

AWS Global Summits – The 2023 AWS Summits season is coming to an end, with the last two in-person events in Mexico City (August 30) and Johannesburg (September 26).

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: West Africa (August 19), Taiwan (August 26), Aotearoa (September 6), Lebanon (September 9), and Munich (September 14).

AWS re:Invent 2023 (November 27 – December 1) – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Registration is now open.

You can browse all upcoming in-person and virtual events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

— Antje

P.S. We’re focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take this quick survey to share insights on your experience with the AWS Blog. Note that this survey is hosted by an external company, so the link doesn’t lead to our website. AWS handles your information as described in the AWS Privacy Notice.

Improved scalability and resiliency for Amazon EMR on EC2 clusters

Post Syndicated from Ravi Kumar Singh original https://aws.amazon.com/blogs/big-data/improved-scalability-and-resiliency-for-amazon-emr-on-ec2-clusters/

Amazon EMR is the cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Customers asked us for features that would further improve the resiliency and scalability of their Amazon EMR on EC2 clusters, including their large, long-running clusters. We have been hard at work to meet those needs. Over the past 12 months, we have worked backward from customer requirements and launched over 30 new features that improve the resiliency and scalability of your Amazon EMR on EC2 clusters. This post covers some of these key enhancements across three main areas:

  • Improved cluster utilization with optimized scaling experience
  • Minimized interruptions with enhanced resiliency and availability
  • Improved cluster resiliency with upgraded logging and debugging capabilities

Let’s dive into each of these areas.

Improved cluster utilization with optimized scaling experience

Customers use Amazon EMR to run diverse analytics workloads with varying SLAs, ranging from near-real-time streaming jobs to exploratory interactive workloads and everything in between. To cater to these dynamic workloads, you can resize your clusters either manually or by enabling automatic scaling. You can also use the Amazon EMR managed scaling feature to automatically resize your clusters for optimal performance at the lowest possible cost. To ensure swift cluster resizes, we implemented multiple improvements that are available in the latest Amazon EMR releases:

  • Enhanced resiliency of cluster scaling workflow to EC2 Spot Instance interruptions – Many Amazon EMR customers use EC2 Spot Instances for their Amazon EMR on EC2 clusters to reduce costs. Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity offered at discounts of up to 90% compared to On-Demand pricing. However, Amazon EC2 can reclaim Spot capacity with a two-minute warning, which can interrupt your workloads. We identified an issue where a cluster’s scaling operation could get stuck after more than a hundred core nodes launched on Spot Instances were reclaimed by Amazon EC2 over the life of the cluster. Starting with Amazon EMR version 6.8.0, we mitigated this issue by fixing a gap in the process HDFS uses to decommission nodes, which had caused scaling operations to get stuck. We contributed this improvement back to the open-source community, enabling seamless recovery and efficient scaling in the event of Spot interruptions.
  • Improved cluster utilization by recommissioning recently decommissioned nodes for Spark workloads within seconds – Amazon EMR allows you to scale down your cluster without affecting your workload by gracefully decommissioning core and task nodes. Furthermore, to prevent task failures, Apache Spark ensures that decommissioning nodes are not assigned any new tasks. However, if a new job is submitted immediately before these nodes are fully decommissioned, Amazon EMR triggers a scale-up operation for the cluster, and these decommissioning nodes are immediately recommissioned and added back into the cluster. Due to a gap in Apache Spark’s recommissioning logic, these recommissioned nodes would not accept new Spark tasks for up to 60 minutes. We enhanced the recommissioning logic so that recommissioned nodes start accepting new tasks within seconds, thereby improving cluster utilization. This improvement is available in Amazon EMR release 6.11 and higher.
  • Minimized cluster scaling interruptions due to disk over-utilization – The YARN ResourceManager exclude file is a key component of Apache Hadoop that Amazon EMR uses to centrally manage cluster resources for multiple data-processing frameworks. This exclude file contains a list of nodes to be removed from the cluster to facilitate a cluster scale-down operation. With Amazon EMR release 6.11.0, we improved the cluster scaling workflow to reduce scale-down failures. This improvement minimizes failures due to partial updates or corruption in the exclude file caused by low disk space. Additionally, we built a robust file recovery mechanism to restore the exclude file in case of corruption, ensuring uninterrupted cluster scaling operations.
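As a hedged illustration of the managed scaling feature mentioned above, a managed scaling policy is essentially a set of compute limits. The sketch below is shaped like the ManagedScalingPolicy argument to boto3’s emr.put_managed_scaling_policy; the unit counts are illustrative, not recommendations:

```python
# Sketch of an EMR managed scaling policy, shaped like the ManagedScalingPolicy
# argument to boto3's emr.put_managed_scaling_policy. Unit counts are
# illustrative placeholders, not tuning recommendations.
policy = {
    "ComputeLimits": {
        "UnitType": "Instances",             # or "InstanceFleetUnits" / "VCPU"
        "MinimumCapacityUnits": 2,           # never scale below 2 instances
        "MaximumCapacityUnits": 50,          # hard ceiling for scale-up
        "MaximumOnDemandCapacityUnits": 10,  # cap On-Demand; remainder may be Spot
        "MaximumCoreCapacityUnits": 10,      # cap core nodes; remainder are task nodes
    }
}

def check_limits(p):
    """Sanity-check that the scaling limits are internally consistent."""
    c = p["ComputeLimits"]
    assert c["MinimumCapacityUnits"] <= c["MaximumCapacityUnits"]
    assert c["MaximumOnDemandCapacityUnits"] <= c["MaximumCapacityUnits"]
    assert c["MaximumCoreCapacityUnits"] <= c["MaximumCapacityUnits"]
    return True
```

With managed scaling enabled, Amazon EMR then resizes the cluster within these limits based on workload.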

Minimized interruptions with enhanced resiliency and availability

Amazon EMR offers high availability and fault tolerance for your big data workloads. Let’s look at a few key improvements we launched in this area:

  • Improved fault tolerance to hardware reconfiguration – Amazon EMR offers the flexibility to decouple storage and compute. We observed that customers often increase the size of, or add incremental block-level storage to, their EC2 instances as their data processing volume and concurrency grow. Starting with Amazon EMR release 6.11.0, we made the EMR cluster’s local storage file system more resilient to unpredictable instance reconfigurations such as instance restarts. By addressing scenarios where an instance restart could cause the block storage device name to change, we eliminated the risk of the cluster becoming inoperable or losing data.
  • Reduce cluster startup time for Kerberos-enabled EMR clusters with long-running bootstrap actions – Multiple customers use Kerberos for authentication and run long-running bootstrap actions on their EMR clusters. In Amazon EMR 6.9.0 and higher releases, we fixed a timing sequence mismatch issue that occurs between Apache BigTop and the Amazon EMR on EC2 cluster startup sequence. This timing sequence mismatch occurs when a system attempts to perform two or more operations at the same time instead of doing them in the proper sequence. This issue caused certain cluster configurations to experience instance startup timeouts. We contributed a fix to the open-source community and made additional improvements to the Amazon EMR startup sequence to prevent this condition, resulting in cluster start time improvements of up to 200% for such clusters.

Improved cluster resiliency with upgraded logging and debugging capabilities

Effective log management is essential to ensure log availability and maintain the health of EMR clusters. This becomes especially critical when you’re running multiple custom client tools and third-party applications on your Amazon EMR on EC2 clusters. Customers depend on EMR logs, in addition to EMR events, to monitor cluster and workload health, troubleshoot urgent issues, simplify security audit, and enhance compliance. Let’s look at a few key enhancements we made in this area:

  • Upgraded on-cluster log management daemon – Amazon EMR now automatically restarts the log management daemon if it’s interrupted. The Amazon EMR on-cluster log management daemon archives logs to Amazon Simple Storage Service (Amazon S3) and deletes them from instance storage. This minimizes cluster failures due to disk over-utilization, while allowing the log files to remain accessible even after the cluster or node stops. This upgrade is available in Amazon EMR release 6.10.0 and higher. For more information, see Configure cluster logging and debugging.
  • Enhanced cluster stability with improved log rotation and monitoring – Many of our customers have long-running clusters that have been operating for years. Some open-source application logs such as Hive and Kerberos logs that are never rotated can continue to grow on these long-running clusters. This could lead to disk over-utilization and eventually result in cluster failures. We enabled log rotation for such log files to minimize disk, memory, and CPU over-utilization scenarios. Furthermore, we expanded our log monitoring to include additional log folders. These changes, available starting with Amazon EMR version 6.10.0, minimize situations where EMR cluster resources are over-utilized, while ensuring log files are archived to Amazon S3 for a wider variety of use cases.

Conclusion

In this post, we highlighted the improvements that we made in Amazon EMR on EC2 with the goal of making your EMR clusters more resilient and stable. We focused on improving cluster utilization with an optimized scaling experience for EMR workloads, minimizing interruptions with enhanced resiliency and availability for Amazon EMR on EC2 clusters, and improving cluster resiliency with upgraded logging and debugging capabilities. We will continue to deliver further enhancements in new Amazon EMR releases. We invite you to try the new features and capabilities in the latest Amazon EMR releases and get in touch with us through your AWS account team to share your valuable feedback and comments. To learn more and get started with Amazon EMR, check out the tutorial Getting started with Amazon EMR.


About the Authors

Ravi Kumar is a Senior Product Manager for Amazon EMR at Amazon Web Services.

Kevin Wikant is a Software Development Engineer for Amazon EMR at Amazon Web Services.

Orca Security’s journey to a petabyte-scale data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/orca-securitys-journey-to-a-petabyte-scale-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Eliad Gat and Oded Lifshiz from Orca Security.

With data becoming the driving force behind many industries today, having a modern data architecture is pivotal for organizations to be successful. One key component that plays a central role in modern data architectures is the data lake, which allows organizations to store and analyze large amounts of data in a cost-effective manner and run advanced analytics and machine learning (ML) at scale.

Orca Security is an industry-leading Cloud Security Platform that identifies, prioritizes, and remediates security risks and compliance issues across your AWS Cloud estate. Orca connects to your environment in minutes with patented SideScanning technology to provide complete coverage across vulnerabilities, malware, misconfigurations, lateral movement risk, weak and leaked passwords, overly permissive identities, and more.

The Orca Platform is powered by a state-of-the-art anomaly detection system that uses cutting-edge ML algorithms and big data capabilities to detect potential security threats and alert customers in real time, ensuring maximum security for their cloud environment. At the core of Orca’s anomaly detection system is its transactional data lake, which enables the company’s data scientists, analysts, data engineers, and ML specialists to extract valuable insights from vast amounts of data and deliver innovative cloud security solutions to its customers.

In this post, we describe Orca’s journey building a transactional data lake using Amazon Simple Storage Service (Amazon S3), Apache Iceberg, and AWS Analytics. We explore why Orca chose to build a transactional data lake and examine the key considerations that guided the selection of Apache Iceberg as the preferred table format.

In addition, we describe the Orca Platform architecture and the technologies used. Lastly, we discuss the challenges encountered throughout the project, present the solutions used to address them, and share valuable lessons learned.

Why did Orca build a data lake?

Prior to the creation of the data lake, Orca’s data was distributed among various data silos, each owned by a different team with its own data pipelines and technology stack. This setup led to several issues, including scaling difficulties as the data size grew, maintaining data quality, ensuring consistent and reliable data access, high costs associated with storage and processing, and difficulties supporting streaming use cases. Moreover, running advanced analytics and ML on disparate data sources proved challenging. To overcome these issues, Orca decided to build a data lake.

A data lake is a centralized data repository that enables organizations to store and manage large volumes of structured and unstructured data, eliminating data silos and facilitating advanced analytics and ML on the entire data. By decoupling storage and compute, data lakes promote cost-effective storage and processing of big data.

Why did Orca choose Apache Iceberg?

Orca considered several table formats that have evolved in recent years to support its transactional data lake. Amongst the options, Apache Iceberg stood out as the ideal choice because it met all of Orca’s requirements.

First, Orca sought a transactional table format that ensures data consistency and fault tolerance. Apache Iceberg’s transactional and ACID guarantees, which allow concurrent read and write operations while ensuring data consistency and simplified fault handling, fulfill this requirement. Furthermore, Apache Iceberg’s support for time travel and rollback capabilities makes it highly suitable for addressing data quality issues by reverting to a previous state in a consistent manner.

Second, a key requirement was to adopt an open table format that integrates with various processing engines. This was to avoid vendor lock-in and allow teams to choose the processing engine that best suits their needs. Apache Iceberg’s engine-agnostic and open design meets this requirement by supporting all popular processing engines, including Apache Spark, Amazon Athena, Apache Flink, Trino, Presto, and more.

In addition, given the substantial data volumes handled by the system, an efficient table format was required that can support querying petabytes of data very fast. Apache Iceberg’s architecture addresses this need by efficiently filtering and reducing scanned data, resulting in accelerated query times.

An additional requirement was to allow seamless schema changes without impacting end-users. Apache Iceberg’s range of features, including schema evolution, hidden partitions, and partition evolution, addresses this requirement.

Lastly, it was important for Orca to choose a table format that is widely adopted. Apache Iceberg’s growing and active community aligned with the requirement for a popular and community-backed table format.

Solution overview

Orca’s data lake is based on open-source technologies that seamlessly integrate with Apache Iceberg. The system ingests data from various sources such as cloud resources, cloud activity logs, and API access logs, and processes billions of messages, resulting in terabytes of data daily. This data is sent to Apache Kafka, which is hosted on Amazon Managed Streaming for Apache Kafka (Amazon MSK). It is then processed using Apache Spark Structured Streaming running on Amazon EMR and stored in the data lake. Amazon EMR streamlines the process of loading all required Iceberg packages and dependencies, ensuring that the data is stored in Apache Iceberg format and ready for consumption as quickly as possible.

The data lake is built on top of Amazon S3 using Apache Iceberg table format with Apache Parquet as the underlying file format. In addition, the AWS Glue Data Catalog enables data discovery, and AWS Identity and Access Management (IAM) enforces secure access controls for the lake and its operations.

The data lake serves as the foundation for a variety of capabilities that are supported by different engines.

Data pipelines built on Apache Spark and Athena SQL analyze and process the data stored in the data lake. These data pipelines generate valuable insights and curated data that are stored in Apache Iceberg tables for downstream usage. This data is then used by various applications for streaming analytics, business intelligence, and reporting.

Amazon SageMaker is used to build, train, and deploy a range of ML models. Specifically, the system uses Amazon SageMaker Processing jobs to process the data stored in the data lake, employing the AWS SDK for Pandas (previously known as AWS Wrangler) for various data transformation operations, including cleaning, normalization, and feature engineering. This ensures that the data is suitable for training purposes. Additionally, SageMaker training jobs are employed for training the models. After the models are trained, they are deployed and used to identify anomalies and alert customers in real time to potential security threats. The following diagram illustrates the solution architecture.

Orca security Data Lake Architecture

Challenges and lessons learned

Orca faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing EMR streaming ingestion for high throughput
  • Taming the small files problem for fast reads
  • Maximizing performance with Athena version 3
  • Maintaining Apache Iceberg tables
  • Managing data retention
  • Monitoring the data lake infrastructure and operations
  • Mitigating data quality issues

In this section, we describe each of these challenges and the solutions implemented to address them.

Determining optimal table partitioning

Determining optimal partitioning for each table is very important in order to optimize query performance and minimize the impact on teams querying the tables when partitioning changes. Apache Iceberg’s hidden partitions combined with partition transformations proved to be valuable in achieving this goal because it allowed for transparent changes to partitioning without impacting end-users. Additionally, partition evolution enables experimentation with various partitioning strategies to optimize cost and performance without requiring a rewrite of the table’s data every time.

For example, with these features, Orca was able to easily change the partitioning of several of its tables from DAY to HOUR with no impact on user queries. Without this native Iceberg capability, they would have needed to coordinate the new schema with all the teams that query the tables and rewrite the entire dataset, which would have been a costly, time-consuming, and error-prone process.

Optimizing EMR streaming ingestion for high throughput

As mentioned previously, the system ingests billions of messages daily, resulting in terabytes of data processed and stored each day. Therefore, optimizing the EMR clusters for this type of load while maintaining high throughput and low costs has been an ongoing challenge. Orca addressed this in several ways.

First, Orca chose to use instance fleets with its EMR clusters because they allow optimized resource allocation by combining different instance types and sizes. Instance fleets improve resilience by allowing multiple Availability Zones to be configured. As a result, the cluster will launch in an Availability Zone with all the required instance types, preventing capacity limitations. Additionally, instance fleets can use both Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot instances, resulting in cost savings.

The process of sizing the cluster for high throughput and lower costs involved adjusting the number of core and task nodes, selecting suitable instance types, and fine-tuning CPU and memory configurations. Ultimately, Orca was able to find an optimal configuration consisting of On-Demand core nodes and Spot task nodes of varying sizes, which provided high throughput while ensuring compliance with SLAs.

Orca also found that using different Kafka Spark Structured Streaming properties, such as minOffsetsPerTrigger, maxOffsetsPerTrigger, and minPartitions, provided higher throughput and better control of the load. Using minPartitions, which enables better parallelism and distribution across a larger number of tasks, was particularly useful for consuming high lags quickly.

Lastly, when dealing with a high data ingestion rate, Amazon S3 may throttle the requests and return 503 errors. To address this scenario, Iceberg offers a table property called write.object-storage.enabled, which incorporates a hash prefix into the stored S3 object path. This approach effectively mitigates throttling problems.
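The following toy sketch illustrates the idea behind that property. Iceberg’s real file layout and hash function differ, so this only demonstrates why a hash prefix spreads writes across the S3 keyspace:

```python
import hashlib

def hashed_object_path(table_location, data_file_name):
    """Illustrative sketch of the idea behind Iceberg's
    write.object-storage.enabled property: prepend a short hash to the
    object key so that concurrent writes spread across many S3 key
    prefixes instead of concentrating on a single prefix, which is what
    triggers 503 throttling. (Iceberg's actual layout and hash function
    differ; this only demonstrates the concept.)"""
    digest = hashlib.md5(data_file_name.encode("utf-8")).hexdigest()[:8]
    return f"{table_location}/{digest}/{data_file_name}"
```

Because the hash depends only on the file name, the mapping is deterministic, yet unrelated files land under different prefixes.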

Taming the small files problem for fast reads

A common challenge often encountered when ingesting streaming data into the data lake is the creation of many small files. This can have a negative impact on read performance when querying the data with Athena or Apache Spark. Having a high number of files leads to longer query planning and runtimes due to the need to process and read each file, resulting in overhead for file system operations and network communication. Additionally, this can result in higher costs due to the large number of S3 PUT and GET requests required.

To address this challenge, Apache Spark Structured Streaming provides the trigger mechanism, which can be used to tune the rate at which data is committed to Apache Iceberg tables. The commit rate has a direct impact on the number of files being produced. For instance, a higher commit rate, corresponding to a shorter time interval, results in a large number of small data files being produced.

In certain cases, launching the Spark cluster on an hourly basis and configuring the trigger to AvailableNow facilitated the processing of larger data batches and reduced the number of small files created. Although this approach led to cost savings, it did involve a trade-off of reduced data freshness. However, this trade-off was deemed acceptable for specific use cases.
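A back-of-the-envelope sketch makes the trade-off concrete; the writer-task and interval numbers below are illustrative only:

```python
def files_per_day(trigger_interval_s, writer_tasks):
    """Rough upper bound on data files produced per day by a streaming
    write: each micro-batch commit can emit up to one file per writer
    task (per partition touched). Numbers are illustrative only."""
    commits_per_day = 24 * 3600 // trigger_interval_s
    return commits_per_day * writer_tasks

# A 30-second trigger with 100 writer tasks can produce up to
# 2880 * 100 = 288,000 files/day, while an hourly AvailableNow-style
# batch with the same parallelism caps it at 24 * 100 = 2,400.
```

This is why lengthening the trigger interval (at the cost of data freshness) dramatically reduces the small-file count.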

In addition, to address preexisting small files within the data lake, Apache Iceberg offers a data files compaction operation that combines these smaller files into larger ones. Running this operation on a schedule is highly recommended to optimize the number and size of the files. Compaction also proves valuable in handling late-arriving data and enables the integration of this data into consolidated files.

Maximizing performance with Athena version 3

Orca was an early adopter of Athena version 3, Amazon’s implementation of the Trino query engine, which provides extensive support for Apache Iceberg. Whenever possible, Orca preferred using Athena over Apache Spark for data processing. This preference was driven by the simplicity and serverless architecture of Athena, which led to reduced costs and easier usage, unlike Spark, which typically required provisioning and managing a dedicated cluster at higher costs.

In addition, Orca used Athena as part of its model training and as the primary engine for ad hoc exploratory queries conducted by data scientists, business analysts, and engineers. However, for maintaining Iceberg tables and updating table properties, Apache Spark remained the more scalable and feature-rich option.

Maintaining Apache Iceberg tables

Ensuring optimal query performance and minimizing storage overhead became a significant challenge as the data lake grew to a petabyte scale. To address this challenge, Apache Iceberg offers several maintenance procedures, such as the following:

  • Data files compaction – This operation, as mentioned earlier, involves combining smaller files into larger ones and reorganizing the data within them. This operation not only reduces the number of files but also enables data sorting based on different columns or clustering similar data using z-ordering. Using Apache Iceberg’s compaction results in significant performance improvements, especially for large tables, making a noticeable difference in query performance between compacted and uncompacted data.
  • Expiring old snapshots – This operation provides a way to remove outdated snapshots and their associated data files, enabling Orca to maintain low storage costs.
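As a sketch of how these procedures are typically invoked, the helpers below build the Spark SQL CALL statements for them. The catalog name, table name, and target file size you would pass in are assumptions, and you would run the resulting strings with spark.sql(...):

```python
def rewrite_data_files_sql(catalog, table, target_file_size_bytes=512 * 1024 * 1024):
    """Build the Spark SQL CALL statement for Iceberg's compaction
    procedure. The 512 MB target file size is an illustrative choice."""
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_file_size_bytes}'))"
    )

def expire_snapshots_sql(catalog, table, older_than_ts):
    """Build the CALL statement that removes snapshots (and the data
    files only they reference) older than the given timestamp literal."""
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', older_than => TIMESTAMP '{older_than_ts}')"
    )
```

Scheduling these two statements (for example, from a nightly Spark job on EMR) keeps file counts and storage overhead in check.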

Running these maintenance procedures efficiently and cost-effectively using Apache Spark, particularly the compaction operation, which operates on terabytes of data daily, requires careful consideration. This entails appropriately sizing the Spark cluster running on EMR and adjusting various settings such as CPU and memory.

In addition, using Apache Iceberg’s metadata tables proved to be very helpful in identifying issues related to the physical layout of Iceberg’s tables, which can directly impact query performance. Metadata tables offer insights into the physical data storage layout of the tables and offer the convenience of querying them with Athena version 3. By accessing the metadata tables, crucial information about tables’ data files, manifests, history, partitions, snapshots, and more can be obtained, which aids in understanding and optimizing the table’s data layout.

For instance, the following queries can uncover valuable information about the underlying data:

  • The number of files and their average size per partition:
    SELECT partition, file_count, (total_size / file_count) AS avg_file_size FROM "db"."table$partitions"

  • The number of data files pointed to by each manifest:
    SELECT path, added_data_files_count + existing_data_files_count AS number_of_data_files FROM "db"."table$manifests"

  • Information about the data files:
    SELECT file_path, file_size_in_bytes FROM "db"."table$files"

  • Information related to data completeness:
    SELECT record_count, partition FROM "db"."table$partitions"

Managing data retention

Effective management of data retention in a petabyte-scale data lake is crucial to ensure low storage costs as well as to comply with GDPR. However, implementing such a process can be challenging when dealing with Iceberg data stored in S3 buckets, because deleting files based on simple S3 lifecycle policies could potentially cause table corruption. This is because Iceberg’s data files are referenced in manifest files, so any changes to data files must also be reflected in the manifests.

To address this challenge, certain considerations must be taken into account to handle data retention properly. Apache Iceberg provides two modes for handling deletes: copy-on-write (CoW) and merge-on-read (MoR). In CoW mode, Iceberg rewrites data files at the time of deletion and creates new data files, whereas in MoR mode, instead of rewriting the data files, a delete file is written that lists the positions of deleted records in files. These files are then reconciled with the remaining data during read time.

In favor of faster read times, CoW mode is preferable, and when used in conjunction with the expiring old snapshots operation, it allows for the hard deletion of data files that have exceeded the set retention period.

In addition, by storing the data sorted based on the field that will be utilized for deletion (for example, organizationID), it’s possible to reduce the number of files that require rewriting. This optimization significantly enhances the efficiency of the deletion process, resulting in improved deletion times.
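A toy simulation, with made-up record counts and file sizes, shows why sorting by the deletion key shrinks the rewrite set: after sorting, all records for a given organizationID land in a handful of adjacent files instead of being scattered across every file.

```python
# Toy model: 1,000 records for 10 organizations, packed into files of 100 records.
# The sizes and layout here are illustrative, not Iceberg internals.
records = [{"organizationID": i % 10, "id": i} for i in range(1000)]

def files_touched_by_delete(rows, org, file_size=100):
    """Count files containing at least one record for `org` (files CoW must rewrite)."""
    files = [rows[i:i + file_size] for i in range(0, len(rows), file_size)]
    return sum(1 for f in files if any(r["organizationID"] == org for r in f))

unsorted_cost = files_touched_by_delete(records, org=3)  # round-robin layout: every file is hit
sorted_rows = sorted(records, key=lambda r: r["organizationID"])
sorted_cost = files_touched_by_delete(sorted_rows, org=3)  # clustered layout: one file is hit
print(unsorted_cost, sorted_cost)  # 10 1
```

With the round-robin layout, deleting one organization rewrites all 10 files; with the sorted layout, only 1.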

Monitoring the data lake infrastructure and operations

Managing a data lake infrastructure is challenging due to the various components it encompasses, including those responsible for data ingestion, storage, processing, and querying.

Effective monitoring of all these components involves tracking resource utilization, data ingestion rates, query runtimes, and various other performance-related metrics, and is essential for maintaining optimal performance and detecting issues as soon as possible.

Monitoring Amazon EMR was crucial because it played a vital role in the system for data ingestion, processing, and maintenance. Orca monitored the cluster status and resource usage of Amazon EMR by utilizing the available metrics through Amazon CloudWatch. Furthermore, it used JMX Exporter and Prometheus to scrape specific Apache Spark metrics and create custom metrics to further improve the pipelines’ observability.

Another challenge emerged when attempting to monitor the ingestion progress through Kafka lag. Although Kafka lag tracking is the standard method for monitoring ingestion progress, it posed a challenge because Spark Structured Streaming manages its offsets internally and doesn't commit them back to Kafka. To overcome this, Orca used the progress events reported by Spark's StreamingQueryListener to track the processed offsets, which were then committed to a dedicated Kafka consumer group for lag monitoring.
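Once the processed offsets are committed to a consumer group, the lag computation itself is simple: for each topic partition, lag is the broker's log end offset minus the committed offset. A minimal sketch (the offset numbers below are made up for illustration):

```python
def consumer_group_lag(end_offsets, committed_offsets):
    """Total lag across partitions: sum of (log end offset - committed offset).

    Partitions with no committed offset are treated as fully lagging (offset 0).
    """
    return sum(
        end_offsets[tp] - committed_offsets.get(tp, 0)
        for tp in end_offsets
    )

# Illustrative offsets for three partitions of one topic.
end = {("events", 0): 1_500, ("events", 1): 1_480, ("events", 2): 1_520}
committed = {("events", 0): 1_400, ("events", 1): 1_480, ("events", 2): 1_470}
print(consumer_group_lag(end, committed))  # 150
```

In practice the end offsets come from the Kafka broker and the committed offsets from the dedicated consumer group that the listener writes to.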

In addition, to ensure optimal query performance and identify potential performance issues, it was essential to monitor Athena queries. Orca addressed this by using key metrics from Athena and the AWS SDK for Pandas, specifically TotalExecutionTime and ProcessedBytes. These metrics helped identify any degradation in query performance and keep track of costs, which were based on the size of the data scanned.
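Translating bytes scanned into an estimated cost is straightforward arithmetic. The sketch below assumes the common $5-per-TB-scanned on-demand Athena rate and the 10 MB per-query minimum; both vary by Region and should be checked against current pricing.

```python
PRICE_PER_TB = 5.0           # assumed on-demand Athena rate; verify for your Region
MIN_BYTES = 10 * 1024 ** 2   # Athena bills a 10 MB minimum per query

def athena_query_cost(processed_bytes):
    """Estimate the cost of one query from its bytes scanned."""
    billable = max(processed_bytes, MIN_BYTES)
    return billable / 1024 ** 4 * PRICE_PER_TB

# A query scanning 200 GiB costs about $0.98 at this rate.
print(round(athena_query_cost(200 * 1024 ** 3), 2))
```

Summing this estimate over the queries reported by the metrics above gives a running view of query spend alongside runtime degradation.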

Mitigating data quality issues

Apache Iceberg’s capabilities and overall architecture played a key role in mitigating data quality challenges.

One of the ways Apache Iceberg addresses these challenges is through its schema evolution capability, which enables users to modify or add columns to a table’s schema without rewriting the entire data. This feature prevents data quality issues that may arise due to schema changes, because the table’s schema is managed as part of the manifest files, ensuring safe changes.

Furthermore, Apache Iceberg’s time travel feature provides the ability to review a table’s history and roll back to a previous snapshot. This functionality has proven to be extremely useful in identifying potential data quality issues and swiftly resolving them by reverting to a previous state with known data integrity.

These robust capabilities ensure that data within the data lake remains accurate, consistent, and reliable.

Conclusion

Data lakes are an essential part of a modern data architecture, and now it's easier than ever to create a robust, transactional, cost-effective, and high-performance data lake by using Apache Iceberg, Amazon S3, and AWS Analytics services such as Amazon EMR and Athena.

Since building the data lake, Orca has observed significant improvements. The data lake infrastructure has allowed Orca’s platform to have seamless scalability while reducing the cost of running its data pipelines by over 50% utilizing Amazon EMR. Additionally, query costs were reduced by more than 50% using the efficient querying capabilities of Apache Iceberg and Athena version 3.

Most importantly, the data lake has made a profound impact on Orca’s platform and continues to play a key role in its success, supporting new use cases such as change data capture (CDC) and others, and enabling the development of cutting-edge cloud security solutions.

If Orca’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how this solution can address them.
  • Reach out to experts who can provide guidance based on their own experiences, and consider engaging in seminars, workshops, or online forums that discuss these technologies.
  • An important part of this journey would be to implement a proof of concept. This hands-on experience will provide valuable insights into the complexities of a transactional data lake.

Embarking on a journey to a transactional data lake using Amazon S3, Apache Iceberg, and AWS Analytics can vastly improve your organization’s data infrastructure, enabling advanced analytics and machine learning, and unlocking insights that drive innovation.


About the Authors

Eliad Gat is a Big Data & AI/ML Architect at Orca Security. He has over 15 years of experience designing and building large-scale cloud-native distributed systems, specializing in big data, analytics, AI, and machine learning.

Oded Lifshiz is a Principal Software Engineer at Orca Security. He enjoys combining his passion for delivering innovative, data-driven solutions with his expertise in designing and building large-scale machine learning pipelines.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan also leads the Apache Iceberg Israel community.

Carlos Rodrigues is a Big Data Specialist Solutions Architect at Amazon Web Services. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

Sofia Zilberman is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services. She has a track record of 15 years of creating large-scale, distributed processing systems. She remains passionate about big data technologies and architecture trends, and is constantly on the lookout for functional and technological innovations.

Enable remote reads from Azure ADLS with SAS tokens using Spark in Amazon EMR

Post Syndicated from Kiran Anand original https://aws.amazon.com/blogs/big-data/enable-remote-reads-from-azure-adls-with-sas-tokens-using-spark-in-amazon-emr/

Organizations use data from many sources to understand, analyze, and grow their business. These data sources are often spread across various public cloud providers. Enterprises may also expand their footprint by mergers and acquisitions, and during such events they often end up with data spread across different public cloud providers. These scenarios can create the need for AWS services to remotely access, in an ad hoc and temporary fashion, data stored in another public cloud provider such as Microsoft Azure to enable business as usual or facilitate a transition.

In such scenarios, data scientists and analysts are presented with a unique challenge when working to complete a quick data analysis because data typically has to be duplicated or migrated to a centralized location. Doing so introduces time delays, increased cost, and higher complexity as pipelines or replication processes are stood up by data engineering teams. In the end, the data may not even be needed, resulting in further loss of resources and time. Having quick, secure, and constrained access to the maximum amount of data is critical for enterprises to improve decision-making. Amazon EMR, with its open-source Hadoop modules and support for Apache Spark and Jupyter and JupyterLab notebooks, is a good choice to solve this multi-cloud data access problem.

Amazon EMR is a top-tier cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR Notebooks, a managed environment based on Jupyter and JupyterLab notebooks, enables you to interactively analyze and visualize data, collaborate with peers, and build applications using EMR clusters running Apache Spark.

In this post, we demonstrate how to set up quick, constrained, and time-bound authentication and authorization to remote data sources in Azure Data Lake Storage (ADLS) using a shared access signature (SAS) when running Apache Spark jobs via EMR Notebooks attached to an EMR cluster. This access enables data scientists and data analysts to access data directly when operating in multi-cloud environments and join datasets in Amazon Simple Storage Service (Amazon S3) with datasets in ADLS using AWS services.

Overview of solution

Amazon EMR inherently includes Apache Hadoop at its core and integrates other related open-source modules. The hadoop-aws and hadoop-azure modules provide support for AWS and Azure integration, respectively. For ADLS Gen2, the integration is done through the abfs connector, which supports reading and writing data in ADLS. Azure provides various options to authorize and authenticate requests to storage, including SAS. With SAS, you can grant restricted access to ADLS resources over a specified time interval (maximum of 7 days). For more information about SAS, refer to Delegate access by using a shared access signature.

Out of the box, Amazon EMR doesn't have the required libraries and configurations to connect to ADLS directly. There are different methods to connect Amazon EMR to ADLS, and they all require custom configurations. In this post, we focus specifically on connecting from Apache Spark in Amazon EMR using SAS tokens generated for ADLS. SAS connectivity is possible in Amazon EMR version 6.9.0 and above, which bundles hadoop-common 3.3.0, where support for HADOOP-16730 has been implemented. However, although the hadoop-azure module provides a SASTokenProvider interface, it doesn't ship a concrete implementation. To access ADLS using SAS tokens, this interface must be implemented in a custom class, packaged as a JAR, and referenced in the EMR cluster configuration.

You can find a sample implementation of the SASTokenProvider interface on GitHub. In this post, we use this sample implementation of the SASTokenProvider interface and package it as a JAR file that can be added directly to an EMR environment on version 6.9.0 and above. To enable the JAR, a set of custom configurations are required on Amazon EMR that enable the SAS token access to ADLS. The provided JAR needs to be added to the HADOOP_CLASSPATH, and then the HADOOP_CLASSPATH needs to be added to the SPARK_DIST_CLASSPATH. This is all handled in the sample AWS CloudFormation template provided with this post. At a high level, the CloudFormation template deploys the Amazon EMR cluster with the custom configurations and has a bootstrapping script that stages the JAR on the nodes of the EMR cluster. The CloudFormation template also stages a sample Jupyter notebook and datasets into an S3 bucket. When the EMR cluster is ready, the EMR notebook needs to be attached to it and the sample Jupyter notebook loaded. After the SAS token configurations are done in the notebook, we can start reading data remotely from ADLS by running the cells within the notebook. The following diagram provides a high-level view of the solution architecture.

Architecture Overview

We walk through the following high-level steps to implement the solution:

  1. Create resources using AWS CloudFormation.
  2. Set up sample data on ADLS and create a delegated access with an SAS token.
  3. Store the SAS token securely in AWS Secrets Manager.
  4. Deploy an EMR cluster with the required configurations to securely connect and read data from ADLS via the SAS token.
  5. Create an EMR notebook and attach it to the launched EMR cluster.
  6. Read data via Spark from ADLS within the JupyterLab notebook.

For this setup, data travels over the public internet, which is neither a best practice nor an AWS recommendation, but it's sufficient to showcase the Amazon EMR configurations that enable remote reads from ADLS. Solutions such as AWS Direct Connect or AWS Site-to-Site VPN should be used to secure data traffic in enterprise deployments.

For an AWS Command Line Interface (AWS CLI)-based deployment example, refer to the appendix at the end of this post.

Prerequisites

To get this solution working, we have a set of prerequisites for both AWS and Microsoft Azure:

  • An AWS account that can create AWS Identity and Access Management (IAM) resources with custom names and has access enabled for Amazon EMR, Amazon S3, AWS CloudFormation, and Secrets Manager.
  • The old Amazon EMR console enabled.
  • An Azure account with a storage account and container.
  • Access to blob data in ADLS with Azure AD credentials. The user must have the required role assignments in Azure. Refer to Assign an Azure role for more details.

We are following the best practice of using Azure AD credentials to create a user delegation SAS when applications need access to data storage using shared access signature tokens. In this post, we create and use a user delegation SAS with read and list permissions for ADLS access. For more information about creating SAS tokens using the Azure portal, refer to Use the Azure portal.

Before we generate the user delegation SAS token, we need to ensure the credential that will be used to generate the token has appropriate permissions to access data on the ADLS storage account. Requests submitted to the ADLS account using the user delegation SAS token are authorized with the Azure AD credentials that were used to create the SAS token.

The following minimum Azure role-based access control is required at the storage account level to access the data on ADLS storage:

  • Reader – Allow viewing resources such as listing the Azure storage account and its configuration
  • Storage Blob Data Reader – Allow reading and listing Azure storage containers and blobs
  • Storage Blob Delegator – In addition to the permissions to access the data on the ADLS account, you also need this role to generate a user delegation SAS token

Create an EMR cluster and S3 artifacts with AWS CloudFormation

To create the supported version of an EMR cluster with the required SAS configurations and stage all the required artifacts in Amazon S3, complete the following steps:

  1. Sign in to the AWS Management Console in your Region (for this post, we use us-east-1).
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. Choose Next.

Create Stack

  4. For Stack name, enter an appropriate lowercase name.
  5. For EmrRelease, leave as default.

As of this writing, the stack has been tested against 6.9.0 and 6.10.0.

  6. Choose Next.
    Stack Details
  7. On the next page, choose Next.
  8. Review the details and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  9. Choose Create stack.
  10. Monitor the progress of the stack creation until it's complete (about 15–20 minutes).
  11. When the stack creation is complete, navigate to the stack detail page.
  12. On the Resources tab, find the logical ID with the name S3ArtifactsBucket.
  13. Choose the link for the physical ID that starts with emr-spark-on-adls-<GUID> to be redirected to the bucket on the Amazon S3 console.
  14. On the Objects tab, open the EMR/ folder.
    S3 Bucket
  15. Open the artifacts/ folder.

There are five artifacts staged by the CloudFormation stack in this path:

  • azsastknprovider-1.0-SNAPSHOT.jar – The custom implementation of the SASTokenProvider interface.
  • EMR-Direct-Read-From-ADLS.ipynb – The Jupyter notebook that we’ll use with the EMR cluster to read data from ADLS.
  • env-staging.sh – A bash script that the Amazon EMR bootstrap process runs to stage azsastknprovider-1.0-SNAPSHOT.jar across cluster nodes.
  • Medallion_Drivers_-_Active.csv – A sample dataset that needs to be staged in the ADLS container from which we will read.
  • self-signed-certs.zip – The openSSL self-signed certificates used by AWS CloudFormation to encrypt data in transit. This example is a proof of concept demonstration only. Using self-signed certificates is not recommended and presents a potential security risk. For production systems, use a trusted certification authority (CA) to issue certificates.
  16. Select Medallion_Drivers_-_Active.csv and choose Download.
  17. Select EMR-Direct-Read-From-ADLS.ipynb and choose Download.

Create the SAS token and stage sample data

To generate the user delegation SAS token from the Azure portal, log in to the Azure portal with your account credentials and complete the following steps:

  1. Navigate to Storage account, Access Control, and choose Add role assignment.
  2. Add the following roles to your user: Reader, Storage Blob Data Reader, and Storage Blob Delegator.
  3. Navigate to Storage account, Containers, and choose the container you want to use.
  4. Under Settings in the navigation pane, choose Shared access tokens.
  5. Select User delegation key for Signing method.
  6. On the Permissions menu, select Read and List.
    ADLS Container
  7. For Start and expiry date/time, define the start and expiry times for the SAS token.
  8. Choose Generate SAS token and URL.
  9. Copy the token under Blob SAS token and save this value.
    SAS Token
  10. Choose Overview in the navigation pane.
  11. Choose Upload and upload the Medallion_Drivers_-_Active.csv file downloaded earlier.

Store the SAS token in Secrets Manager

Next, we secure the SAS token in Secrets Manager so it can be programmatically pulled from the Jupyter notebook.

  1. Open the Secrets Manager console in the same Region you have been working in (in this case, us-east-1).
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. In the Key/value pairs section, enter a name for the key and enter the blob SAS token for the value.
  5. For Encryption key, choose the default AWS managed key.
  6. Choose Next.
    Secret Type
  7. For Secret name, enter a name for your secret.
  8. Leave the optional fields as default and choose Next.
  9. On the next page, leave the settings as default and choose Next.

Setting up a secret rotation is a best practice but out of scope for this post. You can do so via Azure RM PowerShell, which can be integrated with the Lambda rotation function from Secrets Manager.

  10. Choose Store.
  11. Refresh the Secrets section and choose your secret.
  12. In the Secret details section, copy the value for Secret ARN to use in the Jupyter notebook.
    Secret ARN

Configure an EMR notebook with the SAS token and read ADLS data

Finally, we create the EMR notebook environment, integrate the SAS token into the downloaded Jupyter notebook, and perform a read against ADLS.

  1. Open the Amazon EMR console in the same Region you have been working in (in this case, us-east-1).
  2. Under EMR on EC2 in the navigation pane, choose Clusters.
  3. In the cluster table, choose Cluster with ADLS SAS Access.
    EMR Cluster

On the Summary tab, you will find the applications deployed on the cluster.

EMR Summary Tab

On the Configurations tab, you can see the configurations deployed by the CloudFormation stack loading the custom JAR in the appropriate classpaths.

EMR Configurations Tab

  4. Under EMR on EC2 in the navigation pane, choose Notebooks.
  5. Choose Create notebook.
  6. For Notebook name, enter an appropriate name for the notebook.
  7. For Cluster, select Choose an existing cluster, then choose the cluster you created earlier.
  8. Leave all other settings as default and choose Create notebook.
    Create Notebook
  9. When the notebook environment is set up, choose Open in JupyterLab.
  10. On your local machine, navigate to where you saved the EMR-Direct-Read-From-ADLS.ipynb notebook.
  11. Drag and drop it into the left pane of the JupyterLab environment.
  12. Choose EMR-Direct-Read-From-ADLS.ipynb from the left pane and ensure that the interpreter selected for the notebook in the top-right corner is PySpark.
    Open Notebook
  13. In the notebook, under the SAS TOKEN SETUP markup cell, replace <AWS_REGION> with the Region you are using (in this case, us-east-1).
  14. In the same code cell, replace <ADLS_SECRET_MANAGER_SECRET_ARN> with your secret ARN and <SECRET_KEY> with your secret key.

You can get the secret key from Secrets Manager in the Secret value section for the secret you created earlier.

Secret Key
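The secret string returned by Secrets Manager is a JSON object keyed by the name you chose when storing the secret. Extracting the SAS token from it is plain JSON handling; in the sketch below, the key name `sas-token` and the token value are illustrative placeholders:

```python
import json

def extract_sas_token(secret_string, key):
    """Pull the SAS token out of a Secrets Manager key/value secret string."""
    return json.loads(secret_string)[key]

# In the notebook, this string would come from something like:
#   boto3.client("secretsmanager").get_secret_value(SecretId=secret_arn)["SecretString"]
secret_string = '{"sas-token": "sv=2022-11-02&ss=b&sig=abc123"}'
print(extract_sas_token(secret_string, "sas-token"))
```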

  15. In the code cell below the HADOOP CONFIGURATIONS markup cell, replace <YOUR_STORAGE_ACCOUNT> with your Azure storage account where the SAS token was set up earlier.
  16. In the code cell below the READ TEST DATA markup cell, replace <YOUR_CONTAINER> and <YOUR_STORAGE_ACCOUNT> with your Azure container and storage account name, respectively.
  17. On the Run menu, choose Run All Cells.
    Run All Cells

After all notebook cells run, you should see 10 rows in a tabular format containing the data coming from ADLS, which now can be used directly in the notebook or can be written to Amazon S3 for further use.

Results

Clean up

Deploying a CloudFormation template incurs cost based on the resources deployed. The EMR cluster is configured to stop after an hour of inactivity, but to avoid incurring ongoing charges and to fully clean up the environment, complete the following steps:

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Select the notebook you created and choose Delete, and wait for the delete to complete before proceeding to the next step.
  3. On the Amazon EMR console, choose Clusters in the navigation pane.
  4. Select the cluster Cluster with ADLS SAS Access and choose Terminate.
  5. On the Amazon VPC console, choose Security groups in the navigation pane.
  6. Find the ElasticMapReduce-Master-Private, ElasticMapReduce-Slave-Private, ElasticMapReduce-ServiceAccess, ElasticMapReduceEditors-Livy, and ElasticMapReduceEditors-Editor security groups attached to the VPC created by the CloudFormation stack and delete their inbound and outbound rules.
  7. Select these five security groups and on the Actions menu, choose Delete security groups.
  8. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  9. Select the stack and choose Delete.
  10. On the Secrets Manager console, choose Secrets in the navigation pane.
  11. Select the stored SAS secret and on the Actions menu, choose Delete secret.
  12. On the IAM console, choose Roles in the navigation pane.
  13. Select the role EMR_Notebooks_DefaultRole and choose Delete.

Conclusion

In this post, we used AWS CloudFormation to deploy an EMR cluster with the appropriate configurations to connect to Azure Data Lake Storage using SAS tokens over the public internet. We provided an implementation of the SASTokenProvider interface to enable the SAS token-based connectivity to ADLS. We also provided relevant information on the SAS token creation steps on the Azure side. Furthermore, we showed how data scientists and analysts can use EMR notebooks connected to an EMR cluster to read data directly from ADLS with a minimum set of configurations. Finally, we used Secrets Manager to secure the storage of the SAS token and integrated it within the EMR notebook.

We encourage you to review the CloudFormation stack YAML template and test the setup on your own. If you implement the example and run into any issues or just have feedback, please leave a comment.


Appendix

AWS CLI-based deployment model

If you prefer to use command line options, this section provides AWS CLI commands to deploy this solution. Note that this is an alternative deployment model different from the CloudFormation template provided in the previous sections. Sample scripts and commands provided here include placeholders for values that need to be updated to suit your environment. The AWS CLI commands provided in this section should be used as guidance to understand the deployment model. Update the commands as needed to follow all the security procedures required by your organization.

Prerequisites for an AWS CLI-based deployment

The following are the assumptions made while using this AWS CLI-based deployment:

  • You will be deploying this solution in an existing AWS environment that has all the necessary security configurations enabled
  • You already have an Azure environment where you have staged the data that needs to be accessed through AWS services

You must also complete additional requirements on the AWS and Azure sides.

For AWS, complete the following prerequisites:

  1. Install the AWS CLI on your local computer or server. For instructions, see Installing, updating, and uninstalling the AWS CLI.
  2. Create an Amazon Elastic Compute Cloud (Amazon EC2) key pair for SSH access to your Amazon EMR nodes. For instructions, see Create a key pair using Amazon EC2.
  3. Create an S3 bucket to store the EMR configuration files, bootstrap shell script, and custom JAR file. Make sure that you create a bucket in the same Region as where you plan to launch your EMR cluster.
  4. Copy and save the SAS token from ADLS to use in Amazon EMR.

For Azure, complete the following prerequisites:

  1. Generate the user delegation SAS token on the ADLS container where your files are present, with the required levels of access granted. In this post, we use SAS tokens with only read and list access.
  2. Copy and save the generated SAS token to use with Amazon EMR.

Update configurations

We have created a custom class that implements the SASTokenProvider interface and created a JAR file called azsastknprovider-1.0-SNAPSHOT.jar, which is provided as a public artifact for this post. A set of configurations are required on the Amazon EMR side to use the SAS tokens to access ADLS. A sample configuration file in JSON format called EMR-HadoopSpark-ADLS-SASConfig.json is also provided as a public artifact for this post. Download the JAR and sample config files.

While copying the code or commands from this post, make sure to remove any control characters or extra newlines that may get added.

  1. Create a shell script called env-staging-hadoopspark.sh to copy the custom JAR file azsastknprovider-1.0-SNAPSHOT.jar (provided in this post) to the EMR cluster nodes’ local storage during the bootstrap phase. The following code is a sample bootstrap shell script:
    #!/bin/bash
    # Stage the SASTokenProvider interface implementation jar on the local filesystem
    sudo mkdir /lib/customjars
    sudo aws s3 cp s3://<s3 bucket>/<path>/azsastknprovider-1.0-SNAPSHOT.jar /lib/customjars
    sudo chmod 755 /lib/customjars

  2. Update the bootstrap shell script to include your S3 bucket and the proper path where the custom JAR file is uploaded in your AWS environment.
  3. Upload the JAR file, config file, and the bootstrap shell script to your S3 bucket.
  4. Keep a copy of the updated configuration file EMR-HadoopSpark-ADLS-SASConfig.json locally in the same directory from where you plan to run the AWS CLI commands.

Launch the EMR cluster using the AWS CLI

We use the create-cluster command in the AWS CLI to deploy an EMR cluster. We need a bootstrap action at cluster creation to copy the custom JAR file to the EMR cluster nodes’ local storage. We also need to add a few custom configurations on Amazon EMR to connect to ADLS. For this, we need to supply a configuration file in JSON format. The following code launches and configures an EMR cluster that can connect to your Azure account and read objects in ADLS through Hadoop and Spark:

aws emr create-cluster \
--name "Spark Cluster with ADLS Access" \
--release-label emr-6.10.0 \
--applications Name=Hadoop Name=Spark \
--ec2-attributes KeyName=<key pair name> \
--instance-type m5.xlarge \
--instance-count 3 \
--use-default-roles \
--enable-debugging \
--log-uri s3://<s3 bucket>/<logs path>/ \
--configurations file://EMR-HadoopSpark-ADLS-SASConfig.json \
--bootstrap-actions Path="s3://<s3 bucket>/<path>/env-staging-hadoopspark.sh"

Additional configurations for Spark jobs

The following additional properties should be set inside your Spark application code to access data in ADLS through Amazon EMR. These should be set on the Spark session object used within your Spark application code.

spark.conf.set("fs.azure.account.auth.type.<azure storage account>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<azure storage account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.net", "<Your SAS token from ADLS - remove the first character if it is &>")

These additional configurations can be set in the core-site.xml file for the EMR cluster. However, setting these in the application code is more secure and recommended because it won’t expose the SAS token in the Amazon EMR configurations.
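The note about the token's first character matters because the Azure portal returns the blob SAS token as a query string, sometimes with a leading `?` or `&`, while the `fs.azure.sas.fixed.token` property expects the bare query string. A small, hypothetical helper to normalize the value before setting it:

```python
def normalize_sas_token(token):
    """Strip any leading '?' or '&' so the token is the bare query string
    expected by fs.azure.sas.fixed.token.<storage account>.dfs.core.windows.net."""
    return token.lstrip("?&")

print(normalize_sas_token("&sv=2022-11-02&ss=b&sig=abc"))  # sv=2022-11-02&ss=b&sig=abc
```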

Submit the Spark application in Amazon EMR using the AWS CLI

You can run a Spark application on Amazon EMR in different ways:

  • Log in to an EMR cluster node through SSH using the EC2 key pair you created earlier and then run the application using spark-submit
  • Submit a step via the console while creating the cluster or after the cluster is running
  • Use the AWS CLI to submit a step to a running cluster:
aws emr add-steps \
--cluster-id <cluster-id> \
--steps 'Type=Spark,Name=Read-ADLS-Data,ActionOnFailure=CONTINUE,Args=[s3://<s3 bucket>/<path>/<spark-application>,--deploy-mode,cluster]'

To read files in ADLS within a Spark application that is running on an EMR cluster, you need to use the abfs driver and refer to the file in the following format, just as you would have done in your Azure environment:

abfs://<azure container name>@<azure storage account>.dfs.core.windows.net/<path>/<file-name>
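A small helper makes the URI shape explicit; the container, account, and path values below are placeholders:

```python
def abfs_uri(container, storage_account, path):
    """Build the abfs:// URI for a file in ADLS Gen2, per the format above."""
    return f"abfs://{container}@{storage_account}.dfs.core.windows.net/{path.lstrip('/')}"

print(abfs_uri("mycontainer", "mystorageacct", "data/file.csv"))
# abfs://mycontainer@mystorageacct.dfs.core.windows.net/data/file.csv
```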

The following sample PySpark script reads a CSV file in ADLS using SAS tokens and writes it to Amazon S3, and can be run from the EMR cluster you created:

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder.appName('Read data from ADLS').getOrCreate()
spark.conf.set("fs.azure.account.auth.type.<azure storage account>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<azure storage account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.net", "<Your SAS token from ADLS>")
adlsDF = spark.read.csv("abfs://<azure container name>@<azure storage account>.dfs.core.windows.net/<path>/<file.csv>")
adlsDF.write.csv("s3://<s3 bucket>/<path>/")

Clean up using the AWS CLI

Delete the EMR cluster created using the delete-cluster command.


About the authors

Kiran Anand is a Principal Solutions Architect with the AWS Data Lab. He is a seasoned professional with more than 20 years of experience in information technology. His areas of expertise are databases and big data solutions for data engineering and analytics. He enjoys music, watching movies, and traveling with his family.

Andre Hass is a Sr. Solutions Architect with the AWS Data Lab. He has more than 20 years of experience in the databases and data analytics field. Andre enjoys camping, hiking, and exploring new places with his family on the weekends, or whenever he gets a chance. He also loves technology and electronic gadgets.

Stefan Marinov is a Sr. Solutions Architecture Manager with the AWS Data Lab. He is passionate about big data solutions and distributed computing. Outside of work, he loves spending active time outdoors with his family.

Hari Thatavarthy is a Senior Solutions Architect on the AWS Data Lab team. He helps customers design and build solutions in the data and analytics space. He believes in data democratization and loves to solve complex data processing-related problems.

Hao Wang is a Senior Big Data Architect at AWS. Hao actively works with customers building large scale data platforms on AWS. He has a background as a software architect on implementing distributed software systems. In his spare time, he enjoys reading and outdoor activities with his family.

Cost monitoring for Amazon EMR on Amazon EKS

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/cost-monitoring-for-amazon-emr-on-amazon-eks/

Amazon EMR is the industry-leading cloud big data solution, providing a collection of open-source frameworks such as Spark, Hive, Hudi, and Presto, fully managed and with per-second billing. Amazon EMR on Amazon EKS is a deployment option that allows you to run Amazon EMR on the same multi-tenant Amazon Elastic Kubernetes Service (Amazon EKS) clusters used by other applications, improving resource utilization, reducing cost, and simplifying infrastructure management. EMR on EKS provides up to 5.37 times better performance than OSS Spark v3.3.1, with 76.8% cost savings. It also provides a wide variety of job submission methods, such as the StartJobRun AWS API, or a declarative approach using a Kubernetes controller through the AWS Controllers for Kubernetes for Amazon EMR on EKS.

This consolidation comes with a trade-off of increased difficulty measuring fine-grained costs for showback or chargeback by team or application. According to a CNCF and FinOps Foundation survey, 68% of Kubernetes users either rely on monthly estimates or don’t monitor Kubernetes costs at all. And for respondents reporting active Kubernetes cost monitoring, AWS Cost Explorer and Kubecost were ranked as the most popular tools being used.

Currently, you can distribute costs per tenant using hard multi-tenancy, with separate EKS clusters in dedicated AWS accounts, or soft multi-tenancy, using separate node groups in a shared EKS cluster. To reduce costs and improve resource utilization, you can use namespace-based segregation, where nodes are shared across different namespaces. However, calculating and attributing costs to teams by workload or namespace, while taking into account compute optimizations (like Savings Plans or Spot Instance costs) and the cost of AWS services like EMR on EKS, is a challenging and non-trivial task.

In this post, we present a cost chargeback solution for EMR on EKS that combines the AWS-native capabilities of AWS Cost and Usage Reports (AWS CUR) alongside the in-depth Kubernetes cost visibility and insights using Kubecost on Amazon EKS.

Solution overview

A job in EMR on EKS incurs costs along two main dimensions: compute resources and a marginal uplift charge for EMR on EKS usage. To track the cost associated with each dimension, we use data from the following sources:

  • AWS CUR – We use this to get the EMR on EKS cost uplift per job, and for Kubecost to reconcile the compute cost with any Savings Plans or Reserved Instances used. The supporting infrastructure for CUR is deployed as defined in Setting up Athena using AWS CloudFormation templates.
  • Kubecost – We use this to get the compute cost incurred by the executor and driver pods.

The cost allocation process includes the following components:

  • The compute cost is provided by Kubecost. However, in order to do an in-depth analysis, we define an hourly Kubernetes CronJob on it that starts a pod to retrieve data from Kubecost and stores it in Amazon Simple Storage Service (Amazon S3).
  • CUR files are stored in an S3 bucket.
  • We use Amazon Athena to create a view and provide a consolidated view of the total cost to run an EMR on EKS job.
  • Finally, you can connect your preferred business intelligence tools using the JDBC or ODBC connections to Athena. In this post, we use Amazon QuickSight native integration for visualization purposes.
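
Conceptually, the consolidated view adds the two cost dimensions together per job. The following minimal Python sketch illustrates that chargeback calculation; the job IDs, field names, and dollar amounts are hypothetical, for illustration only:

```python
# Sketch: combine per-job compute cost (from Kubecost) with the
# EMR on EKS uplift (from the CUR) into one chargeback figure per job.
# All identifiers and values below are illustrative, not real schemas.

compute_cost = {"job-1": 2.40, "job-2": 0.95}   # from Kubecost allocation data
emr_uplift = {"job-1": 0.60, "job-2": 0.25}     # from CUR line items

def total_cost_per_job(compute, uplift):
    """Sum compute and uplift costs for every job seen in either source."""
    jobs = set(compute) | set(uplift)
    return {job: round(compute.get(job, 0.0) + uplift.get(job, 0.0), 2)
            for job in jobs}

totals = total_cost_per_job(compute_cost, emr_uplift)  # {'job-1': 3.0, 'job-2': 1.2}
```

In the actual solution, this addition is performed by the Athena view described later, rather than in application code.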

The following diagram shows the overall architecture as well as how the different components interact with each other.

emr-eks-cost-tracking-architecture

We provide a shell script to deploy the tracking solution. The shell script configures the infrastructure using an AWS CloudFormation template, the AWS Command Line Interface (AWS CLI), and eksctl and kubectl commands. This script runs the following actions:

  1. Start the CloudFormation deployment.
  2. Create and configure an AWS Cost and Usage Report.
  3. Configure and deploy Kubecost backed by Amazon Managed Service for Prometheus.
  4. Deploy a Kubernetes CronJob.

Prerequisites

You need the following prerequisites:

This post assumes you already have an EKS cluster and run EMR on EKS jobs. If you don’t have an EKS cluster ready to test the solution, we suggest starting with a standard EMR on EKS blueprint that configures a cluster to submit EMR on EKS jobs.

Set up the solution

To run the shell script, complete the following steps:

  1. Clone the following GitHub repository.
  2. Go to the folder cost-tracking with the following command:

cd cost-tracking

  3. Run the script with the following command:

sh deploy-emr-eks-cost-tracking.sh REGION KUBECOST-VERSION EKS-CLUSTER-NAME ACCOUNT-ID

After you run the script, you’re ready to use Kubecost and the CUR data to understand the cost associated with your EMR on EKS jobs.

Tracking cost

In this section, we show you how to analyze the compute cost that is retrieved from Kubecost, how to query EMR on EKS uplift data, and how to combine them to have a single consolidated view for the cost.

Compute cost

Kubecost offers various ways to track cost per Kubernetes object. For example, you can track cost by pod, controller, job, label, or deployment. It also allows you to understand the cost of idle resources, like Amazon Elastic Compute Cloud (Amazon EC2) instances that aren't fully utilized by pods. In this post, we assume that no nodes are provisioned if no EMR on EKS job is running, and we use Karpenter to provision nodes when jobs are submitted. Karpenter also does bin packing, which optimizes EC2 resource utilization and in turn reduces the cost of idle resources.

To track the compute cost associated with EMR on EKS pods, we query the Kubecost allocation API, passing pod and labels in the aggregate parameter. We use the emr-containers.amazonaws.com/job.id and emr-containers.amazonaws.com/virtual-cluster-id labels, which are always present on executor and driver pods. The labels are used to filter Kubecost data to get only the cost associated with EMR on EKS pods. You can review various levels of granularity at the pod, job, and virtual cluster level to understand the cost of a driver vs. an executor, or of using Spot Instances in jobs. You can also use the virtual cluster cost to understand the overall cost of an EMR on EKS virtual cluster when its namespace is shared with applications other than EMR on EKS.

We also provide the instance_id, instance size, and capacity type (On-Demand or Spot) that was used to run the pod. This is retrieved through querying the Kubecost assets API. This data can be useful to understand how you run your jobs and which capacity you use more often.

The data about the cost of running the pods as well as the assets is retrieved with a Kubernetes CronJob that submits the request to the Kubecost API, joins the two data sources (allocation and assets data) on the instance_id, cleans the data, and stores it in Amazon S3 in CSV format.
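
The join performed by the CronJob can be sketched as follows. The record fields and values here are simplified stand-ins for the actual Kubecost allocation and assets API responses:

```python
# Sketch: join Kubecost allocation records (pod-level cost) with asset
# records (instance metadata) on instance_id, the way the CronJob does
# before writing CSV rows to Amazon S3. Field names are placeholders.

allocations = [
    {"pod": "driver-abc", "instance_id": "i-111", "totalcost": 0.12},
    {"pod": "exec-abc-1", "instance_id": "i-222", "totalcost": 0.08},
]
assets = {
    "i-111": {"instance_type": "m5.xlarge", "capacity_type": "ON_DEMAND"},
    "i-222": {"instance_type": "m5.xlarge", "capacity_type": "SPOT"},
}

def join_on_instance(allocs, assets):
    """Enrich each allocation record with the matching asset metadata."""
    return [{**a, **assets.get(a["instance_id"], {})} for a in allocs]

rows = join_on_instance(allocations, assets)
# Each row now carries pod cost plus instance type and capacity type.
```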

The compute cost data has multiple fields that are of interest, including cpucost, ramcost (cost of memory), pvcost (cost of Amazon EBS storage), efficiency of use of CPU and RAM, as well as total cost, which represents the aggregate cost of all the resources used, either at pod, job, or virtual cluster level.

To view this data, complete the following steps:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and compute_cost for the table.
  3. Run the following query:
SELECT job_id,
vc_id,
sum(totalcost) as cost
FROM "athenacurcfn_c_u_r"."compute_cost"
GROUP BY job_id, vc_id

The following screenshot shows the query results.

To query the data about information at the pod level, you can run the following SQL statement:

SELECT
split_part(name, '/', 1) as pod_name,
job_id,
vc_id,
totalcost,
instance_id,
"properties.labels.node_kubernetes_io_instance_type",
capacity_type
FROM "athenacurcfn_c_u_r"."compute_cost";

EMR on EKS uplift

The cost associated with the EMR on EKS uplift is available through the AWS CUR and is stored in an S3 bucket. The script you ran in the setup step created an Athena table associated with the data in the S3 bucket. The following steps take you through how you can query the data:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and cur_data for the table.
  3. Run the following query:
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost
FROM athenacurcfn_c_u_r.cur_data
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id

This query provides you with the cost per job. The following screenshot shows the results.

You will have to wait up to 24 hours for the CUR data to be available. As such, you should only run the preceding query after the CUR data is available and you have run the EMR on EKS jobs.
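
The split_part calls rely on the structure of the EMR on EKS resource ID, which embeds the virtual cluster ID and the job run ID. The following Python sketch mirrors the same extraction; the ARN below is a fabricated example:

```python
# Sketch: extract vc_id and job_id from a CUR line_item_resource_id,
# mirroring split_part(id, '/', 3) and split_part(id, '/', 5) in Athena.
# The ARN is a made-up example for illustration.

resource_id = ("arn:aws:emr-containers:us-east-1:111122223333:"
               "/virtualclusters/vc-123/jobruns/job-456")

def split_part(s, delim, n):
    """1-indexed field extraction, like the Athena/Trino split_part function."""
    parts = s.split(delim)
    return parts[n - 1] if n <= len(parts) else ""

vc_id = split_part(resource_id, "/", 3)   # virtual cluster ID
job_id = split_part(resource_id, "/", 5)  # job run ID
```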

Overall cost

To view the overall cost and perform analysis on it, create a view in Athena as follows:

CREATE VIEW emr_eks_cost AS
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost,
'emr-uplift' as category
FROM athenacurcfn_c_u_r.cur_data
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id
UNION
SELECT
job_id,
vc_id,
sum(totalcost) as cost,
'compute' as category
FROM "athenacurcfn_c_u_r"."compute_cost"
GROUP BY job_id, vc_id

Now that the view is created, you can query and analyze the cost of running your EMR on EKS jobs:

SELECT sum(cost) as total_cost, job_id, vc_id
FROM "athenacurcfn_c_u_r"."emr_eks_cost"
GROUP BY job_id, vc_id;

The following screenshot shows an example output of the query on the created view.

Lastly, you can use QuickSight for a graphical high-level view on your EMR on EKS spend. The following screenshot shows an example dashboard.

emr-eks-compute-cost-quicksight-dashboard

You can now adapt this solution to your specific needs and build your custom analysis.

Clean up

Throughout this post, you deployed and configured the required infrastructure components to track cost for your EMR on EKS workloads. To avoid incurring additional charges for this solution, delete all the resources you created:

  1. Empty the S3 buckets cost-data-REGION-ACCOUNT_ID and aws-athena-query-results-cur-REGION-ACCOUNT_ID.
  2. Delete the Athena workgroup kubecost-cur-workgroup.
  3. Empty and delete the ECR repository emreks-compute-cost-exporter.
  4. Run the script destroy-emr-eks-cost-tracking.sh, which will delete the AWS CloudFormation deployment, uninstall Kubecost, delete the CronJob, and delete the Cost and Usage Reports.

Conclusion

In this post, we showed how you can use Kubecost capabilities alongside Cost and Usage Reports to closely monitor the costs for Amazon EMR on EKS per virtual cluster or per job. This solution allows you to achieve more granular costs for chargebacks using Athena, Amazon Managed Service for Prometheus, and QuickSight.

The solution presented steps to set up Cost and Usage Reports and Kubecost, and configure a CronJob on an hourly basis to get the cost of running pods spun up by EMR on EKS. You can modify the presented solution to run at longer intervals or to collect data on different EKS clusters. You can also modify the Python script run by the CronJob to further clean data or reduce the amount of data stored by eliminating fields you don't need. You can use the insights provided to drive cost optimization efforts over time, detect any increase of costs, and measure the impact of new deployments or particular events on resource usage and cost performance. For more information about integrating EMR on EKS in your existing Amazon EKS deployment, refer to Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Hamza Mimi is a Principal Solutions Architect in the French Public Sector team at Amazon Web Services (AWS), with long experience in the telecommunications industry. He currently works as a customer advisor on topics ranging from digital transformation to architectural guidance.

Introducing Amazon EMR on EKS job submission with Spark Operator and spark-submit

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-job-submission-with-spark-operator-and-spark-submit/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. The EMR runtime provides up to 5.37 times better performance and 76.8% cost savings, when compared to using open-source Apache Spark on Amazon EKS.

Building on the success of Amazon EMR on EKS, customers have been running and managing jobs using the emr-containers API, creating EMR virtual clusters, and submitting jobs to the EKS cluster, either through the AWS Command Line Interface (AWS CLI) or Apache Airflow scheduler. However, other customers running Spark applications have chosen Spark Operator or native spark-submit to define and run Apache Spark jobs on Amazon EKS, but without taking advantage of the performance gains from running Spark on the optimized EMR runtime. In response to this need, starting from EMR 6.10, we have introduced a new feature that lets you use the optimized EMR runtime while submitting and managing Spark jobs through either Spark Operator or spark-submit. This means that anyone running Spark workloads on EKS can take advantage of EMR’s optimized runtime.

In this post, we walk through the process of setting up and running Spark jobs using both Spark Operator and spark-submit, integrated with the EMR runtime feature. We provide step-by-step instructions to assist you in setting up the infrastructure and submitting a job with both methods. Additionally, you can use the Data on EKS blueprint to deploy the entire infrastructure using Terraform templates.

Infrastructure overview

In this post, we walk through the process of deploying a comprehensive solution using eksctl, Helm, and AWS CLI. Our deployment includes the following resources:

  • A VPC, EKS cluster, and managed node group, set up with the eksctl tool
  • Essential Amazon EKS managed add-ons, such as the VPC CNI, CoreDNS, and kube-proxy, set up with the eksctl tool
  • Cluster Autoscaler and Spark Operator add-ons, set up using Helm
  • A Spark job execution AWS Identity and Access Management (IAM) role, IAM policy for Amazon Simple Storage Service (Amazon S3) bucket access, service account, and role-based access control, set up using the AWS CLI and eksctl

Prerequisites

Verify that the following prerequisites are installed on your machine:

Set up AWS credentials

Before proceeding to the next step and running the eksctl command, you need to set up your local AWS credentials profile. For instructions, refer to Configuration and credential file settings.

Deploy the VPC, EKS cluster, and managed add-ons

The following configuration uses us-west-1 as the default Region. To run in a different Region, update the region and availabilityZones fields accordingly. Also, verify that the same Region is used in the subsequent steps throughout the post.

Enter the following code snippet into the terminal where your AWS credentials are set up. Make sure to update the publicAccessCIDRs field with your IP before you run the command below. This will create a file named eks-cluster.yaml:

cat <<EOF >eks-cluster.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: emr-spark-operator
  region: us-west-1 # replace with your region
  version: "1.25"
vpc:
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
  publicAccessCIDRs: ["YOUR-IP/32"]
availabilityZones: ["us-west-1a","us-west-1b"] # replace with your region
iam:
  withOIDC: true
  serviceAccounts:
  - metadata:
      name: cluster-autoscaler
      namespace: kube-system
    wellKnownPolicies:
      autoScaler: true
    roleName: eksctl-cluster-autoscaler-role
managedNodeGroups:
  - name: m5x
    instanceType: m5.xlarge
    availabilityZones: ["us-west-1a"]
    volumeSize: 100
    volumeType: gp3
    minSize: 2
    desiredCapacity: 2
    maxSize: 10
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned" 
addons:
  - name: vpc-cni
    version: latest
  - name: coredns
    version: latest
  - name: kube-proxy
    version: latest
cloudWatch:
  clusterLogging:
    enableTypes: ["*"]
EOF

Use the following command to create the EKS cluster: eksctl create cluster -f eks-cluster.yaml

Deploy Cluster Autoscaler

Cluster Autoscaler is crucial for automatically adjusting the size of your Kubernetes cluster based on the current resource demands, optimizing resource utilization and cost. Create an autoscaler-helm-values.yaml file and install the Cluster Autoscaler using Helm:

cat <<EOF >autoscaler-helm-values.yaml
---
autoDiscovery:
    clusterName: emr-spark-operator
    tags:
      - k8s.io/cluster-autoscaler/enabled
      - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }}
awsRegion: us-west-1 # Make sure the Region is the same as the EKS cluster's
rbac:
  serviceAccount:
    create: false
    name: cluster-autoscaler
EOF
helm repo add autoscaler https://kubernetes.github.io/autoscaler
helm install nodescaler autoscaler/cluster-autoscaler \
--namespace kube-system \
--values autoscaler-helm-values.yaml --debug

You can also set up Karpenter as a cluster autoscaler to automatically launch the right compute resources to handle your EKS cluster's applications. You can follow this blog on how to set up and configure Karpenter.

Deploy Spark Operator

Spark Operator is an open-source Kubernetes operator specifically designed to manage and monitor Spark applications running on Kubernetes. It streamlines the process of deploying and managing Spark jobs by providing a Kubernetes custom resource to define, configure, and run Spark applications, as well as manage the job life cycle through the Kubernetes API. Some customers prefer using Spark Operator to manage Spark jobs because it enables them to manage Spark applications just like other Kubernetes resources.

Currently, customers are building their open-source Spark images and using S3a committers as part of job submissions with Spark Operator or spark-submit. However, with the new job submission option, you can now benefit from the EMR runtime in conjunction with EMRFS. Starting with Amazon EMR 6.10 and for each upcoming version of the EMR runtime, we will release the Spark Operator and its Helm chart to use the EMR runtime.

In this section, we show you how to deploy a Spark Operator Helm chart from an Amazon Elastic Container Registry (Amazon ECR) repository and submit jobs using EMR runtime images, benefiting from the performance enhancements provided by the EMR runtime.

Install Spark Operator with Helm from Amazon ECR

The Spark Operator Helm chart is stored in an ECR repository. To install the Spark Operator, you first need to authenticate your Helm client with the ECR repository. The charts are stored under the following path: ECR_URI/spark-operator.

Authenticate your Helm client and install the Spark Operator:

aws ecr get-login-password \
--region us-west-1 | helm registry login \
--username AWS \
--password-stdin 608033475327.dkr.ecr.us-west-1.amazonaws.com

You can authenticate to other EMR on EKS supported Regions by obtaining the AWS account ID for the corresponding Region. For more information, refer to how to select a base image URI.

Install Spark Operator

You can now install Spark Operator using the following command:

helm install spark-operator-demo \
oci://608033475327.dkr.ecr.us-west-1.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=us-west-1 \
--version 1.1.26-amzn-0 \
--set serviceAccounts.spark.create=false \
--namespace spark-operator \
--create-namespace

To verify that the operator has been installed correctly, run the following command:

helm list --namespace spark-operator -o yaml

Set up the Spark job execution role and service account

In this step, we create a Spark job execution IAM role and a service account, which will be used in Spark Operator and spark-submit job submission examples.

First, we create an IAM policy that will be used by the IAM Roles for Service Accounts (IRSA). This policy enables the driver and executor pods to access the AWS services specified in the policy. Complete the following steps:

  1. As a prerequisite, either create an S3 bucket (aws s3api create-bucket --bucket <ENTER-S3-BUCKET> --create-bucket-configuration LocationConstraint=us-west-1 --region us-west-1) or use an existing S3 bucket. Replace <ENTER-S3-BUCKET> in the following code with the bucket name.
  2. Create a policy file that allows read and write access to an S3 bucket:
    cat >job-execution-policy.json <<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*",
                    "arn:aws:s3:::aws-data-lake-workshop/*",
                    "arn:aws:s3:::nyc-tlc",
                    "arn:aws:s3:::nyc-tlc/*"
                ]
            }
        ]
    }
    EOL

  3. Create the IAM policy with the following command:
    aws iam create-policy --policy-name emr-job-execution-policy --policy-document file://job-execution-policy.json

  4. Next, create a service account named emr-job-execution-sa as well as its IAM role. The following eksctl command creates the service account scoped to the namespace, to be used by the executor and driver pods. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    eksctl create iamserviceaccount \
    --cluster=emr-spark-operator \
    --region us-west-1 \
    --name=emr-job-execution-sa \
    --attach-policy-arn=arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:policy/emr-job-execution-policy \
    --role-name=emr-job-execution-irsa \
    --namespace=data-team-a \
    --approve

  5. Create an S3 bucket policy to allow only the execution role created in step 4 to write to and read from the S3 bucket created in step 1. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    cat > bucketpolicy.json<<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ], "Principal": {
                    "AWS": "arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:role/emr-job-execution-irsa"
                },
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*"
                ]
            }
        ]
    }
    EOL
    
    aws s3api put-bucket-policy --bucket <ENTER-S3-BUCKET> --policy file://bucketpolicy.json

  6. Create a Kubernetes role and role binding required for the service account used in the Spark job run:
    cat <<EOF >emr-job-execution-rbac.yaml
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: emr-job-execution-sa-role
      namespace: data-team-a
    rules:
      - apiGroups: ["", "batch","extensions"]
        resources: ["configmaps","serviceaccounts","events","pods","pods/exec","pods/log","pods/portforward","secrets","services","persistentvolumeclaims"]
        verbs: ["create","delete","get","list","patch","update","watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: emr-job-execution-sa-rb
      namespace: data-team-a
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: emr-job-execution-sa-role
    subjects:
      - kind: ServiceAccount
        name: emr-job-execution-sa
        namespace: data-team-a
    EOF

  7. Apply the Kubernetes role and role binding definition with the following command:
kubectl apply -f emr-job-execution-rbac.yaml

So far, we have completed the infrastructure setup, including the Spark job execution roles. In the following steps, we run sample Spark jobs using both Spark Operator and spark-submit with the EMR runtime.

Configure the Spark Operator job with the EMR runtime

In this section, we present a sample Spark job that reads data from public datasets stored in S3 buckets, processes it, and writes the results to your own S3 bucket. Make sure that you update the S3 bucket in the following configuration by replacing <ENTER_S3_BUCKET> with the URI of your own S3 bucket referred to in step 2 of the "Set up the Spark job execution role and service account" section. Also, note that we use data-team-a as the namespace and emr-job-execution-sa as the service account, which we created in the previous step. These are necessary to run the Spark job pods in the dedicated namespace, and the IAM role linked to the service account is used to access the S3 bucket for reading and writing data.

Most importantly, notice the image field with the EMR optimized runtime Docker image, which is currently set to emr-6.10.0. You can change this to a newer version when it’s released by the Amazon EMR team. Also, when configuring your jobs, make sure that you include the sparkConf and hadoopConf settings as defined in the following manifest. These configurations enable you to benefit from EMR runtime performance, AWS Glue Data Catalog integration, and the EMRFS optimized connector.

  1. Create the file (emr-spark-operator-example.yaml) locally and update the S3 bucket location so that you can submit the job as part of the next step:
    cat <<EOF >emr-spark-operator-example.yaml
    ---
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: taxi-example
      namespace: data-team-a
    spec:
      type: Scala
      mode: cluster
      # EMR optimized runtime image
      image: "483788554619.dkr.ecr.eu-west-1.amazonaws.com/spark/emr-6.10.0:latest"
      imagePullPolicy: Always
      mainClass: ValueZones
      mainApplicationFile: s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar
      arguments:
        - s3://nyc-tlc/csv_backup
        - "2017"
        - s3://nyc-tlc/misc/taxi_zone_lookup.csv
        - s3://<ENTER_S3_BUCKET>/emr-eks-results
        - emr_eks_demo
      hadoopConf:
        # EMRFS filesystem config
        fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
        fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
        fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
        fs.s3.buffer.dir: /mnt/s3
        fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
        mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
        mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
      sparkConf:
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: "s3://<ENTER_S3_BUCKET>/"
        spark.kubernetes.driver.pod.name: driver-nyc-taxi-etl
        # Required for EMR Runtime and Glue Catalogue
        spark.driver.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.driver.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        spark.executor.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.executor.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        # EMRFS commiter
        spark.sql.parquet.output.committer.class: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
        spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
        spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
        spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
        spark.driver.defaultJavaOptions:  -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
      sparkVersion: "3.3.1"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        memory: "4g"
        serviceAccount: emr-job-execution-sa
      executor:
        cores: 1
        instances: 4
        memory: "4g"
        serviceAccount: emr-job-execution-sa
    EOF

  2. Run the following command to submit the job to the EKS cluster:
    kubectl apply -f emr-spark-operator-example.yaml

    The job may take 4–5 minutes to complete, and you can verify the success message in the driver pod logs.

  3. Verify the job by running the following command:
kubectl get pods -n data-team-a

Enable access to the Spark UI

The Spark UI is an important tool for data engineers because it allows you to track the progress of tasks, view detailed job and stage information, and analyze resource utilization to identify bottlenecks and optimize your code. For Spark jobs running on Kubernetes, the Spark UI is hosted on the driver pod and its access is restricted to the internal network of Kubernetes. To access it, we need to forward the traffic to the pod with kubectl. The following steps take you through how to set it up.

Run the following command to forward traffic to the driver pod:

kubectl port-forward <driver-pod-name> 4040:4040

You should see text similar to the following:

Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

If you didn’t specify the driver pod name at the submission of the SparkApplication, you can get it with the following command:

kubectl get pods -l spark-role=driver,spark-app-name=<your-spark-app-name> -o jsonpath='{.items[0].metadata.name}'

Open a browser and enter http://localhost:4040 in the address bar. You should be able to connect to the Spark UI.


Spark History Server

If you want to explore your job after its run, you can view it through the Spark History Server. The preceding SparkApplication definition has the event log enabled and stores the events in an S3 bucket with the following path: s3://YOUR-S3-BUCKET/. For instructions on setting up the Spark History Server and exploring the logs, refer to Launching the Spark history server and viewing the Spark UI using Docker.

spark-submit

spark-submit is a command line interface for running Apache Spark applications on a cluster or locally. It enables simple configuration of application properties, resource allocation, and custom libraries, streamlining the deployment and management of Spark jobs.

Beginning with Amazon EMR 6.10, spark-submit is supported as a job submission method. This method currently only supports cluster mode as the submission mechanism. To submit jobs using the spark-submit method, we reuse the IAM role for the service account we set up earlier. We also use the S3 bucket used for the Spark Operator method. The steps in this section take you through how to configure and submit jobs with spark-submit and benefit from EMR runtime improvements.

  1. In order to submit a job, you need to use the Spark version that matches the one available in Amazon EMR. For Amazon EMR 6.10, you need to download the Spark 3.3 version.
  2. You also need to make sure you have Java installed in your environment.
  3. Extract the archive and navigate to the root of the Spark directory.
  4. In the following code, replace the EKS endpoint as well as the S3 bucket then run the script:
./bin/spark-submit \
--class ValueZones \
--master k8s://EKS-ENDPOINT \
--conf spark.kubernetes.namespace=data-team-a \
--conf spark.kubernetes.container.image=608033475327.dkr.ecr.us-west-1.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-job-execution-sa \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=emr-job-execution-sa \
--conf spark.driver.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.driver.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.executor.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.executor.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--conf spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3.EMRFSDelegate \
--conf spark.hadoop.fs.s3.buffer.dir=/mnt/s3 \
--conf spark.hadoop.fs.s3n.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--deploy-mode cluster \
s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar s3://nyc-tlc/csv_backup 2017 s3://nyc-tlc/misc/taxi_zone_lookup.csv s3://S3_BUCKET/emr-eks-results emr_eks_demo

The job takes about 7 minutes to complete with two executors of one core and 1 GB of memory each.

Using custom Kubernetes schedulers

Customers running a large volume of jobs concurrently might face challenges related to providing fair access to compute capacity that they aren’t able to solve with the standard scheduling and resource utilization management Kubernetes offers. In addition, customers that are migrating from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) and manage their scheduling with YARN queues won’t be able to map those queues directly to Kubernetes scheduling capabilities.

To overcome this issue, you can use custom schedulers like Apache YuniKorn or Volcano. Spark Operator natively supports these schedulers; with them, you can schedule Spark applications based on factors such as priority, resource requirements, and fairness policies, while Spark Operator simplifies application deployment and management. To set up YuniKorn with gang scheduling and use it in Spark applications submitted through Spark Operator, refer to Spark Operator with YuniKorn.

Clean up

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment:

eksctl delete cluster -f eks-cluster.yaml

Conclusion

In this post, we introduced the EMR runtime feature for Spark Operator and spark-submit, and explored the advantages of using this feature on an EKS cluster. With the optimized EMR runtime, you can significantly enhance the performance of your Spark applications while optimizing costs. We demonstrated the deployment of the cluster using the eksctl tool; you can also use the Data on EKS blueprints to deploy a production-ready EKS cluster for EMR on EKS and take advantage of these new deployment methods in addition to the EMR on EKS API job submission method. By running your applications on the optimized EMR runtime, you can further enhance your Spark application workflows and drive innovation in your data processing pipelines.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Vara Bonthu is a dedicated technology professional and Worldwide Tech Leader for Data on EKS, specializing in assisting AWS customers ranging from strategic accounts to diverse organizations. He is passionate about open-source technologies, data analytics, AI/ML, and Kubernetes, and boasts an extensive background in development, DevOps, and architecture. Vara’s primary focus is on building highly scalable data and AI/ML solutions on Kubernetes platforms, helping customers harness the full potential of cutting-edge technology for their data-driven pursuits.

Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/improve-operational-efficiencies-of-apache-iceberg-tables-built-on-amazon-s3-data-lakes/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3) and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. When you build your transactional data lake using Apache Iceberg to solve your functional use cases, you need to focus on operational use cases for your S3 data lake to optimize the production environment. Some of the important non-functional use cases for an S3 data lake that organizations are focusing on include storage cost optimizations, capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake, and handling increased Amazon S3 request rates.

In this post, we show you how to improve operational efficiencies of your Apache Iceberg tables built on Amazon S3 data lake and Amazon EMR big data platform.

Optimize data lake storage

One of the major advantages of building modern data lakes on Amazon S3 is it offers lower cost without compromising on performance. You can use Amazon S3 Lifecycle configurations and Amazon S3 object tagging with Apache Iceberg tables to optimize the cost of your overall data lake storage. An Amazon S3 Lifecycle configuration is a set of rules that define actions that Amazon S3 applies to a group of objects. There are two types of actions:

  • Transition actions – These actions define when objects transition to another storage class; for example, Amazon S3 Standard to Amazon S3 Glacier.
  • Expiration actions – These actions define when objects expire. Amazon S3 deletes expired objects on your behalf.

Amazon S3 uses object tagging to categorize storage, where each tag is a key-value pair. From an Apache Iceberg perspective, it supports custom Amazon S3 object tags that can be added to S3 objects as it writes to and deletes from the table. Iceberg also lets you configure a tag-based object lifecycle policy at the bucket level to transition objects to different Amazon S3 tiers. With the s3.delete.tags config property in Iceberg, objects are tagged with the configured key-value pairs before deletion. When the catalog property s3.delete-enabled is set to false, the objects are not hard-deleted from Amazon S3. This is expected to be used in combination with Amazon S3 delete tagging, so objects are tagged and removed using an Amazon S3 lifecycle policy. This property is set to true by default.

The example notebook in this post shows an example implementation of S3 object tagging and lifecycle rules for Apache Iceberg tables to optimize storage cost.

Implement business continuity

Amazon S3 gives any developer access to the same highly scalable, reliable, fast, and inexpensive data storage infrastructure that Amazon uses to run its own global network of websites. Amazon S3 is designed for 99.999999999% (11 9s) of durability, S3 Standard is designed for 99.99% availability, and S3 Standard-IA is designed for 99.9% availability. Still, to make your data lake workloads highly available in an unlikely outage situation, you can replicate your S3 data to another AWS Region as a backup. With S3 data residing in multiple Regions, you can use an S3 multi-Region access point as a solution to access the data from the backup Region. With Amazon S3 multi-Region access point failover controls, you can route all S3 data request traffic through a single global endpoint and directly control the shift of S3 data request traffic between Regions at any time. During a planned or unplanned regional traffic disruption, failover controls let you control failover between buckets in different Regions and accounts within minutes. Apache Iceberg supports access points to perform S3 operations by specifying a mapping of buckets to access points. We include an example implementation of an S3 access point with Apache Iceberg later in this post.

Increase Amazon S3 performance and throughput

Amazon S3 supports a request rate of 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. The resources for this request rate aren’t automatically assigned when a prefix is created. Instead, as the request rate for a prefix increases gradually, Amazon S3 automatically scales to handle the increased request rate. For certain workloads that need a sudden increase in the request rate for objects in a prefix, Amazon S3 might return 503 Slow Down errors, also known as S3 throttling, while it scales in the background to handle the increased request rate. If supported request rates are exceeded, it’s a best practice to distribute objects and requests across multiple prefixes. Implementing such a distribution normally involves changes to your data ingress or data egress applications. Using the Apache Iceberg file format for your S3 data lake can significantly reduce this engineering effort by enabling the ObjectStoreLocationProvider feature, which adds a hash prefix (a hexadecimal value in the range [0, 0x7FFFFFFF]) to your specified S3 object path.

Iceberg by default uses the Hive storage layout, but you can switch it to use the ObjectStoreLocationProvider. This option is not enabled by default, to give you the flexibility to choose the location where the hash prefix is added. With ObjectStoreLocationProvider, a deterministic hash is generated for each stored file and a subfolder is appended right after the S3 folder specified using the parameter write.data.path (write.object-storage.path for Iceberg version 0.12 and below). This ensures that files written to Amazon S3 are evenly distributed across multiple prefixes in your S3 bucket, minimizing throttling errors. In the following example, we set the write.data.path value as s3://my-table-data-bucket, and Iceberg-generated S3 hash prefixes are appended after this location:

CREATE TABLE my_catalog.my_ns.my_table
( id bigint,
data string,
category string)
USING iceberg OPTIONS
( 'write.object-storage.enabled'=true,
'write.data.path'='s3://my-table-data-bucket')
PARTITIONED BY (category);

Your S3 files will be arranged under MURMUR3 S3 hash prefixes like the following:

2021-11-01 05:39:24 809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
2021-11-01 06:00:10 6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
2021-11-01 04:33:24 6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet
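The effect of this layout can be sketched in a few lines of Python. This is purely illustrative: Iceberg derives the prefix with a Murmur3 hash of the file path, and MD5 stands in for it here; the bucket and file names are hypothetical.

```python
import hashlib

def object_store_key(base, relative_path, prefix_len=8):
    """Sketch of the ObjectStoreLocationProvider idea: a deterministic
    hash of the file path is inserted between the base data path and the
    file name, so writes fan out across many S3 prefixes. Iceberg itself
    uses a Murmur3 hash; MD5 stands in here purely for illustration."""
    digest = hashlib.md5(relative_path.encode()).hexdigest()[:prefix_len]
    return f"{base}/{digest}/{relative_path}"

# Hypothetical bucket and file names.
keys = [object_store_key("s3://my-table-data-bucket",
                         f"my_ns/my_table/part-{i:05d}.parquet")
        for i in range(4)]
for k in keys:
    print(k)

# Each file lands under its own hash prefix instead of one shared prefix.
prefixes = {k.split("/")[3] for k in keys}
```

Because the hash is deterministic, rewriting the same file path always produces the same prefix, while different files spread across the keyspace.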

Using Iceberg ObjectStoreLocationProvider is not a foolproof mechanism to avoid S3 503 errors. You still need to set appropriate EMRFS retries to provide additional resiliency. You can adjust your retry strategy by increasing the maximum retry limit for the default exponential backoff retry strategy or enabling and configuring the additive-increase/multiplicative-decrease (AIMD) retry strategy. AIMD is supported for Amazon EMR releases 6.4.0 and later. For more information, refer to Retry Amazon S3 requests with EMRFS.

In the following sections, we provide examples for these use cases.

Storage cost optimizations

In this example, we use Iceberg’s S3 tags feature with the write tag write-tag-name=created and the delete tag delete-tag-name=deleted. The example is demonstrated on an Amazon EMR 6.10.0 cluster with Hadoop 3.3.3, Jupyter Enterprise Gateway 2.6.0, and Spark 3.3.1 installed, and is run in a Jupyter notebook environment attached to the EMR cluster. To learn more about how to create an EMR cluster with Iceberg and use Amazon EMR Studio, refer to Use an Iceberg cluster with Spark and the Amazon EMR Studio Management Guide, respectively.

The following examples are also available in the sample notebook in the aws-samples GitHub repo for quick experimentation.

Configure Iceberg on a Spark session

Configure your Spark session using the %%configure magic command. You can use either the AWS Glue Data Catalog (recommended) or a Hive catalog for Iceberg tables. In this example, we use a Hive catalog, but we can change to the Data Catalog with the following configuration:

spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog

Before you run this step, create an S3 bucket and an iceberg folder in your AWS account with the naming convention <your-iceberg-storage-blog>/iceberg/.

Update your-iceberg-storage-blog in the following configuration with the bucket that you created to test this example. Note the configuration parameters s3.write.tags.write-tag-name and s3.delete.tags.delete-tag-name, which will tag the new S3 objects and deleted objects with corresponding tag values. We use these tags in later steps to implement S3 lifecycle policies to transition the objects to a lower-cost storage tier or expire them based on the use case.

%%configure -f
{
  "conf": {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "spark.sql.catalog.dev.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse": "s3://<your-iceberg-storage-blog>/iceberg/",
    "spark.sql.catalog.dev.s3.write.tags.write-tag-name": "created",
    "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name": "deleted",
    "spark.sql.catalog.dev.s3.delete-enabled": "false"
  }
}

Create an Apache Iceberg table using Spark-SQL

Now we create an Iceberg table for the Amazon Product Reviews Dataset:

spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")
spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date date,
year int)
USING iceberg
location 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

In the next step, we load the table with the dataset using Spark actions.

Load data into the Iceberg table

While inserting the data, we partition the data by review_date as per the table definition. Run the following Spark commands in your PySpark notebook:

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()

Insert a single record into the same Iceberg table so that it creates a partition with the current review_date:

spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

You can check that a new snapshot was created after this append operation by querying the Iceberg snapshots table:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

You will see an output similar to the following showing the operations performed on the table.

Check the S3 tag population

You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to check the tags populated for the new writes. Let’s check the tag corresponding to the object created by a single row insert.

On the Amazon S3 console, open the S3 folder s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ and navigate to the partition review_date_year=2023/. Then inspect the Parquet file under this folder to view the tags associated with the data file.

From the AWS CLI, run the following command to see that the tag was created based on the Spark configuration spark.sql.catalog.dev.s3.write.tags.write-tag-name="created":

xxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [{ "Key": "write-tag-name", "Value": "created" } ] }
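If you script this check, the TagSet list of {Key, Value} pairs is easier to work with as a plain dictionary. A minimal sketch (the sample response mirrors the output shown above):

```python
import json

# Sample response mirroring the get-object-tagging output shown above.
raw = '{"VersionId": "null", "TagSet": [{"Key": "write-tag-name", "Value": "created"}]}'
response = json.loads(raw)

# Fold the TagSet list of {Key, Value} pairs into a plain dict.
tags = {t["Key"]: t["Value"] for t in response["TagSet"]}
print(tags)  # {'write-tag-name': 'created'}
```

The same transformation applies to the delete-tag output later in this post, where the TagSet carries both the write and delete tags.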

Delete a record and expire a snapshot

In this step, we delete a record from the Iceberg table and expire the snapshot corresponding to the deleted record. We delete the new single record that we inserted with the current review_date:

spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

We can now check that a new snapshot was created with the operation flagged as delete:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

This is useful if we want to time travel and check the deleted row in the future. In that case, we have to query the table with the snapshot-id corresponding to the deleted row. However, we don’t discuss time travel as part of this post.

We expire the old snapshots from the table and keep only the last two. You can modify the query based on your specific requirements to retain the snapshots:

spark.sql("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

If we run the same query on the snapshots, we can see that we have only two snapshots available:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

From the AWS CLI, you can run the following command to see that the tag was created based on the Spark configuration spark.sql.catalog.dev.s3.delete.tags.delete-tag-name="deleted":

xxxxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [ { "Key": "delete-tag-name", "Value": "deleted" }, { "Key": "write-tag-name", "Value": "created" } ] }

You can view the existing metadata files from the metadata_log_entries metadata table after the expiration of snapshots:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

The snapshots that have expired show the latest snapshot ID as null.

Create S3 lifecycle rules to transition the buckets to a different storage tier

Create a lifecycle configuration for the bucket to transition objects with the delete-tag-name=deleted S3 tag to the S3 Glacier Instant Retrieval storage class. Amazon S3 runs lifecycle rules one time every day at midnight Universal Coordinated Time (UTC), and new lifecycle rules can take up to 48 hours to complete the first run. S3 Glacier Instant Retrieval is well suited for archived data that needs immediate access (with milliseconds retrieval). With S3 Glacier Instant Retrieval, you can save up to 68% on storage costs compared to using the S3 Standard-Infrequent Access (S3 Standard-IA) storage class, when the data is accessed once per quarter.
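One way to express such a rule is with the payload shape that the S3 PutBucketLifecycleConfiguration API (for example, via boto3's put_bucket_lifecycle_configuration) expects. The rule ID and the 30-day transition window below are illustrative, and no AWS call is made here:

```python
# Sketch of a tag-filtered lifecycle rule (illustrative values): objects
# tagged delete-tag-name=deleted transition to S3 Glacier Instant
# Retrieval (storage class GLACIER_IR) after 30 days. This dict matches
# the shape expected by boto3's put_bucket_lifecycle_configuration, but
# no API call is made here.
lifecycle_configuration = {
    "Rules": [
        {
            "ID": "transition-deleted-iceberg-objects",  # illustrative name
            "Status": "Enabled",
            "Filter": {"Tag": {"Key": "delete-tag-name", "Value": "deleted"}},
            "Transitions": [
                {"Days": 30, "StorageClass": "GLACIER_IR"}
            ],
        }
    ]
}
print(lifecycle_configuration["Rules"][0]["Filter"]["Tag"])
```

Because the filter matches on the tag that Iceberg applies before deletion, only soft-deleted data files are transitioned; live table data is untouched.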

When you want to access the data again, you can bulk restore the archived objects. After you restore the objects to the S3 Standard class, you can register the metadata and data as an archival table for query purposes. The metadata file location can be fetched from the metadata_log_entries metadata table as illustrated earlier. As mentioned before, a null latest snapshot ID indicates an expired snapshot. We can take one of the expired snapshots and do the bulk restore:

spark.sql("""CALL dev.system.register_table(table => 'db.amazon_reviews_iceberg_archive', metadata_file => 's3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/metadata/00000-a010f15c-7ac8-4cd1-b1bc-bba99fa7acfc.metadata.json')""").show()

Capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake

Because Iceberg doesn’t support relative paths, you can use access points to perform Amazon S3 operations by specifying a mapping of buckets to access points. This is useful for multi-Region access, cross-Region access, disaster recovery, and more.

For cross-Region access points, we additionally need to set the use-arn-region-enabled catalog property to true to enable S3FileIO to make cross-Region calls. If an Amazon S3 resource ARN is passed in as the target of an Amazon S3 operation that has a different Region than the one the client was configured with, this flag must be set to true to permit the client to make a cross-Region call to the Region specified in the ARN; otherwise, an exception is thrown. However, for same-Region or multi-Region access points, the use-arn-region-enabled flag should be set to false.

For example, to use an S3 access point with multi-Region access in Spark 3.3, you can start the Spark SQL shell with the following code:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket2=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap

In this example, the objects in Amazon S3 on my-bucket1 and my-bucket2 buckets use the arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap access point for all Amazon S3 operations.

For more details on using access points, refer to Using access points with compatible Amazon S3 operations.

Let’s say your table path is under my-bucket1, so both my-bucket1 in Region 1 and my-bucket2 in Region 2 have paths of my-bucket1 inside the metadata files. At the time of the S3 (GET/PUT) call, the my-bucket1 reference is replaced with the multi-Region access point.
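Conceptually, the substitution works like the following sketch. The resolve helper is hypothetical; S3FileIO performs the equivalent mapping internally based on the s3.access-points.* catalog properties shown earlier:

```python
# Illustrative mapping from bucket names to an access point ARN,
# mirroring the s3.access-points.* properties in the spark-sql example.
ACCESS_POINTS = {
    "my-bucket1": "arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap",
    "my-bucket2": "arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap",
}

def resolve(path: str) -> str:
    """Rewrite an s3:// path so the bucket is replaced by its mapped
    access point ARN (hypothetical helper; S3FileIO does the equivalent
    substitution internally at GET/PUT time)."""
    scheme, rest = path.split("://", 1)
    bucket, _, key = rest.partition("/")
    return f"{scheme}://{ACCESS_POINTS.get(bucket, bucket)}/{key}"

print(resolve("s3://my-bucket1/my/key/prefix/file.parquet"))
```

Paths for unmapped buckets pass through unchanged, so only the buckets you list in the mapping are routed through the access point.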

Handling increased S3 request rates

When using ObjectStoreLocationProvider (for more details, see Object Store File Layout), a deterministic hash is generated for each stored file, with the hash appended directly after the write.data.path. The problem with this is that the default hashing algorithm generates hash values up to Integer MAX_VALUE, which in Java is (2^31)-1. When this is converted to hex, it produces 0x7FFFFFFF, so the variance of the first character is restricted to only [0-7]. As per Amazon S3 recommendations, we should have the maximum variance here to mitigate this.

Starting from Amazon EMR 6.10, Amazon EMR added an optimized location provider that makes sure the generated prefix hash has uniform distribution in the first two characters using the character set from [0-9][A-Z][a-z].

This location provider has been recently open sourced by Amazon EMR via Core: Improve bit density in object storage layout and should be available starting from Iceberg 1.3.0.
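The arithmetic behind both points can be checked directly; this is a quick sanity check, not part of any Iceberg API:

```python
# Java's Integer.MAX_VALUE is (2**31) - 1, which in hex is 0x7FFFFFFF,
# so the leading hex character of the default hash can never exceed 7.
max_hash = 2**31 - 1
print(format(max_hash, "x"))  # 7fffffff

# Any 8-hex-digit value up to that maximum starts with a digit in 0-7.
samples = [0, 1, 0x0FFFFFFF, 0x12345678, max_hash]
print(sorted({format(h, "08x")[0] for h in samples}))

# The optimized provider draws each of the first two characters from
# [0-9][A-Z][a-z], i.e. 62 symbols, giving 62 * 62 possible two-character
# prefixes versus 8 * 16 = 128 with the restricted hex layout.
print(62 * 62)  # 3844
```

In other words, the optimized layout offers roughly 30 times more distinct two-character prefixes, which is what lets Amazon S3 spread the request load more evenly.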

To use it, make sure the iceberg.enabled classification is set to true, and write.location-provider.impl is set to org.apache.iceberg.emr.OptimizedS3LocationProvider.

The following is a sample Spark shell command:

spark-shell --conf spark.driver.memory=4g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/iceberg-V516168123 \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.table-override.write.location-provider.impl=org.apache.iceberg.emr.OptimizedS3LocationProvider

The following example shows that when you enable object storage in your Iceberg table, the hash prefix is added to your S3 path directly after the location you provide in your DDL.

Set the table parameter write.object-storage.enabled and provide the S3 path after which you want to add the hash prefix using write.data.path (for Iceberg version 0.13 and above) or write.object-storage.path (for Iceberg version 0.12 and below).

Insert data into the table you created.

The hash prefix is added right after the /current/ prefix in the S3 path as defined in the DDL.

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

As companies continue to build newer transactional data lake use cases using Apache Iceberg open table format on very large datasets on S3 data lakes, there will be an increased focus on optimizing those petabyte-scale production environments to reduce cost, improve efficiency, and implement high availability. This post demonstrated mechanisms to implement the operational efficiencies for Apache Iceberg open table formats running on AWS.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Prashant Singh is a Software Development Engineer at AWS. He is interested in Databases and Data Warehouse engines and has worked on Optimizing Apache Spark performance on EMR. He is an active contributor in open source projects like Apache Spark and Apache Iceberg. During his free time, he enjoys exploring new places, food and hiking.