Tag Archives: Amazon EMR

Fine-grained access control in Amazon EMR Serverless with AWS Lake Formation

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/fine-grained-access-control-in-amazon-emr-serverless-with-aws-lake-formation/

In today’s data-driven world, enterprises are increasingly reliant on vast amounts of data to drive decision-making and innovation. With this reliance comes the critical need for robust data security and access control mechanisms. Fine-grained access control restricts access to specific data subsets, protecting sensitive information and maintaining regulatory compliance. It allows organizations to set detailed permissions at various levels, including database, table, column, and row. This precise control mitigates risks of unauthorized access, data leaks, and misuse. In the unfortunate event of a security incident, fine-grained access control helps limit the scope of the breach, minimizing potential damage.
AWS is introducing general availability of fine-grained access control based on AWS Lake Formation for Amazon EMR Serverless on Amazon EMR 7.2. Enterprises can now significantly enhance their data governance and security frameworks. This new integration supports the implementation of modern data lake architectures, such as data mesh, by providing a seamless way to manage and analyze data. You can use EMR Serverless to enforce data access controls using Lake Formation when reading data from Amazon Simple Storage Service (Amazon S3), enabling robust data processing workflows and real-time analytics without the overhead of cluster management.

In this post, we discuss how to implement fine-grained access control in EMR Serverless using Lake Formation. With this integration, organizations can achieve better scalability, flexibility, and cost-efficiency in their data operations, ultimately driving more value from their data assets.

Key use cases for fine-grained access control in analytics

The following are key use cases for fine-grained access control in analytics:

  • Customer 360 – You can enable different departments to securely access specific customer data relevant to their functions. For example, the sales team can be granted access only to data such as customer purchase history, preferences, and transaction patterns. Meanwhile, the marketing team is limited to viewing campaign interactions, customer demographics, and engagement metrics.
  • Financial reporting – You can enable financial analysts to access the necessary data for reporting and analysis while restricting sensitive financial details to authorized executives.
  • Healthcare analytics – You can enable healthcare researchers and data scientists to analyze de-identified patient data for medical advancements and research, while making sure Protected Health Information (PHI) remains confidential and accessible only to authorized healthcare professionals and personnel.
  • Supply chain optimization – You can grant logistics teams visibility into inventory and shipment data while limiting access to pricing or supplier information to relevant stakeholders.

Solution overview

In this post, we explore how to implement fine-grained access control on Iceberg tables within an EMR Serverless application, using the capabilities of Lake Formation. If you’re interested in learning how to implement fine-grained access control on open table formats in Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) instances using Lake Formation, refer to Enforce fine-grained access control on Open Table Formats via Amazon EMR integrated with AWS Lake Formation.
With the data access control features available in Lake Formation, you can enforce granular permissions and govern access to specific columns, rows, or cells within your Iceberg tables. This approach makes sure sensitive data remains secure and accessible only to authorized users or applications, aligning with your organization’s data governance policies and regulatory compliance requirements.

A cross-account modern data platform on AWS involves setting up a centralized data lake in a primary AWS account, while allowing controlled access to this data from secondary AWS accounts. This setup helps organizations maintain a single source of truth for their data, provides consistent data governance, and uses the robust security features of AWS across multiple business units or project teams.

To demonstrate how you can use Lake Formation to implement cross-account fine-grained access control within an EMR Serverless environment, we use the TPC-DS dataset to create tables in the AWS Glue Data Catalog in the AWS producer account. We then provision different user personas in the AWS consumer account to reflect various roles and access levels, forming a secure and governed data lake.

The following diagram illustrates the solution architecture.

The producer account contains the following persona:

  • Data engineer – Tasks include data preparation, bulk updates, and incremental updates. The data engineer has the following access:
    • Table-level access – Full read/write access to all TPC-DS tables.

The consumer account contains the following personas:

  • Finance analyst – We run a sample query that performs a sales data analysis to guide marketing, inventory, and promotion strategies based on demographic and geographic performance. The finance analyst has the following access:
    • Table-level access – Full access to tables store_sales, catalog_sales, web_sales, item, and promotion for comprehensive financial analysis.
    • Column-level access – Limited access to cost-related columns in the sales tables to avoid exposure to sensitive pricing strategies. Limited access to sensitive columns like credit_rating in the customer_demographics table.
    • Row-level access – Access only to sales data from the current fiscal year or specific promotional periods.
  • Product analyst – We run a sample query that does a customer behavior analysis to tailor marketing, promotions, and loyalty programs based on purchase patterns and regional insights. The product analyst has the following access:
    • Table-level access – Full access to tables item, store_sales, and customer tables to evaluate product and market trends.
    • Column-level access – Restricted access to personal identifiers in the customer table, such as customer_address, email_address, and date of birth.

Prerequisites

You should have the following prerequisites:

Set up infrastructure in the producer account

We provide a CloudFormation template to deploy the data lake stack with the following resources:

  • Two S3 buckets: one for scripts and query results, and one for the data lake storage
  • An Amazon Athena workgroup
  • An EMR Serverless application
  • An AWS Glue database and tables on external public S3 buckets of TPC-DS data
  • An AWS Glue database for the data lake
  • An IAM role and policies

Set up Lake Formation for the data engineer in the producer account

Set up Lake Formation cross-account data sharing version settings:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. Under Data Catalog settings, pick Version 4 under Cross-account version settings.

To learn more about the differences between data sharing versions, refer to Updating cross-account data sharing version settings. Make sure Default permissions for newly created databases and tables is unchecked.
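The same settings can also be applied programmatically. The following is a minimal sketch using boto3, assuming the caller is a Lake Formation data lake administrator in the producer account. The helper names (with_cross_account_v4, apply_settings) are hypothetical. The sketch reads the current settings first because put_data_lake_settings replaces the whole settings object.

```python
import copy


def with_cross_account_v4(settings: dict) -> dict:
    """Return a copy of DataLakeSettings with version 4 sharing and no default permissions."""
    updated = copy.deepcopy(settings)
    # CROSS_ACCOUNT_VERSION is the key Lake Formation reads from the Parameters map.
    updated.setdefault("Parameters", {})["CROSS_ACCOUNT_VERSION"] = "4"
    # Empty lists correspond to leaving the default permissions boxes unchecked.
    updated["CreateDatabaseDefaultPermissions"] = []
    updated["CreateTableDefaultPermissions"] = []
    return updated


def apply_settings() -> None:
    """Read, modify, and write back the data lake settings (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above works without AWS access

    lf = boto3.client("lakeformation")
    current = lf.get_data_lake_settings()["DataLakeSettings"]
    lf.put_data_lake_settings(DataLakeSettings=with_cross_account_v4(current))
```

Calling apply_settings() keeps the existing data lake administrators and other settings intact and changes only the sharing version and the default permissions.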

Register the Amazon S3 location as the data lake location

When you register an Amazon S3 location with Lake Formation, you specify an IAM role with read/write permissions on that location. After registering, when EMR Serverless requests access to this Amazon S3 location, Lake Formation will supply temporary credentials of the provided role to access the data. We already created the role LakeFormationServiceRole using the CloudFormation template. To register the Amazon S3 location as the data lake location, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. In the navigation pane, choose Data lake locations under Administration.
  3. Choose Register location.
  4. For Amazon S3 path, enter s3://<DatalakeBucketName>. (Copy the bucket name from the CloudFormation stack’s Outputs tab.)
  5. For IAM role, enter LakeFormationServiceRoleDatalake.
  6. For Permission mode, select Lake Formation.
  7. Choose Register location.
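If you prefer to script the registration, the console steps above roughly correspond to the Lake Formation register_resource API. The following is a sketch with boto3. The function names are hypothetical, and the bucket name and role ARN are placeholders you would take from the CloudFormation stack outputs.

```python
def build_register_request(bucket_name: str, role_arn: str) -> dict:
    """Build the register_resource arguments for an S3 data lake location."""
    return {
        "ResourceArn": f"arn:aws:s3:::{bucket_name}",
        "RoleArn": role_arn,
        # Use the role created by the stack instead of the service-linked role.
        "UseServiceLinkedRole": False,
    }


def register_location(bucket_name: str, role_arn: str) -> None:
    """Register the location with Lake Formation (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    lf = boto3.client("lakeformation")
    lf.register_resource(**build_register_request(bucket_name, role_arn))
```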

Generate TPC-DS tables in the producer account

In this section, we generate TPC-DS tables in Iceberg format in the producer account.

Grant database permissions to the data engineer

First, let’s grant database permissions to the data engineer IAM role Amazon-EMR-ExecutionRole_DE that we will use with EMR Serverless. Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. Choose Databases and Create database.
  3. Enter iceberg_db for Name and s3://<DatalakeBucketName> for Location.
  4. Choose Create database.
  5. In the navigation pane, choose Data lake permissions and choose Grant.
  6. In the Principals section, select IAM users and roles and choose Amazon-EMR-ExecutionRole_DE.
  7. In the LF-Tags or catalog resources section, select Named Data Catalog resources and choose tpc-source and iceberg_db for Databases.
  8. Select Super for both Database permissions and Grantable permissions and choose Grant.

Create an EMR Serverless application

Now, let’s log in to EMR Serverless using Amazon EMR Studio and complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless.
  2. Under Manage applications, choose my-emr-studio. You will be directed to the Create application page on EMR Studio. Let’s create a Lake Formation enabled EMR Serverless application.
  3. Under Application settings, provide the following information:
    1. For Name, enter emr-fgac-application.
    2. For Type, choose Spark.
    3. For Release version, choose emr-7.2.0.
    4. For Architecture, choose x86_64.
  4. Under Application setup options, select Use custom settings.
  5. Under Interactive endpoint, select Enable endpoint for EMR studio.
  6. Under Additional configurations, for Metastore configuration, select Use AWS Glue Data Catalog as metastore, then select Use Lake Formation for fine-grained access control.
  7. Under Network connections, choose emrs-vpc for the VPC, enter any two private subnets, and enter emr-serverless-sg for Security groups.
  8. Choose Create and start application.
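For reference, the console choices above can be approximated with the EMR Serverless API. The following boto3 sketch is an illustration only. The function names are hypothetical, the subnet and security group IDs are placeholders, and it assumes Lake Formation is enabled through the spark-defaults property spark.emr-serverless.lakeformation.enabled.

```python
def build_application_request(subnet_ids, security_group_ids) -> dict:
    """Build create_application arguments mirroring the console settings above."""
    return {
        "name": "emr-fgac-application",
        "type": "SPARK",
        "releaseLabel": "emr-7.2.0",
        "architecture": "X86_64",
        # Interactive endpoint so that EMR Studio Workspaces can attach.
        "interactiveConfiguration": {"studioEnabled": True},
        "networkConfiguration": {
            "subnetIds": list(subnet_ids),
            "securityGroupIds": list(security_group_ids),
        },
        # Assumption: this property turns on Lake Formation fine-grained access control.
        "runtimeConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {"spark.emr-serverless.lakeformation.enabled": "true"},
            }
        ],
    }


def create_and_start(subnet_ids, security_group_ids) -> str:
    """Create and start the application (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    client = boto3.client("emr-serverless")
    response = client.create_application(
        **build_application_request(subnet_ids, security_group_ids)
    )
    client.start_application(applicationId=response["applicationId"])
    return response["applicationId"]
```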

Create a Workspace

Complete the following steps to create an EMR Workspace:

  1. On the Amazon EMR console, choose Workspaces in the navigation pane and choose Create Workspace.
  2. Enter the Workspace name emr-fgac-workspace.
  3. Leave all other settings as default and choose Create Workspace.
  4. Choose Launch Workspace. Your browser might request to allow pop-up permissions for the first time launching the Workspace.
  5. After the Workspace is launched, in the navigation pane, choose Compute.
  6. For Compute type, select EMR Serverless application and enter emr-fgac-application for the application and Amazon-EMR-ExecutionRole_DE as the runtime role.
  7. Make sure the kernel attached to the Workspace is PySpark.
  8. Navigate to the File browser section and choose Upload files.
  9. Upload the file Iceberg-ingest-final_v2.ipynb.
  10. Update the data lake bucket name, AWS account ID, and AWS Region accordingly.
  11. Choose the double arrow icon to restart the kernel and rerun the notebook.


To verify that the data is generated, you can go to the AWS Glue console. Under Data Catalog, Databases, you should see TPC-DS tables ending with _iceberg for the database iceberg_db.

Share the database and TPC-DS tables to the consumer account

We now grant permissions to the consumer account, including grantable permissions. This allows the Lake Formation data lake administrator in the consumer account to control access to the data within the account.

Grant database permissions to the consumer account

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. In the navigation pane, choose Databases.
  3. Select the database iceberg_db, and on the Actions menu, under Permissions, choose Grant.
  4. In the Principals section, select External accounts and enter the consumer account.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and choose iceberg_db for Databases.
  6. In the Database permissions section, select Describe for both Database permissions and Grantable permissions.

This allows the data lake administrator in the consumer account to describe the database and grant describe permissions to other principals in the consumer account.

Grant table permissions to the consumer account

Repeat the preceding steps to grant table permissions to the consumer account.

Choose All tables under Tables and provide select and describe permissions for Table permissions and Grantable permissions.
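The database and table grants to the consumer account can be sketched with the Lake Formation grant_permissions API as follows. The helper names are hypothetical and the account ID is a placeholder. An external account is identified by its 12-digit account ID as the principal.

```python
def cross_account_grants(consumer_account_id: str, database: str = "iceberg_db") -> list:
    """Build grant_permissions arguments for the database and all of its tables."""
    principal = {"DataLakePrincipalIdentifier": consumer_account_id}
    return [
        {
            "Principal": principal,
            "Resource": {"Database": {"Name": database}},
            "Permissions": ["DESCRIBE"],
            "PermissionsWithGrantOption": ["DESCRIBE"],
        },
        {
            "Principal": principal,
            # An empty TableWildcard selects all tables in the database.
            "Resource": {"Table": {"DatabaseName": database, "TableWildcard": {}}},
            "Permissions": ["SELECT", "DESCRIBE"],
            "PermissionsWithGrantOption": ["SELECT", "DESCRIBE"],
        },
    ]


def apply_grants(consumer_account_id: str) -> None:
    """Issue both grants from the producer account (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    lf = boto3.client("lakeformation")
    for grant in cross_account_grants(consumer_account_id):
        lf.grant_permissions(**grant)
```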

Set up Lake Formation in the consumer account

For the remaining section of the post, we focus on the consumer account. Deploy the following CloudFormation stack to set up resources:

The template will create the Amazon EMR runtime role for both analyst user personas.
Log in to the AWS consumer account and accept the AWS RAM invitation first:

  1. Open the AWS RAM console with the IAM identity that has AWS RAM access.
  2. In the navigation pane, choose Resource shares under Shared with me.
  3. You should see two pending resource shares from the producer account.
  4. Accept both invitations.

You should be able to see the iceberg_db database on the Lake Formation console.
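Accepting the invitations can also be scripted. The following boto3 sketch filters pending invitations from the producer account and accepts each one. The function names are hypothetical, and pagination of the invitation list is omitted for brevity.

```python
def pending_invitation_arns(invitations: list, sender_account_id: str) -> list:
    """Return ARNs of pending invitations sent by the given account."""
    return [
        inv["resourceShareInvitationArn"]
        for inv in invitations
        if inv.get("status") == "PENDING"
        and inv.get("senderAccountId") == sender_account_id
    ]


def accept_pending(sender_account_id: str) -> int:
    """Accept all pending invitations from the producer (requires AWS credentials)."""
    import boto3  # lazy import keeps the filter usable offline

    ram = boto3.client("ram")
    invitations = ram.get_resource_share_invitations()["resourceShareInvitations"]
    arns = pending_invitation_arns(invitations, sender_account_id)
    for arn in arns:
        ram.accept_resource_share_invitation(resourceShareInvitationArn=arn)
    return len(arns)
```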

Create a resource link for the shared database

To access the database and table resources that were shared by the producer AWS account, you need to create a resource link in the consumer AWS account. A resource link is a Data Catalog object that is a link to a local or shared database or table. After you create a resource link to a database or table, you can use the resource link name wherever you would use the database or table name. In this step, you grant permission on the resource links to the job runtime roles for EMR Serverless. The runtime roles will then access the data in shared databases and underlying tables through the resource link.
To create a resource link, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the iceberg_db database, verify that the owner account ID is the producer account, and on the Actions menu, choose Create resource links.
  4. For Resource link name, enter the name of the resource link (iceberg_db_shared).
  5. For Shared database’s region, choose the Region of the iceberg_db database.
  6. For Shared database, choose the iceberg_db database.
  7. For Shared database’s owner ID, enter the account ID of the producer account.
  8. Choose Create.
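A resource link is stored in the Data Catalog as a database entry that points at a target database, so the steps above can be approximated with the AWS Glue create_database API. This is a sketch with hypothetical function names. The producer account ID is a placeholder.

```python
def build_resource_link_input(
    producer_account_id: str,
    link_name: str = "iceberg_db_shared",
    target_database: str = "iceberg_db",
) -> dict:
    """Build the DatabaseInput for a resource link to a shared database."""
    return {
        "Name": link_name,
        "TargetDatabase": {
            "CatalogId": producer_account_id,  # account that owns the shared database
            "DatabaseName": target_database,
        },
    }


def create_resource_link(producer_account_id: str) -> None:
    """Create the link in the consumer account (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    glue = boto3.client("glue")
    glue.create_database(DatabaseInput=build_resource_link_input(producer_account_id))
```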

Grant permissions on the resource link to the EMR job runtime roles

Grant permissions on the resource link to Amazon-EMR-ExecutionRole_Finance and Amazon-EMR-ExecutionRole_Product using the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (iceberg_db_shared) and on the Actions menu, choose Grant.
  4. In the Principals section, select IAM users and roles, and choose Amazon-EMR-ExecutionRole_Finance and Amazon-EMR-ExecutionRole_Product.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and for Databases, choose iceberg_db_shared.
  6. In the Resource link permissions section, select Describe for Resource link permissions.

This allows the EMR Serverless job runtime roles to describe the resource link. We don’t make any selections for grantable permissions because runtime roles shouldn’t be able to grant permissions to other principals.
Choose Grant.

Grant table permissions for the finance analyst

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (iceberg_db_shared) and on the Actions menu, choose Grant on target.
  4. In the Principals section, select IAM users and roles, then choose Amazon-EMR-ExecutionRole_Finance.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and specify the following:
    1. For Databases, choose iceberg_db.
    2. For Tables, choose store_sales_iceberg.
  6. In the Table permissions section, for Table permissions, select Select.
  7. In the Data permissions section, select Column-based access.
  8. Select Exclude columns and choose all cost-related columns (ss_wholesale_cost and ss_ext_wholesale_cost).
  9. Choose Grant.
  10. Similarly, grant access to table customer_demographics_iceberg and exclude the column cd_credit_rating.
  11. Following the same steps, grant All data access for tables store_iceberg and item_iceberg.
  12. For the table date_dim_iceberg, we provide selective row-level access.
  13. Similar to the preceding table permissions, select date_dim_iceberg under Tables and in the Data filters section, choose Create new.
  14. For Data filter name, enter FA_Filter_year.
  15. Select Access to all columns under Column-level access.
  16. Select Filter rows and for Row filter expression, enter d_year=2002 to only provide access to the 2002 year.
  17. Choose Save changes.
  18. Choose Create filter.
  19. Make sure FA_Filter_year is selected under Data filters and grant select permissions on the filter.
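The row-level filter on date_dim_iceberg can also be expressed through the Lake Formation API. The sketch below builds a data cells filter definition and the matching grant. The function names are hypothetical, the role ARN is a placeholder, and it assumes table_catalog_id is the ID of the account that owns the table.

```python
def build_year_filter(table_catalog_id: str) -> dict:
    """Build the TableData for a filter exposing all columns but only 2002 rows."""
    return {
        "TableCatalogId": table_catalog_id,
        "DatabaseName": "iceberg_db",
        "TableName": "date_dim_iceberg",
        "Name": "FA_Filter_year",
        "RowFilter": {"FilterExpression": "d_year=2002"},
        "ColumnWildcard": {},  # empty wildcard means access to all columns
    }


def build_filter_grant(role_arn: str, filter_def: dict) -> dict:
    """Build grant_permissions arguments for SELECT on the data cells filter."""
    keys = ("TableCatalogId", "DatabaseName", "TableName", "Name")
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"DataCellsFilter": {key: filter_def[key] for key in keys}},
        "Permissions": ["SELECT"],
    }


def apply_filter(table_catalog_id: str, role_arn: str) -> None:
    """Create the filter and grant it (requires AWS credentials)."""
    import boto3  # lazy import keeps the builders usable offline

    lf = boto3.client("lakeformation")
    filter_def = build_year_filter(table_catalog_id)
    lf.create_data_cells_filter(TableData=filter_def)
    lf.grant_permissions(**build_filter_grant(role_arn, filter_def))
```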

Grant table permissions for the product analyst

You can provide permissions for the next set of tables required for the product analyst role using the Lake Formation console. Alternatively, you can use the AWS Command Line Interface (AWS CLI) to grant permissions. We provide grant on target permissions for the resource link iceberg_db_shared to IAM role Amazon-EMR-ExecutionRole_Product.

  1. Similar to the steps followed in previous sections, for tables store_sales_iceberg, date_dim_iceberg, store_iceberg, and house_hold_demographics_iceberg, provide select permissions for All data access. Make sure the role selected is Amazon-EMR-ExecutionRole_Product.

For table customer_iceberg, we limit access to personally identifiable information (PII) columns.

  1. Under Data permissions, select Column-based access and Exclude columns.
  2. Choose columns c_birth_day, c_birth_month, c_birth_year, c_current_addr_sk, c_customer_id, c_email_address, and c_birth_country.
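As a programmatic alternative to the console, the column exclusion for customer_iceberg can be sketched with the Lake Formation grant_permissions API. The function names are hypothetical, the role ARN and catalog ID are placeholders, and the catalog ID is assumed to be the account that owns the table.

```python
PII_COLUMNS = [
    "c_birth_day",
    "c_birth_month",
    "c_birth_year",
    "c_current_addr_sk",
    "c_customer_id",
    "c_email_address",
    "c_birth_country",
]


def build_column_grant(role_arn: str, catalog_id: str, table: str, excluded) -> dict:
    """Build a SELECT grant on every column except the excluded ones."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,
                "DatabaseName": "iceberg_db",
                "Name": table,
                "ColumnWildcard": {"ExcludedColumnNames": list(excluded)},
            }
        },
        "Permissions": ["SELECT"],
    }


def grant_product_analyst(role_arn: str, catalog_id: str) -> None:
    """Apply the grant for customer_iceberg (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    lf = boto3.client("lakeformation")
    lf.grant_permissions(
        **build_column_grant(role_arn, catalog_id, "customer_iceberg", PII_COLUMNS)
    )
```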

Verify access using interactive notebooks from EMR Studio

Complete the following steps to test role access:

  1. Log in to the AWS consumer account and open the Amazon EMR console.
  2. Choose EMR Serverless in the navigation pane and choose an existing EMR Studio.
  3. If you don’t have EMR Studio configured, choose Get Started and select Create and launch EMR Studio.
  4. Create a Lake Formation enabled EMR Serverless application as described in previous sections.
  5. Create an EMR Studio Workspace as described in previous sections.
  6. Use emr-studio-service-role for Service role and datalake-resources-<account_id>-<region> for Workspace storage, then launch your Workspace.

Now, let’s verify access for the finance analyst.

  1. Make sure the compute type inside your Workspace is pointing to the EMR Serverless application created in the prior step and Amazon-EMR-ExecutionRole_Finance as the interactive runtime role.
  2. Go to File browser in the navigation pane, choose Upload files, and add Notebook_FA.ipynb to your Workspace.
  3. Run all the cells to verify fine-grained access.

Now let’s test access for the product analyst.

  1. In the same Workspace, detach and attach the same EMR Serverless application with Amazon-EMR-ExecutionRole_Product as the interactive runtime role.
  2. Upload Notebook_PA.ipynb under the File browser section.
  3. Run all the cells to verify fine-grained access for the product analyst.

In a real-world scenario, both analysts will likely have their own Workspace with restricted rights to assume only the authorized interactive runtime role.

Considerations and limitations

EMR Serverless with Lake Formation uses Spark resource profiles to create two profiles and two Spark drivers for access control. Read this white paper to learn about the feature details. The user profile runs the supplied code, and the system profile enforces Lake Formation policies. Therefore, it’s recommended that you have a minimum of two Spark drivers when pre-initialized capacity is used with Lake Formation enabled jobs. No change in executor count is required. Refer to Using EMR Serverless with AWS Lake Formation for fine-grained access control to learn more about the technical implementation of the Lake Formation integration with EMR Serverless.

You can expect a performance overhead after enabling Lake Formation. The level of access (table, column, or row) and the amount of data filtered will have significant impact on query performance.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary (consumer) account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary (producer) account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed how to integrate Lake Formation with EMR Serverless to manage access to Iceberg tables. This solution showcases a modern way to enforce fine-grained access control in a multi-account open data lake setup. The approach simplifies data management in the main account while carefully controlling how users access data in other secondary accounts.

Try out the solution for your own use case, and let us know your feedback and questions in the comments section.


About the Authors

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web Services. He specializes in building big data applications and helping customers modernize their applications on the cloud. He thinks data is the new oil and spends most of his time deriving insights from data.

Analyze Amazon EMR on Amazon EC2 cluster usage with Amazon Athena and Amazon QuickSight

Post Syndicated from Boon Lee Eu original https://aws.amazon.com/blogs/big-data/analyze-amazon-emr-on-amazon-ec2-cluster-usage-with-amazon-athena-and-amazon-quicksight/

Gaining granular visibility into application-level costs on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) clusters presents an opportunity for customers looking for ways to further optimize resource utilization and implement fair cost allocation and chargeback models. By breaking down the usage of individual applications running in your EMR cluster, you can unlock several benefits:

  • Informed workload management – Application-level cost insights empower organizations to prioritize and schedule workloads effectively. Resource allocation decisions can be made with a better understanding of cost implications, potentially improving overall cluster performance and cost-efficiency.
  • Cost optimization – With granular cost attribution, organizations can identify cost-saving opportunities for individual applications. They can right-size underutilized resources or prioritize optimization efforts for applications that are driving high usage and costs.
  • Transparent billing – In multi-tenant environments, organizations can implement fair and transparent cost allocation models based on individual application resource consumption and associated costs. This fosters accountability and enables accurate chargebacks to tenants.

In this post, we guide you through deploying a comprehensive solution in your Amazon Web Services (AWS) environment to analyze Amazon EMR on EC2 cluster usage. By using this solution, you will gain a deep understanding of resource consumption and associated costs of individual applications running on your EMR cluster. This will help you optimize costs, implement fair billing practices, and make informed decisions about workload management, ultimately enhancing the overall efficiency and cost-effectiveness of your Amazon EMR environment. This solution has only been tested on Spark workloads running on EMR on EC2 that use YARN as the resource manager. It hasn’t been tested on workloads from other frameworks that run on YARN, such as Hive or Tez.

Solution overview

The solution works by running a Python script on the EMR cluster’s primary node to collect metrics from the YARN resource manager and correlate them with cost usage details from the AWS Cost and Usage Reports (AWS CUR). The script, activated by a cron job, makes HTTP requests to the YARN resource manager to collect two types of metrics: cluster metrics from the path /ws/v1/cluster/metrics and application metrics from the path /ws/v1/cluster/apps. The cluster metrics contain utilization information of cluster resources, and the application metrics contain utilization information of an application or job. These metrics are stored in an Amazon Simple Storage Service (Amazon S3) bucket.
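To illustrate the collection step, here is a minimal sketch of how such requests could look using only the Python standard library. It is not the solution's actual script. The base URL (port 8088 on localhost) and the function names are assumptions, and the upload to Amazon S3 is left out.

```python
import json
from urllib.request import urlopen

# Assumption: the ResourceManager web service listens on port 8088 of the primary node.
RM_BASE = "http://localhost:8088"


def fetch_json(path: str, base: str = RM_BASE) -> dict:
    """GET a YARN ResourceManager REST path and decode the JSON body."""
    with urlopen(base + path, timeout=10) as response:
        return json.load(response)


def extract_cluster_metrics(payload: dict) -> dict:
    """Return the metrics object from a /ws/v1/cluster/metrics response."""
    return payload.get("clusterMetrics", {})


def extract_apps(payload: dict) -> list:
    """Return the application list from a /ws/v1/cluster/apps response."""
    # The "apps" value can be null when no application has run yet.
    apps = payload.get("apps") or {}
    return apps.get("app", [])
```

On the primary node, extract_apps(fetch_json("/ws/v1/cluster/apps")) would return one dictionary per application, including fields such as memorySeconds and vcoreSeconds.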

There are two YARN metrics that capture the resource utilization information of an application or job.

  • memorySeconds – This is the memory (in MB) allocated to an application times the number of seconds the application ran.
  • vcoreSeconds – This is the number of YARN vcores allocated to an application times the number of seconds the application ran.

The solution uses memorySeconds to derive the cost of running the application or job. It can be modified to use vcoreSeconds instead if necessary.
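As a rough illustration of how memorySeconds can drive cost attribution, the sketch below splits one hour of cluster cost across applications in proportion to their memorySeconds. This proportional split is an assumption made for the example. The solution's own Athena queries may allocate cost differently, for instance by accounting for idle capacity.

```python
def prorate_hourly_cost(hourly_cost: float, memory_seconds_by_app: dict) -> dict:
    """Split an hourly cluster cost across apps by their share of memorySeconds."""
    total = sum(memory_seconds_by_app.values())
    if total == 0:
        # No measured usage in this hour, so nothing is attributed.
        return {app: 0.0 for app in memory_seconds_by_app}
    return {
        app: hourly_cost * used / total
        for app, used in memory_seconds_by_app.items()
    }
```

For example, with an hourly cost of 10.0 and two applications using 300 and 100 memorySeconds, the first is attributed 7.5 and the second 2.5. Swapping in vcoreSeconds only changes the input values.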

The metadata of the YARN metrics collected in Amazon S3 is created, stored, and represented as database and tables in AWS Glue Data Catalog, which is in turn available to Amazon Athena for further processing. You can now write SQL queries in Athena to correlate the YARN metrics with the cost usage information from AWS CUR to derive the detailed cost breakdown of your EMR cluster by infrastructure and application. This solution creates two corresponding Athena views of the respective cost breakdown that will become the data source to Amazon QuickSight for visualization.

The following diagram shows the solution architecture.

EMR Cluster Usage Utility Solution Architecture

Prerequisites

To perform the solution, you need the following prerequisites:

  1. Confirm that a CUR is created in your AWS account. It needs an S3 bucket to store the report files. Follow the steps described in Creating Cost and Usage Reports to create the CUR on the AWS Management Console. When creating the report, make sure the following settings are enabled:
    • Include resource IDs
    • Time granularity is set to hourly
    • Report data integration to Athena

It can take up to 24 hours for AWS to start delivering reports to your S3 bucket. Thereafter, your CUR gets updated at least one time a day.

  2. The solution needs Athena to run queries against the data from the CUR using standard SQL. To automate and streamline the integration of Athena with CUR, AWS provides an AWS CloudFormation template, crawler-cfn.yml, which is automatically generated in the same S3 bucket during CUR creation. Follow the instructions in Setting up Athena using AWS CloudFormation templates to integrate Athena with the CUR. This template will create an AWS Glue database that references the CUR, an AWS Lambda event, and an AWS Glue crawler that gets invoked by S3 event notification to update the AWS Glue database whenever the CUR gets updated.
  3. Make sure to activate the AWS generated cost allocation tag, aws:elasticmapreduce:job-flow-id. This enables the field, resource_tags_aws_elasticmapreduce_job_flow_id, in the CUR to be populated with the EMR cluster ID and is used by the SQL queries in the solution. To activate the cost allocation tag from the management console, follow these steps:
    • Sign in to the payer account’s AWS Management Console and open the AWS Billing and Cost Management console
    • In the navigation pane, choose Cost Allocation Tags
    • Under AWS generated cost allocation tags, choose the aws:elasticmapreduce:job-flow-id tag
    • Choose Activate. It can take up to 24 hours for tags to activate.
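The tag can likely also be activated through the Cost Explorer API instead of the console. The following boto3 sketch assumes it runs with credentials for the payer account and that update_cost_allocation_tags_status accepts this AWS generated tag. The function names are hypothetical.

```python
def build_tag_activation(tag_key: str = "aws:elasticmapreduce:job-flow-id") -> dict:
    """Build the arguments that mark a cost allocation tag as active."""
    return {"CostAllocationTagsStatus": [{"TagKey": tag_key, "Status": "Active"}]}


def activate_tag() -> None:
    """Activate the EMR cluster ID tag (requires payer account credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    ce = boto3.client("ce")
    ce.update_cost_allocation_tags_status(**build_tag_activation())
```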

The following screenshot shows an example of the aws:elasticmapreduce:job-flow-id tag being activated.


You can now test out this solution on an EMR cluster in a lab environment. If you’re not already familiar with EMR, follow the detailed instructions provided in Tutorial: Getting started with Amazon EMR to launch a new EMR cluster and run a sample Spark job.

Deploying the solution

To deploy the solution, follow the steps in the next sections.

Installing scripts to the EMR cluster

Download two scripts from the GitHub repository and save them into an S3 bucket:

  • emr_usage_report.py – Python script that makes the HTTP requests to YARN Resource Manager
  • emr_install_report.sh  – Bash script that creates a cronjob to run the python script every minute

To install the scripts, add a step to the EMR cluster through the console or the AWS Command Line Interface (AWS CLI) using the aws emr add-steps command.

Replace:

  • REGION with the AWS Region where the cluster is running (for example, Europe (Ireland) eu-west-1)
  • MY-BUCKET with the name of the bucket where the script is stored (for example, my.artifact.bucket)
  • MY_REPORT_BUCKET with the bucket name where you want to collect YARN metrics (for example, my.report.bucket)

aws emr add-steps \
--cluster-id j-XXXXXXXXXXXXX \
--steps Type=CUSTOM_JAR,Name="Install YARN reporter",Jar=s3://REGION.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[s3://<MY-BUCKET>/emr-install_reporter.sh,s3://<MY-BUCKET>/emr_usage_reporter.py,MY_REPORT_BUCKET]
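
The same step could be submitted from Python with boto3. The sketch below mirrors the command above, with the script file names copied from that command. The function names are hypothetical, and the ActionOnFailure value of CONTINUE is an assumption.

```python
def build_install_step(region: str, script_bucket: str, report_bucket: str) -> dict:
    """Build the step definition that installs the YARN reporter cron job."""
    return {
        "Name": "Install YARN reporter",
        "ActionOnFailure": "CONTINUE",  # assumption: keep the cluster running on failure
        "HadoopJarStep": {
            "Jar": f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar",
            "Args": [
                f"s3://{script_bucket}/emr-install_reporter.sh",
                f"s3://{script_bucket}/emr_usage_reporter.py",
                report_bucket,
            ],
        },
    }


def add_install_step(cluster_id: str, region: str, script_bucket: str, report_bucket: str) -> None:
    """Submit the step to a running cluster (requires AWS credentials)."""
    import boto3  # lazy import keeps the builder usable offline

    emr = boto3.client("emr", region_name=region)
    emr.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[build_install_step(region, script_bucket, report_bucket)],
    )
```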

You can now run some Spark jobs on your EMR cluster to start generating application usage metrics.

Launching the CloudFormation stack

When the prerequisites are met and you have the scripts deployed so that your EMR clusters are sending YARN metrics to an S3 bucket, the rest of the solution can be deployed using CloudFormation.

Before launching the stack, upload a copy of this QuickSight definition file into an S3 bucket required by the CloudFormation template to build the initial analysis in QuickSight. When ready, proceed to launch your stack to provision the remaining resources of the solution.

  1. Choose

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed and make sure you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.


The following table describes the parameters.

| Parameter | Description |
| --- | --- |
| Stack name | A meaningful name for the stack; for example, EMRUsageReport |
| S3 configuration | |
| YARNS3BucketName | Name of S3 bucket where YARN metrics are stored |
| Cost Usage Report configuration | |
| CURDatabaseName | Name of Cost Usage Report database in AWS Glue |
| CURTableName | Name of Cost Usage Report table in AWS Glue |
| AWS Glue Database configuration | |
| EMRUsageDBName | Name of AWS Glue database to be created for the EMR Cost Usage Report |
| EMRInfraTableName | Name of AWS Glue table to be created for infrastructure usage metrics |
| EMRAppTableName | Name of AWS Glue table to be created for application usage metrics |
| QuickSight configuration | |
| QSUserName | Name of QuickSight user in default namespace to manage the EMR Usage Report resources in QuickSight |
| QSDefinitionsFile | S3 URI of the definition JSON file for the EMR Usage Report |

  1. Enter the parameter values from the preceding table.
  2. Choose Next.
  3. On the next screen, enter any necessary tags, an AWS Identity and Access Management (IAM) role, stack failure, or advanced options if necessary. Otherwise, you can leave them as default.
  4. Choose Next.
  5. Review the details on the final screen and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names or require CAPABILITY_AUTO_EXPAND.
    CloudFormationCheckbox
  6. Choose Create.

The stack will take a couple of minutes to create the remaining resources for the solution. After the CloudFormation stack is created, on the Outputs tab, you can find the details of the resources created.

Reviewing the correlation results

The CloudFormation template creates two Athena views containing the correlated cost breakdown details of the YARN cluster and application metrics with the CUR. Because the CUR aggregates cost hourly, the cost of running an application is derived by prorating the hourly running cost of the EMR cluster.

The following screenshot shows the Athena view for the correlated cost breakdown details of YARN cluster metrics.

CorrelationResults

The following table describes the fields in the Athena view for YARN cluster metrics.

Field | Type | Description
cluster_id | string | ID of the cluster.
family | string | Resource type of the cluster. Possible values are compute instance, elastic map reduce instance, storage and data transfer.
billing_start | timestamp | Start billing hour of the resource.
usage_type | string | A specific type or unit of the resource such as BoxUsage:m5.xlarge of compute instance.
cost | string | Cost associated with the resource.

The following screenshot shows the Athena view for the correlated cost breakdown details of YARN application metrics.

CostBreakdownYARNAppMetrics

The following table describes the fields in the Athena view for YARN application metrics.

Field | Type | Description
cluster_id | string | ID of the cluster
id | string | Unique identifier of the application run
user | string | User name
name | string | Name of the application
queue | string | Queue name from YARN resource manager
finalstatus | string | Final status of application
applicationtype | string | Type of the application
startedtime | timestamp | Start time of the application
finishedtime | timestamp | End time of the application
elapsed_sec | double | Time taken to run the application, in seconds
memoryseconds | bigint | The memory (in MB) allocated to an application times the number of seconds the application ran
vcoreseconds | int | The number of YARN vcores allocated to an application times the number of seconds the application ran
total_memory_mb_avg | double | Total amount of memory (in MB) available to the cluster in the hour
memory_sec_cost | double | Derived unit cost of memoryseconds
application_cost | double | Derived cost associated with the application based on memoryseconds
total_cost | double | Total cost of resources associated with the cluster for the hour
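
The fields above suggest how the hourly proration could work. The following is a minimal sketch, not the logic of the CloudFormation template: it assumes that memory_sec_cost is the hourly total_cost divided by the memory-seconds available in that hour (total_memory_mb_avg times 3,600), and that application_cost is memoryseconds times that unit cost. The function name prorate_app_cost is hypothetical.

```shell
# Sketch only: the proration formula is an assumption, not taken from the template.
#   memory_sec_cost  = total_cost / (total_memory_mb_avg * 3600)
#   application_cost = memoryseconds * memory_sec_cost
# Arguments: total_cost total_memory_mb_avg memoryseconds
prorate_app_cost() {
  awk -v cost="$1" -v mem_mb="$2" -v mem_sec="$3" \
    'BEGIN { printf "%.4f\n", mem_sec * (cost / (mem_mb * 3600)) }'
}
```

For example, prorate_app_cost 10 1000 360000 describes an application that used 10 percent of the memory-seconds available in an hour that cost 10 in total. Under this assumption, an application that spans several billing hours would need one calculation per hour.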

Building your own visualization

In QuickSight, the CloudFormation template creates two datasets that reference Athena views as data sources and a sample analysis. The sample analysis has two sheets, EMR Infra Spend and EMR App Spend. They have a prepopulated bar chart and pivot tables to demonstrate how you can use the datasets to build your own visualization to present the cost breakdown details of your EMR clusters.

The EMR Infra Spend sheet references the YARN cluster metrics dataset. There is a filter for date range selection and a filter for cluster ID selection. The sample bar chart shows the consolidated cost breakdown of the resources for each cluster during the period. The pivot table breaks them down further to show their daily expenditure.

The following screenshot shows the EMR Infra Spend sheet from the sample analysis created by the CloudFormation template.

The EMR App Spend sheet references the YARN application metrics dataset. There is a filter for date range selection and a filter for cluster ID selection. The pivot table in this sheet shows how you can use the fields in the dataset to present the cost breakdown details of the cluster by user: which applications were run, whether they completed successfully, the time and duration of each run, and the derived cost of the run.

The following screenshot shows the EMR App Spend sheet from the sample analysis created by the CloudFormation template.

Cleanup

If you no longer need the resources you created during this walkthrough, delete them to prevent incurring additional charges. To clean up your resources, complete the following steps:

  1. On the CloudFormation console, delete the stack that you created using the template
  2. Terminate the EMR cluster
  3. Empty or delete the S3 bucket used for YARN metrics

Conclusion

In this post, we discussed how to implement a comprehensive cluster usage reporting solution that provides granular visibility into the resource consumption and associated costs of individual applications running on your Amazon EMR on EC2 cluster. By using the power of Athena and QuickSight to correlate YARN metrics with cost usage details from your Cost and Usage Report, this solution empowers organizations to make informed decisions. With these insights, you can optimize resource allocation, implement fair and transparent billing models based on actual application usage, and ultimately achieve greater cost-efficiency in your EMR environments. This solution will help you unlock the full potential of your EMR cluster, driving continuous improvement in your data processing and analytics workflows while maximizing return on investment.


About the authors

Boon Lee Eu is a Senior Technical Account Manager at Amazon Web Services (AWS). He works closely and proactively with Enterprise Support customers to provide advocacy and strategic technical guidance to help plan and achieve operational excellence in AWS environment based on best practices. Based in Singapore, Boon Lee has over 20 years of experience in IT & Telecom industries.

Kyara Labrador is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services (AWS) Philippines, specializing in big data and analytics. She helps customers in designing and implementing scalable, secure, and cost-effective data solutions, as well as migrating and modernizing their big data and analytics workloads to AWS. She is passionate about empowering organizations to unlock the full potential of their data.

Vikas Omer is the Head of Data & AI Solution Architecture for ASEAN at Amazon Web Services (AWS). With over 15 years of experience in the data and AI space, he is a seasoned leader who leverages his expertise to drive innovation and expansion in the region. Vikas is passionate about helping customers and partners succeed in their digital transformation journeys, focusing on cloud-based solutions and emerging technologies.

Lorenzo Ripani is a Big Data Solution Architect at AWS. He is passionate about distributed systems, open source technologies and security. He spends most of his time working with customers around the world to design, evaluate and optimize scalable and secure data pipelines with Amazon EMR.

Apache HBase online migration to Amazon EMR

Post Syndicated from Dalei Xu original https://aws.amazon.com/blogs/big-data/apache-hbase-online-migration-to-amazon-emr/

Apache HBase is an open source, non-relational distributed database developed as part of the Apache Software Foundation’s Hadoop project. HBase can run on Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (Amazon S3), and can host very large tables with billions of rows and millions of columns.

The following are some typical use cases for HBase:

  • In an ecommerce scenario, when retrieving detailed product information based on the product ID, HBase can provide a quick and random query function.
  • In security assessment and fraud detection cases, the evaluation dimensions for users vary. HBase’s non-relational architectural design and ability to freely scale columns help cater to the complex needs.
  • In a high-frequency, real-time trading platform, HBase can support highly concurrent reads and writes, resulting in higher productivity and business agility.

Recommended HBase deployment mode

Starting with Amazon EMR 5.2.0, you have the option to run Apache HBase on Amazon S3.

Running HBase on Amazon S3 has several added benefits, including lower costs, data durability, and easier scalability. In addition, during an HBase migration, you can export the snapshot files to Amazon S3 and use them for recovery.

Recommended HBase migration mode

For existing HBase clusters (including self-built based on open source HBase or provided by vendors or other cloud service providers), we recommend using HBase snapshot and replication technologies to migrate to Apache HBase on Amazon EMR without significant downtime of service.

This blog post introduces a set of typical HBase migration solutions with best practices based on real-world customers’ migration case studies. Additionally, we deep dive into some key challenges faced during migrations, such as:

  • Using HBase snapshots to implement initial migration and HBase replication for real-time data migration.
  • HBase provided by other cloud platforms doesn’t support snapshots.
  • A single table with large amounts of data, for example more than 50 TB.
  • Using BucketCache to improve read performance after migration.

HBase snapshots allow you to take a snapshot of a table without much impact on region servers, because snapshot, clone, and restore operations don’t involve data copying. Also, exporting a snapshot to another cluster has little impact on the region servers.

HBase replication is a way to copy data between HBase clusters. It allows you to keep one cluster’s state synchronized with that of another cluster, using the write-ahead log (WAL) of the source cluster to propagate changes. It can work as a disaster recovery solution and also provides higher availability in the architecture.

Prerequisites

To implement HBase migration, you must have the following prerequisites:

Solution summary

In this example, we walk through a typical migration solution, which is from the source HBase on HDFS cluster (Cluster A) to the target Amazon EMR HBase on S3 (Cluster B). The following diagram illustrates the solution architecture.

Solution architecture

To demonstrate the recommended best practice for the HBase migration process, we walk through the following steps, as shown in the preceding diagram.

Step | Activity | Description | Estimated time
1 | Configure cluster A (source HBase) | Modify the configuration of the source HBase cluster to prepare for subsequent snapshot exports | Less than 5 minutes
2 | Create cluster B (Amazon EMR HBase on S3) | Create an EMR cluster with HBase on Amazon S3 as the migration target cluster | Less than 10 minutes
3 | Configure replication | Configure replication from the source HBase cluster to Amazon EMR HBase, but do not start it | Less than 1 minute
4 | Pause service | Pause the service of the source HBase cluster | Less than 1 minute
5 | Create snapshot | Create a snapshot for each table on the source HBase cluster | Less than 5 minutes
6 | Resume service | Resume the service of the source HBase cluster | Less than 1 minute
7 | Snapshot export and restore | Use the snapshot to migrate data from the source HBase cluster to the Amazon EMR HBase cluster | Depends on the table data volume
8 | Start replication | Start the replication from the source HBase cluster to Amazon EMR HBase and synchronize incremental data | Depends on the amount of data accumulated during the snapshot export and restore
9 | Test and verify | Test and verify the Amazon EMR HBase cluster |

Solution walkthrough

The preceding diagram and table list the operational steps of the solution. Next, we elaborate on the specific operations for each step.

1. Configure cluster A (source HBase)

When exporting a snapshot from the source HBase cluster to the Amazon EMR HBase cluster, you must modify the following settings on the source cluster to ensure the performance and stability of data transmission.

Configuration classification | Configuration item | Suggested value | Comment
core-site | fs.s3.awsAccessKeyId | Your AWS access key ID | The snapshot export takes a relatively long time. Without an access key and secret key, the snapshot export to Amazon S3 will encounter errors such as com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain.
core-site | fs.s3.awsSecretAccessKey | Your AWS secret access key | Same as the preceding row.
yarn-site | yarn.nodemanager.resource.memory-mb | Half of a single core node RAM | The amount of physical memory, in MB, that can be allocated for containers.
yarn-site | yarn.scheduler.maximum-allocation-mb | Half of a single core node RAM | The maximum allocation for every container request at the ResourceManager, in MB. Because the snapshot export runs as a YARN MapReduce job, it’s necessary to allocate sufficient memory to YARN to ensure transmission speed.

Set these values according to the cluster resources, workload, and table data volume. You can make the modification through a web UI, if available, or by using a standard configuration XML file. Restart the HBase service after the change is complete.
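
As a rough illustration of the "half of a single core node RAM" guidance, the following sketch computes the value in MB and prints the two matching yarn-site.xml property entries. The helper names yarn_memory_mb and yarn_site_properties are hypothetical, and the node RAM you pass in is your own input; treat the output as a starting point to adjust, not a tuned setting.

```shell
# Arguments: node RAM in GiB, percentage of RAM to give to YARN
# (50 matches the "half of node RAM" suggestion for the source cluster).
yarn_memory_mb() {
  echo $(( $1 * 1024 * $2 / 100 ))
}

# Print yarn-site.xml property entries for the computed value.
yarn_site_properties() {
  mb=$(yarn_memory_mb "$1" "$2")
  for name in yarn.nodemanager.resource.memory-mb yarn.scheduler.maximum-allocation-mb; do
    printf '<property><name>%s</name><value>%s</value></property>\n' "$name" "$mb"
  done
}
```

For a core node with 128 GiB of RAM, yarn_site_properties 128 50 prints both properties with a value of 65536.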

2. Create cluster B (EMR HBase on S3)

Use the following recommended settings to launch an EMR cluster:

Configuration classification | Configuration item | Suggested value | Comment
yarn-site | yarn.nodemanager.resource.memory-mb | 20% of a single core node RAM | The amount of physical memory, in MB, that can be allocated for containers.
yarn-site | yarn.scheduler.maximum-allocation-mb | 20% of a single core node RAM | The maximum allocation for every container request at the ResourceManager, in MB. Because the snapshot restore runs in HBase, allocate only a small amount of memory to YARN and leave sufficient memory to HBase for the restore.
hbase-env.export | HBASE_MASTER_OPTS | 70% of a single core node RAM | Set the Java heap size for the primary HBase.
hbase-env.export | HBASE_REGIONSERVER_OPTS | 70% of a single core node RAM | Set the Java heap size for the HBase region server.
hbase | hbase.emr.storageMode | S3 | Indicates that HBase uses S3 to store data.
hbase-site | hbase.rootdir | <Your-HBase-Folder-on-S3> | Your HBase data folder on S3.

See Configure HBase for more details. Additionally, the default YARN configuration on Amazon EMR for each Amazon EC2 instance type can be found in Task configuration.
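
The settings above are typically passed to Amazon EMR as a JSON list of configuration classifications. The following sketch prints such a list for the hbase, hbase-site, and yarn-site rows. The function name emr_hbase_s3_config is hypothetical, the bucket path and memory value are placeholders you supply, and the hbase-env heap settings are left out because the post doesn’t give the exact option strings.

```shell
# Arguments: HBase root directory on S3, YARN memory in MB (about 20% of node RAM).
emr_hbase_s3_config() {
  cat <<EOF
[
  {"Classification": "hbase", "Properties": {"hbase.emr.storageMode": "s3"}},
  {"Classification": "hbase-site", "Properties": {"hbase.rootdir": "$1"}},
  {"Classification": "yarn-site", "Properties": {
    "yarn.nodemanager.resource.memory-mb": "$2",
    "yarn.scheduler.maximum-allocation-mb": "$2"}}
]
EOF
}
```

You could redirect the output to a file, for example emr_hbase_s3_config "<Your-HBase-Folder-on-S3>" 13107 > configurations.json, and pass it to aws emr create-cluster with --configurations file://configurations.json.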

The configuration of our example is as shown in the following figure.

Instance group configurations

3. Configure replication

Next, we configure the replication peer from the source HBase to the EMR cluster.

The operations include:

  • Create a peer.
  • Because the snapshot migration hasn’t been done, we start by disabling the peer.
  • Specify the table that requires replication for the peer.
  • Enable table replication.

Let’s use the table usertable as an example. The shell script is as follows:

MASTER_IP="<Master-IP>"
TABLE_NAME="usertable"
cat << EOF | sudo -u hbase hbase shell 2>/dev/null
add_peer 'PEER_$TABLE_NAME', CLUSTER_KEY => '$MASTER_IP:2181:/hbase'
disable_peer 'PEER_$TABLE_NAME'
enable_table_replication '$TABLE_NAME'
EOF

The result will look like the following text.

hbase:001:0> add_peer 'PEER_usertable', CLUSTER_KEY => '<Master-IP>:2181:/hbase'
Took 13.4117 seconds 
hbase:002:0> disable_peer 'PEER_usertable'
Took 8.1317 seconds 
hbase:003:0> enable_table_replication 'usertable'
The replication of table 'usertable' successfully enabled
Took 168.7254 seconds

In this experiment, we’re using the table usertable as an example. If we have many tables that need to be configured for replication, we can use the following code:

MASTER_IP="<Master-IP>"

# Get all tables
TABLE_LIST=$(echo 'list' | sudo -u hbase hbase shell 2>/dev/null | sed -e '1,/TABLE/d' -e '/seconds/,$d' -e '/row/,$d')
# Iterate each table
for TABLE_NAME in $TABLE_LIST; do
# Add the operation
cat << EOF | sudo -u hbase hbase shell 2>/dev/null
add_peer 'PEER_$TABLE_NAME', CLUSTER_KEY => '$MASTER_IP:2181:/hbase'
disable_peer 'PEER_$TABLE_NAME'
enable_table_replication '$TABLE_NAME'
EOF
done

In the scripts for the following steps, if you need to apply the operations to all tables, you can refer to the preceding code sample.
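
One way to reuse the per-table pattern in later steps is to generate the HBase shell commands first and pipe them into a single shell session, which avoids starting one shell per table. This is a minimal sketch; the helper name hbase_commands_for_tables is hypothetical and it only formats text, so you can inspect the output before running it.

```shell
# Arguments: an HBase shell command name, then one or more table names.
# Prints one "<command> '<table>'" line per table.
hbase_commands_for_tables() {
  cmd="$1"
  shift
  for table in "$@"; do
    printf "%s '%s'\n" "$cmd" "$table"
  done
}
```

For example, hbase_commands_for_tables enable $TABLE_LIST | sudo -u hbase hbase shell 2>/dev/null would enable every table collected in TABLE_LIST by the preceding script.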

At this point, the status of the peer is Disabled, so replication won’t start. The data that needs to be synchronized from the source to the target EMR cluster is backlogged at the source HBase cluster and isn’t synchronized to HBase on the EMR cluster.

After the snapshot restore (step 7) is completed on the HBase on Amazon EMR cluster, we can enable the peer to start synchronizing data.

If the source HBase version is 1.x, you must run the set_peer_tableCFs function. See HBase Cluster Replication.

4. Pause the service

To pause the service of the source HBase cluster, disable the HBase tables. You can use the following script:

sudo -u hbase bash /usr/lib/hbase/bin/disable_all_tables.sh 2>/dev/null

The result is shown in the following figure.

Disable all tables

After disabling all tables, observe the HBase UI to ensure that no background tasks are being run, and then stop any services accessing the source HBase. This will take 5-10 minutes.

The HBase UI is as shown in the following figure.

Check background tasks

5. Create a snapshot

Make sure the tables in the source HBase are disabled. Then, you can create a snapshot of the source. This process will take 1-5 minutes.

Let’s use the table usertable as an example. The shell script is as follows:

DATE=`date +"%Y%m%d"`
TABLE_NAME="usertable"
sudo -u hbase hbase snapshot create -n "${TABLE_NAME/:/_}-$DATE" -t ${TABLE_NAME} 2>/dev/null
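
The snapshot name in this script is built from the table name, with the namespace colon replaced by an underscore, plus the current date. The export script in step 7 rebuilds the name the same way, so if the export runs on a later calendar day the two names won’t match. The following sketch isolates the naming rule so the date can be passed explicitly; the function name snapshot_name is hypothetical.

```shell
# Arguments: table name (optionally namespace:table), date stamp in YYYYMMDD form.
# Mirrors the "${TABLE_NAME/:/_}-$DATE" pattern used in the scripts.
snapshot_name() {
  printf '%s-%s\n' "$(printf '%s' "$1" | tr ':' '_')" "$2"
}
```

Saving the result once, for example SNAPSHOT_NAME=$(snapshot_name "$TABLE_NAME" 20240710), and reusing it for create, export, and restore keeps the steps consistent.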

You can check the snapshot with a script:

sudo -u hbase hbase snapshot info -list-snapshots 2>/dev/null

And the result is as shown in the following figure.

Create snapshot

6. Resume service

After the snapshot is successfully created on the source HBase, you can enable the tables and resume the services that access the source HBase. These operations take several minutes, so the total data unavailability time on the source HBase during the implementation (step 3 to step 6) will be approximately 10 minutes.

The command to enable the table is as follows:

TABLE_NAME="usertable"
echo -e "enable '$TABLE_NAME'" | sudo -u hbase hbase shell 2>/dev/null

The result is shown in the following figure.

Enable table

At this point, you can write data to the source HBase. Because the replication peer is still disabled, the incremental data won’t be synchronized to the target cluster yet.

7. Snapshot export and restore

After the snapshot is created in the source HBase, it’s time to export the snapshot to the HBase data directory on the target EMR cluster. The example script is as follows:

DATE=`date +"%Y%m%d"`
TABLE_NAME="usertable"
TARGET_BUCKET="<Your-HBase-Folder-on-S3>"
nohup sudo -u hbase hbase snapshot export -snapshot ${TABLE_NAME/:/_}-$DATE -copy-to $TARGET_BUCKET &> ${TABLE_NAME/:/_}-$DATE-export.log &

Exporting the snapshot takes from 10 minutes to several hours, depending on the amount of data to be exported, so we run it in the background. You can check the progress by using the yarn application -list command, as shown in the following figure.

Exporting snapshot process

As an example, if you’re using an HBase cluster with 20 r6g.4xlarge core nodes, it will take about 3 hours for 50 TB of data to be exported to Amazon S3 in the same AWS Region.
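
For planning purposes, you can scale this single data point to other table sizes. The following sketch assumes that export throughput grows linearly with the number of core nodes and stays at the rate observed above (50 TB in 3 hours on 20 nodes); both are assumptions, and other instance types or networks will behave differently. The function name estimate_export_hours is hypothetical.

```shell
# Arguments: data volume in TB, number of core nodes.
# Baseline from the example above: 50 TB in 3 hours on 20 core nodes.
estimate_export_hours() {
  awk -v tb="$1" -v nodes="$2" \
    'BEGIN { per_node_tb_per_hour = 50 / (3 * 20)
             printf "%.1f\n", tb / (nodes * per_node_tb_per_hour) }'
}
```

Treat the result as a first estimate to validate with a trial export of a smaller table.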

After the snapshot export is completed at the source HBase, you can check the snapshot in the target EMR cluster using the following script:

sudo -u hbase hbase snapshot info -list-snapshots 2>/dev/null

The result is shown in the following figure.

Check snapshot

Confirm the snapshot name (for example, usertable-20240710) and run the snapshot restore on the target EMR cluster using the following script.

TABLE_NAME="usertable"
SNAPSHOT_NAME="usertable-20240710"
cat << EOF | nohup sudo -u hbase hbase shell &> restore-snapshot.out &
disable '$TABLE_NAME'
restore_snapshot '$SNAPSHOT_NAME'
enable '$TABLE_NAME'
EOF

The snapshot restore takes from 10 minutes to several hours, depending on the amount of data to be restored, so we run it in the background. The result is as shown in the following figure.

Restore snapshot

You can check the progress of the restore through the Amazon EMR web interface for HBase, as shown in the following figure.

Check snapshot restore

From the Amazon EMR web interface for HBase, you can see that it takes about 2 hours to run Clone Snapshot for a sample table with 50 TB of data, and then 1 additional hour for the stage that follows. After these two stages, the snapshot restore is completed.

8. Start replication

After the snapshot restore is completed on the EMR cluster and the status of the table is set to enabled, you can enable HBase replication in the source HBase. The incremental data will be synchronized to the target EMR cluster.

In the source HBase, the example script is as follows:

TABLE_NAME="usertable"
echo -e "enable_peer 'PEER_$TABLE_NAME'" | sudo -u hbase hbase shell 2>/dev/null

The result is as shown in the following figure.

Enable peer

Wait for the incremental data to be synchronized from the source HBase to the HBase on EMR cluster. The time taken depends on the amount of data accumulated in the source HBase during the snapshot export and restore. In our example, it took about 10 minutes to complete the data synchronization.

You can check the replication status with the following script:

echo -e "status 'replication'" | sudo -u hbase hbase shell 2>/dev/null

The result is shown in the following figure.

Replication status

9. Test and verify

After incremental data synchronization is complete, you can start testing and verifying the results. You can use the same HBase API to access both the source and the target HBase clusters and compare the results.

To verify data integrity, you can check the number of HBase table regions and store files for the replicated tables from the Amazon EMR web interface for HBase, as shown in the following figure.

Check hbase region and store files

For small tables, we recommend using the HBase command to verify the number of records. After signing in to the primary node of the Amazon EMR using SSH, you can run the following command:

sudo -u hbase hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'usertable'

Then, in the hbase.log file of the HBase log directory, find the number of records for the table usertable.

For large tables, you can use the HBase Java API to validate the row count in a range of row keys.

We provided sample Java code to implement this functionality. For example, we imported the demo data to usertable using the following script:

java -classpath hbase-utils-1.0-SNAPSHOT-jar-with-dependencies.jar HBaseAccess <Your-Zookeeper-IP> put 1000 20

The result is shown in the following figure.

Put demo data into HBase table

You can run the script multiple times to import enough demo data into the table. Then you can use the following script to count the number of records where the value of Row Key is between user1000 and user1100, and the value of the column family:field0 is value0:

java -classpath hbase-utils-1.0-SNAPSHOT-jar-with-dependencies.jar HBaseRowCounter <Your-Zookeeper-IP> usertable "user1000" "user1100" "family:field0" "value0"

The result is shown in the following figure.

HBase table row counter

You can run the same code on both the source HBase and the target Amazon EMR HBase to verify that the results are consistent. See complete code.

After these steps are complete, you can switch from the source HBase to the target Amazon EMR HBase, completing the migration.

Clean up

After you’re done with the solution walkthrough, complete the following steps to clean up your resources:

  1. Stop the Amazon EMR on EC2 cluster.
  2. Delete the S3 bucket that stores the HBase data.
  3. Stop the source HBase cluster, and release its related resource, for example, the Amazon EC2 cluster or resources provided by other vendors or cloud service providers.

Key challenges in HBase migration

In the previous sections, we detailed the steps to implement HBase online migration through snapshots and replication for a general scenario. Many customer scenarios differ from the general one, and you need to modify the process steps to accomplish the migration.

HBase in the cloud doesn’t support snapshot

Many cloud providers have made modifications to the open source version of HBase, resulting in these versions of HBase not providing snapshot and replication functions. However, these cloud providers will provide data transfer tools for HBase, such as Lindorm Tunnel Service, that can be used to transfer HBase data to an HBase cluster with data on HDFS.

To deploy HBase on Amazon S3, you should follow the previous migration process as the best practice, using snapshot and replication techniques to migrate to an Amazon EMR environment. To solve the problem of HBase versions that don’t support snapshots and replication, you can create an HBase on HDFS cluster as a relay cluster. Synchronize the data from the source HBase to this HDFS-based HBase cluster, then migrate from the relay cluster to the target HBase on S3.

The following diagram illustrates the solution architecture.

Solution architecture for HBase in the cloud doesn’t support snapshot

You need to add three more steps in addition to the migration steps described previously.

Step | Activity | Description | Estimated time
1 | Create Cluster B (EMR HBase on HDFS) | Create an EMR cluster with HBase on HDFS as the relay cluster. | Less than 10 minutes
2 | Configure data transfer | Configure the data transfer from the outer HBase cluster to Amazon EMR HBase on HDFS and start the data transfer. | Less than 5 minutes
3 | HBase migration (snapshot and replication) | Treat the outer HBase cluster as an application that writes data into the Amazon EMR HBase cluster. You can then use the steps in the previous scenario to complete the migration to Amazon EMR HBase on Amazon S3. |

Single table with large amounts of data

During the migration process, if the amount of data in a single table in the source HBase (Cluster A) is very large (such as 10 TB or even 50 TB), you must modify the target Amazon EMR HBase cluster (Cluster B) configuration to ensure that there are no interruptions during the migration process, especially during the snapshot restore on the Amazon EMR HBase cluster. After the snapshot restore is complete, you can rebuild the Amazon EMR HBase cluster (Cluster C).

The following diagram illustrates the solution architecture for handling a very large table.

Solution architecture for handling a very large table

The following are the steps.

Step | Activity | Description | Estimated time
1 | Create Cluster B (EMR HBase on S3 for restore) | Create an EMR cluster with the required configuration for a large table snapshot restore. | Less than 10 minutes
2 | HBase migration (snapshot and replication) | Consider the Amazon EMR HBase on Amazon S3 as the target cluster. You can then use the steps in the first scenario to complete the migration from the source HBase to the Amazon EMR HBase on S3. |
3 | Recreate Cluster C (EMR HBase on S3 for production) | After the migration is complete, Cluster B needs to be changed back to its previous configuration before migration. If it’s inconvenient to modify the parameters, you can use the previous configuration to recreate the EMR cluster (Cluster C). | Less than 15 minutes
4 | Rebuild replication | After recreating the EMR cluster, if replication is still needed to synchronize the data, the replication from the source HBase cluster to the new EMR HBase cluster must be rebuilt. Before building the new EMR cluster, the write service on the source HBase cluster should be paused to avoid data loss on the Amazon EMR HBase. | Less than 1 minute

For step 1 (create Cluster B, EMR HBase on S3 for restore), use the following configuration for the snapshot restore. All time values are in milliseconds.

Configuration classification | Configuration item | Default value | Suggested value | Explanation
emrfs-site | fs.s3.maxConnections | 1000 | 50000 | The number of concurrent Amazon S3 connections that your applications need. The default value is 1000 and must be increased to avoid errors such as com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: timeout waiting for connection from pool.
hbase-site | hbase.client.operation.timeout | 300000 | 18000000 | Operation timeout is a top-level restriction that makes sure a blocking operation in a table will not be blocked for longer than the timeout.
hbase-site | hbase.master.cleaner.interval | 60000 | 18000000 | By default, the HBase cleaner runs every 60,000 milliseconds and clears some files in the archive, which can result in an error that an HFile cannot be found.
hbase-site | hbase.rpc.timeout | 60000 | 18000000 | This property limits how long a single RPC call can run before timing out.
hbase-site | hbase.snapshot.master.timeout.millis | 300000 | 18000000 | Timeout for the primary HBase for the snapshot procedure.
hbase-site | hbase.snapshot.region.timeout | 300000 | 18000000 | Timeout for region servers to keep threads in a snapshot request pool waiting.
hbase-site | hbase.hregion.majorcompaction | 604800000 | 0 | The default is 604,800,000 ms (1 week). Set to 0 to disable automatic triggering of compaction.

Because compaction is then triggered manually, you must make it one of the daily operation and maintenance tasks, and run it during periods of low activity to avoid impacting production. The following is the compaction script:

echo -e "major_compact '$TABLE_NAME'" | sudo -u hbase hbase shell

Adjust the suggested values based on the amount of table data, which requires conducting some experiments to determine the values to be used in the final migration plan.
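
Because all of these timeouts are expressed in milliseconds, it can help to convert from hours when you experiment with values. The suggested 18000000 corresponds to 5 hours, and the compaction default of 604800000 corresponds to 168 hours (1 week). The helper names below are hypothetical.

```shell
# Convert whole hours to milliseconds, the unit used by the settings above.
hours_to_ms() {
  echo $(( $1 * 60 * 60 * 1000 ))
}

# Convert milliseconds back to whole hours (integer division).
ms_to_hours() {
  echo $(( $1 / 3600000 ))
}
```

For example, if a trial restore of your largest table takes about 8 hours, hours_to_ms 8 gives a candidate timeout value to test.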

Before you recreate a new EMR cluster in the production environment, disable the HBase table on the active EMR cluster. The command line is as follows:

sudo -u hbase bash /usr/lib/hbase/bin/disable_all_tables.sh 2>/dev/null
echo -e "flush 'hbase:meta'" | sudo -u hbase hbase shell 2>/dev/null

Wait for the commands to complete successfully, then terminate the current EMR cluster (Cluster B). The HBase data remains in Amazon S3. Create a new EMR cluster (Cluster C) with the configuration used before the migration, and specify the same HBase data folder on S3 as Cluster B.

Using Bucket Cache to improve read performance

To enhance HBase’s read performance, one of the most effective methods is caching data. HBase uses BlockCache to implement caching mechanisms for the region server. Currently, HBase provides two different BlockCache implementations to cache data read from HDFS: the default on-heap LruBlockCache and the BucketCache, which is usually off-heap. BucketCache is the most commonly used.

BucketCache can be deployed in offheap, file, or mmaped file mode. These three working modes are the same in terms of memory logical organization and caching process; however, the final storage media corresponding to these three working modes are different, that is, the IOEngine is different.

We recommend that customers use BucketCache in file mode, because the default storage type of Amazon Elastic Block Store (Amazon EBS) in Amazon EMR is SSD. You can put all hot data into BucketCache, which is on Amazon EBS. You can then determine the file size used by BucketCache based on the volume of hot data.
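
As a starting point for sizing, the following sketch divides the hot data volume across the core nodes and rounds up to whole megabytes, which is the unit of hbase.bucketcache.size. It assumes the hot data is spread evenly across region servers, which may not hold for skewed row keys. The function name bucketcache_size_mb is hypothetical.

```shell
# Arguments: total hot data volume in GB, number of core nodes.
# Prints the per-node cache capacity in MB, rounded up.
bucketcache_size_mb() {
  echo $(( ($1 * 1024 + $2 - 1) / $2 ))
}
```

For example, 1,000 GB of hot data on 10 core nodes gives 102400 MB per node. Make sure the EBS volume that holds the cache file has at least that much free space.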

The following are the HBase configurations for BucketCache.

Configuration classification | Configuration item | Suggested value | Explanation
hbase-site | hbase.bucketcache.ioengine | files:/mnt1/hbase/cache_01.data | Where to store the contents of the bucket cache. You can use offheap, file, files, mmap, or pmem. For a file or files, set it to file:PATH_TO_FILE or files:PATH_TO_FILE. Note that some earlier Amazon EMR versions support only one file per core node.
hbase-site | hbase.bucketcache.persistent.path | file:/mnt1/hbase/cache.meta | The path to store the metadata of the bucket cache, used to recover the cache during startup.
hbase-site | hbase.bucketcache.size | Depends on the volume of hot data to be cached | The capacity in MB of BucketCache for each core node. If you use multiple cache files, this size is the sum of the capacities of all files.
hbase-site | hbase.rs.prefetchblocksonopen | TRUE | Whether the server should asynchronously load all the blocks when a store file is opened (data, metadata, and index). Note that enabling this property adds to the time the region server takes to open a region and therefore initialize.
hbase-site | hbase.rs.cacheblocksonwrite | TRUE | Whether an HFile block should be added to the block cache when the block is finished.
hbase-site | hbase.rs.cachecompactedblocksonwrite | TRUE | Whether to cache compacted blocks when they are written.

For more BucketCache configuration options, refer to Configuration properties.
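To make the sizing step concrete, here is a minimal Python sketch that derives hbase.bucketcache.size from an estimated hot data volume and assembles the properties from the preceding table. The helper name and the 20% headroom are assumptions for illustration, not recommendations from the tests described in this post, and all properties are placed under hbase-site because they are HBase properties.

```python
def bucketcache_properties(hot_data_gb: int,
                           cache_file: str = "/mnt1/hbase/cache_01.data",
                           headroom_pct: int = 20) -> dict:
    """Return an hbase-site classification sized for the given hot data volume.

    headroom_pct is an illustrative safety margin (an assumption).
    """
    # Integer ceiling of hot_data_gb * 1024 MB * (1 + headroom).
    size_mb = -(-hot_data_gb * 1024 * (100 + headroom_pct) // 100)
    return {
        "Classification": "hbase-site",
        "Properties": {
            "hbase.bucketcache.ioengine": f"files:{cache_file}",
            "hbase.bucketcache.persistent.path": "file:/mnt1/hbase/cache.meta",
            "hbase.bucketcache.size": str(size_mb),
            "hbase.rs.prefetchblocksonopen": "true",
            "hbase.rs.cacheblocksonwrite": "true",
            "hbase.rs.cachecompactedblocksonwrite": "true",
        },
    }


if __name__ == "__main__":
    # Example: roughly 100 GB of hot data per core node.
    print(bucketcache_properties(100)["Properties"]["hbase.bucketcache.size"])
```

The EBS volume that backs the cache file needs at least this much free space on every core node.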

We provided sample Java code to test HBase read performance. In the Java code, we use the putDemoData method to write test data to the table usertable, ensuring that the data is evenly distributed across HBase table regions, and then use the getDemoData method to read the data.

We tested three scenarios: HBase data stored on HDFS, on Amazon S3 without using BucketCache, and on Amazon S3 using BucketCache. To make sure the written data isn’t cached in the first two scenarios, you can clear the cache by restarting the region servers.

We tested on an EMR HBase cluster with 10 r6g.2xlarge core nodes. The command is as follows:

java -classpath hbase-utils-1.0-SNAPSHOT-jar-with-dependencies.jar HBaseAccess <Your-Zookeeper-IP> get 20 100

The result is shown in the following figure.

Read HBase table

Benchmark results and key learning

For each of the three scenarios, we used 100 HBase row keys as input, made sure these row keys were evenly distributed across the HBase table regions, and called the API 20, 50, and 100 times consecutively. The time cost is shown in the following figure. Read latency was lowest when the data was on Amazon S3 with BucketCache enabled.

Read performance

Above, we introduced four migration scenarios. In migration processes in production, we’ve gained invaluable knowledge and experience. We’re sharing the results here for you to use as best practices and a recommended run book.

Configuration parameters

In the configurations provided earlier, some are necessary for BucketCache settings while others mitigate known errors to reduce the snapshot duration. For example, the parameter hbase.snapshot.master.timeout.millis is related to the HBASE-14680 issue. It’s advisable to retain these configurations as much as possible throughout the migration process.

Version choice

When migrating to Amazon EMR and choosing a suitable HBase version, it is recommended to select a more recent minor version and patch version, while keeping the major version unchanged. That is to say:

  • If the source HBase is version 1.x, we recommend using EMR 5.36.1, whose HBase version is 1.4.13, because the HBase 1.x API is compatible and won’t require you to make code changes.
  • If the source HBase is version 2.x, we recommend using EMR 6.15.0, which has an HBase of 2.4.17.

HBase APIs are compatible within the same major version. See HBase Version to learn more.

Allocating enough space

HDFS needs enough free space when you export snapshots; how much depends on the data volume. Because data is moved to the archive, the table temporarily requires about twice its storage.

Replication

At present, there is a compatibility problem if replication and WAL compression are used together. If you are using replication, set the hbase.regionserver.wal.enablecompression property to false. See HBASE-26849 for more information.

By default, replication in HBase is asynchronous, as the write-ahead log (WAL) is sent to another cluster in the background. This implies that when using replication for data recovery, some data loss may occur. Additionally, there is a potential for data overwrite conflicts when concurrently writing the same record within a short time frame. However, HBase v2.x introduces synchronous replication as a solution to this issue. For more details, refer to the Serial Replication documentation.

Disk type for BucketCache

Because BucketCache uses a portion of Amazon EBS IO and throughput to synchronize data, it’s recommended to choose Amazon EBS gp3 volumes for their higher IOPS and throughput.

Response latency when accessing HBase

Users sometimes experience high response latency from HBase on EMR clusters when using API calls or the HBase shell tool.

In our testing, we found that the communication between HMaster and RegionServer takes an unusually long time to resolve through DNS. You can reduce latency by adding the host name and IP mapping to the /etc/hosts file in the HBase client host.
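The following is a small illustrative sketch of generating those /etc/hosts lines from a mapping of private IP addresses to host names. The function name and the sample address are hypothetical; collect the real values from your HMaster and RegionServer nodes.

```python
def hosts_entries(mapping: dict) -> list:
    """Return /etc/hosts lines for a {ip_address: [host_names]} mapping."""
    return [f"{ip}\t{' '.join(names)}" for ip, names in sorted(mapping.items())]


if __name__ == "__main__":
    # Hypothetical node address and names, for illustration only.
    sample = {"10.0.0.12": ["ip-10-0-0-12.ec2.internal", "ip-10-0-0-12"]}
    for line in hosts_entries(sample):
        print(line)
```

Append the printed lines to /etc/hosts on the client host, and refresh them whenever cluster nodes are replaced.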

Conclusion

In this post, we used the results of real-world migration cases to introduce the process of migrating HBase to Amazon EMR HBase using HBase snapshot and replication and the deployment mode of HBase on Amazon S3. We included how to resolve challenges, such as how to configure the cluster to make the migration process smoother when migrating a single large table, or how to use BucketCache to improve reading performance. We also described techniques for testing performance.

We encourage you to migrate HBase to Amazon EMR HBase. For more information about HBase migration, see Amazon EMR HBase best practices.


About the Authors

Dalei Xu is an Analytics Specialist Solution Architect at Amazon Web Services, responsible for consulting, designing, and implementing AWS data analytics solutions. Dalei has over 20 years of experience in data-related work and is proficient in data development, migration to AWS, architecture design, and performance optimization. Dalei hopes to promote AWS data analytics services to more customers, achieving a win-win situation and mutual growth with customers.

Zhiyong Su is a Migration Specialist Solution Architect at Amazon Web Services, primarily responsible for cloud migration or cross-cloud migration for enterprise-level clients. Zhiyong has held positions such as R&D Engineer and Solutions Architect, and has years of practical experience in IT professional services and enterprise application architecture.

Shijian Tang is an Analytics Specialist Solution Architect at Amazon Web Services.

Enhance Amazon EMR scaling capabilities with Application Master Placement

Post Syndicated from Lorenzo Ripani original https://aws.amazon.com/blogs/big-data/enhance-amazon-emr-scaling-capabilities-with-application-master-placement/

In today’s data-driven world, processing large datasets efficiently is crucial for businesses to gain insights and maintain a competitive edge. Amazon EMR is a managed big data service designed to handle these large-scale data processing needs across the cloud. It allows running applications built using open source frameworks on Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Kubernetes Service (Amazon EKS), or AWS Outposts, or completely serverless. One of the key features of Amazon EMR on EC2 is managed scaling, which dynamically adjusts computing capacity in response to application demands, providing optimal performance and cost-efficiency.

Although managed scaling aims to optimize EMR clusters for best price-performance and elasticity, some use cases require more granular resource allocation. For example, when multiple applications are submitted to the same cluster, resource contention may occur, potentially impacting both performance and cost-efficiency. Additionally, allocating the Application Master (AM) container to less reliable nodes such as Spot Instances can result in loss of the container and immediate shutdown of the entire YARN application, wasting resources and adding the cost of rescheduling the application. These use cases require more granular resource allocation and sophisticated scheduling policies to optimize resource utilization and maintain high performance.

Starting with the Amazon EMR 7.2 release, Amazon EMR on EC2 introduced a new feature called Application Master (AM) label awareness, which allows users to enable YARN node labels to allocate the AM containers within On-Demand nodes only. Because the AM container is responsible for orchestrating the overall job execution, it’s crucial to verify that it gets allocated to a reliable instance and isn’t subject to shutdown due to Spot Instance interruption. Additionally, limiting AM containers to On-Demand helps maintain consistent application launch time, because the fulfillment of the On-Demand Instance isn’t prone to unavailable Spot capacity or bid price.

In this post, we explore the key features and use cases where this new functionality can provide significant benefits, enabling cluster administrators to achieve optimal resource utilization, improved application reliability, and cost-efficiency in their EMR on EC2 clusters.

Solution overview

The Application Master label awareness feature in Amazon EMR works in conjunction with YARN node labels, a functionality offered by Hadoop that lets you assign labels to nodes within a Hadoop cluster. You can use these labels to determine which nodes of the cluster should host specific YARN containers (such as mappers vs. reducers in a MapReduce job, or drivers vs. executors in Apache Spark).

This feature is enabled by default when a cluster is launched with Amazon EMR 7.2.0 and later using Amazon EMR managed scaling, and it has been configured to use YARN node labels. The following code is a basic configuration setup that enables this feature:

[
   {
     "Classification": "yarn-site",
     "Properties": {
       "yarn.node-labels.enabled": "true",
       "yarn.node-labels.am.default-node-label-expression": "ON_DEMAND"
     }
   }
]

Within this configuration snippet, we activate the Hadoop node label feature and define a value for the yarn.node-labels.am.default-node-label-expression property. This property defines the YARN node label that will be used to schedule the AM container of each YARN application submitted to the cluster. This specific container plays a key role in maintaining the lifecycle of the workflow, so verifying its placement on reliable nodes in production workloads is crucial, because the unexpected shutdown of this container can result in the shutdown and failure of the entire application.

Currently, the Application Master label awareness feature only supports two predefined node labels that can be specified to allocate the AM container of a YARN job: ON_DEMAND and CORE. When one of these labels is defined using Amazon EMR configurations (see the preceding example code), Amazon EMR automatically creates the corresponding node labels in YARN and labels the instances in the cluster accordingly.
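As a minimal sketch, the following Python helper builds the yarn-site classification shown earlier and rejects any label other than the two the feature supports. The function and constant names are hypothetical and not part of any Amazon EMR API.

```python
import json

# The two AM placement labels that the post says are currently supported.
SUPPORTED_AM_LABELS = ("ON_DEMAND", "CORE")


def am_placement_configuration(label: str = "ON_DEMAND") -> list:
    """Build the yarn-site classification that pins AM containers to a label."""
    if label not in SUPPORTED_AM_LABELS:
        raise ValueError(f"label must be one of {SUPPORTED_AM_LABELS}, got {label!r}")
    return [
        {
            "Classification": "yarn-site",
            "Properties": {
                "yarn.node-labels.enabled": "true",
                "yarn.node-labels.am.default-node-label-expression": label,
            },
        }
    ]


if __name__ == "__main__":
    print(json.dumps(am_placement_configuration("CORE"), indent=2))
```

Validating the label before cluster launch catches a mistake such as passing SPOT, which exists as a node label but isn't a supported value for AM placement.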

To demonstrate how this feature works, we launch a sample cluster and run some Spark jobs to see how Amazon EMR managed scaling integrates with YARN node labels.

Launch an EMR cluster with Application Master placement awareness

To perform some tests, you can launch the following AWS CloudFormation stack, which provisions an EMR cluster with managed scaling and the Application Master placement awareness feature enabled. If this is your first time launching an EMR cluster, make sure to create the Amazon EMR default roles using the following AWS Command Line Interface (AWS CLI) command:

aws emr create-default-roles

To create the cluster, choose Launch Stack:

Launch Cloudformation Stack

Provide the following required parameters:

  • VPC – An existing virtual private cloud (VPC) in your account where the cluster will be provisioned
  • Subnet – The subnet in your VPC where you want to launch the cluster
  • SSH Key Name – An EC2 key pair that you use to connect to the EMR primary node

After the EMR cluster has been provisioned, establish a tunnel to the Hadoop Resource Manager web UI to review the cluster configurations. To access the Resource Manager web UI, complete the following steps:

  1. Set up an SSH tunnel to the primary node using dynamic port forwarding.
  2. Point your browser to the URL http://<primary-node-public-dns>:8088/, using the public DNS name of your cluster’s primary node.

This will open the Hadoop Resource Manager web UI, where you can see how the cluster has been configured.

YARN node labels

In the CloudFormation stack, you launched a cluster configured to allocate the AM containers on nodes labeled ON_DEMAND. If you explore the Resource Manager web UI, you can see that Amazon EMR created two labels in the cluster: ON_DEMAND and SPOT. To review the YARN node labels present in your cluster, you can inspect the Node Labels page, as shown in the following screenshot.

On this page, you can see how the YARN labels were created in Amazon EMR:

  • During initial cluster creation, default node labels such as ON_DEMAND and SPOT are automatically generated as non-exclusive partitions
  • The DEFAULT_PARTITION label stays vacant because every node gets labeled based on its market type—either being an On-Demand or Spot Instance

In our example, because we launched a single core node as On-Demand, you can observe a single node assigned to the ON_DEMAND partition, and the SPOT partition remains empty. Because the labels are created as non-exclusive, nodes with these labels can run both containers launched with a specific YARN label and also containers that don’t specify a YARN label. For additional details on YARN node labels, see YARN Node Labels in the Hadoop documentation.

Now that we have discussed how the cluster was configured, we can perform some tests to validate and review the behavior of this feature when using it in combination with managed scaling.

Concurrent application submission with Spot Instances

To test the managed scaling capabilities, we submit a simple SparkPi job configured to utilize all available memory on the single core node initially launched in our cluster:

spark-example \
  --deploy-mode cluster \
  --driver-memory 10g \
  --executor-memory 10g \
  --conf spark.dynamicAllocation.maxExecutors=1 \
  --conf spark.yarn.executor.nodeLabelExpression=SPOT \
  SparkPi 800000

In the preceding snippet, we tuned specific Spark configurations to utilize all the resources of the cluster nodes launched (you could also achieve this using the maximizeResourceAllocation configuration while launching an EMR cluster). Because the cluster has been launched using m5.xlarge instances, we can launch individual containers up to 12 GB in terms of memory requirements. With these assumptions, the snippet configures the following:

  • The Spark driver and executors were configured with 10 GB of memory to utilize most of the available memory on the node, in order to have a single container running on each node of our cluster and simplify this example.
  • The yarn.node-labels.am.default-node-label-expression parameter was set to ON_DEMAND, making sure the Spark driver is automatically allocated to the ON_DEMAND partition of our cluster. Because we specified this configuration while launching the cluster, the AM containers are automatically requested to be scheduled on ON_DEMAND labeled instances, so we don’t need to specify it at the job level.
  • The spark.yarn.executor.nodeLabelExpression=SPOT configuration makes sure the executors run exclusively on task nodes using Spot Instances. Removing this configuration allows the Spark executors to be scheduled on both SPOT and ON_DEMAND labeled nodes.
  • The spark.dynamicAllocation.maxExecutors setting was set to 1 to delay the processing time of the application and observe the scaling behavior when multiple YARN applications were submitted concurrently in the same cluster.
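The sizing reasoning above can be checked with a little arithmetic. Spark on YARN requests the configured memory plus a memory overhead, which by default is 10% of that memory with a 384 MiB floor. The sketch below uses hypothetical helper names, takes the 12 GB per-node limit from the text, and ignores YARN's rounding to its allocation increment.

```python
def yarn_container_mib(memory_mib: int, overhead_pct: int = 10,
                       min_overhead_mib: int = 384) -> int:
    """Approximate YARN container request for a Spark driver or executor."""
    overhead = max(memory_mib * overhead_pct // 100, min_overhead_mib)
    return memory_mib + overhead


def containers_per_node(node_limit_mib: int, memory_mib: int) -> int:
    """How many containers of this size fit under the per-node memory limit."""
    return node_limit_mib // yarn_container_mib(memory_mib)


if __name__ == "__main__":
    request = yarn_container_mib(10 * 1024)        # 10g plus default overhead
    fits = containers_per_node(12 * 1024, 10 * 1024)
    print(f"container request: {request} MiB, containers per node: {fits}")
```

With 10 GB of memory the request comes to 11,264 MiB, so only one such container fits on each node, which is what keeps the example to one driver or one executor per node.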

As the application transitioned to a RUNNING state, we can verify from the YARN Resource Manager UI that its driver placement was automatically assigned to the ON_DEMAND partition of our cluster (see the following screenshot).

Additionally, upon inspecting the YARN scheduler page, we can see that our SPOT partition doesn’t have a resource associated with it because the cluster was launched with just one On-Demand Instance.

Because the cluster didn’t have Spot Instances initially, you can observe from the Amazon EMR console that managed scaling generates a new Spot task group to accommodate the Spark executor requested to run on Spot nodes only (see the following screenshot). Before this integration, managed scaling didn’t take into account the YARN labels requested by an application, potentially leading to unpredictable scaling behaviors. With this release, managed scaling now considers the YARN labels specified by applications, enabling more predictable and accurate scaling decisions.

While waiting for the launch of the new Spot node, we submitted another SparkPi job with identical specifications. However, because the memory required to allocate the new Spark Driver was 10 GB and such resources were currently unavailable in the ON_DEMAND partition, the application remained in a pending state until resources became available to schedule its container.

Upon detecting the lack of resources to allocate the new Spark driver, Amazon EMR managed scaling commenced scaling the core instance group (On-Demand Instances in our cluster) by launching a new core node. After the new core node was launched, YARN promptly allocated the pending container on the new node, enabling the application to start its processing. Subsequently, the application requested additional Spot nodes to allocate its own executors (see the following screenshot).

This example demonstrates how managed scaling and YARN labels work together to improve the resiliency of YARN applications, while using cost-effective job executions over Spot Instances.

When to use Application Master placement awareness and managed scaling

You can use this placement awareness feature to improve cost-efficiency by using Spot Instances while protecting the Application Master from being unexpectedly shut down due to Spot interruptions. It’s particularly useful when you want to take advantage of the cost savings offered by Spot Instances while preserving the stability and reliability of your jobs running on the cluster. When working with managed scaling and the placement awareness feature, consider the following best practices:

  • Maximum cost-efficiency for non-critical jobs – If you have jobs that don’t have strict service level agreement (SLA) requirements, you can force all Spark executors to run on Spot Instances for maximum cost savings. This can be achieved by setting the following Spark configuration:
    spark.yarn.executor.nodeLabelExpression=SPOT

  • Resilient execution for production jobs – For production jobs where you require a more resilient execution, you might consider not setting the spark.yarn.executor.nodeLabelExpression parameter. When no label is specified, executors are dynamically allocated between both On-Demand and Spot nodes, providing a more reliable execution.
  • Limit dynamic allocation for concurrent applications – When working with managed scaling and clusters with multiple applications running concurrently (for example, an interactive cluster with concurrent user utilization), you should consider setting a maximum limit for Spark dynamic allocation using the spark.dynamicAllocation.maxExecutors setting. This can help limit resource over-provisioning and facilitate predictable scaling behavior across applications running on the same cluster. For more details, see Dynamic Allocation in the Spark documentation.
  • Managed scaling configurations – Make sure your managed scaling configurations are set up correctly to facilitate efficient scaling of Spot Instances based on your workload requirements. For example, set an appropriate value for Maximum On-Demand instances in managed scaling based on the number of concurrent applications you want to run on the cluster. Additionally, if you’re planning to use your On-Demand Instances for running solely AM containers, we recommend setting yarn.scheduler.capacity.maximum-am-resource-percent to 1 using the Amazon EMR capacity-scheduler classification.
  • Improve startup time of the nodes – If your cluster is subject to frequent scaling events (for example, you have a long-running cluster that can run multiple concurrent EMR steps), you might want to optimize the startup time of your cluster nodes. When trying to get an efficient node startup, consider only installing the minimum required set of application frameworks in the cluster and, whenever possible, avoid installing non-YARN frameworks such as HBase or Trino, which might delay the startup of processing nodes dynamically attached by Amazon EMR managed scaling. Finally, whenever possible, don’t use complex and time-consuming EMR bootstrap actions to avoid increasing the startup time of nodes launched with managed scaling.

By following these best practices, you can take advantage of the cost savings of Spot Instances while maintaining the stability and reliability of your applications, particularly in scenarios where multiple applications are running concurrently on the same cluster.
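The first three practices can be sketched as a small Python helper that chooses Spark settings per job. The function names and the `critical` flag are hypothetical; only the two Spark properties come from this post.

```python
from typing import Optional


def executor_placement_conf(critical: bool, max_executors: Optional[int] = None) -> dict:
    """Pick Spark settings for a job following the practices described above."""
    conf = {}
    if not critical:
        # Non-critical jobs: force every executor onto Spot nodes.
        conf["spark.yarn.executor.nodeLabelExpression"] = "SPOT"
    # Critical jobs leave the label unset so executors can use both partitions.
    if max_executors is not None:
        conf["spark.dynamicAllocation.maxExecutors"] = str(max_executors)
    return conf


def to_submit_args(conf: dict) -> list:
    """Turn a configuration dict into repeated --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(executor_placement_conf(False, max_executors=20))))
```

Keeping the choice in one place makes it harder for a production job to inherit the SPOT-only setting by accident.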

Conclusion

In this post, we explored the benefits of the new integration between Amazon EMR managed scaling and YARN node labels, reviewed its implementation and usage, and defined a few best practices that can help you get started. Whether you’re running batch processing jobs, stream processing applications, or other YARN workloads on Amazon EMR, this feature can help you achieve substantial cost savings without compromising on performance or reliability.

As you embark on your journey to use Spot Instances in your EMR clusters, remember to follow the best practices outlined in this post, such as setting appropriate configurations for dynamic allocation, node label expressions, and managed scaling policies. By doing so, you can make sure that your applications run efficiently, reliably, and at the lowest possible cost.


About the authors

Lorenzo Ripani is a Big Data Solution Architect at AWS. He is passionate about distributed systems, open source technologies and security. He spends most of his time working with customers around the world to design, evaluate and optimize scalable and secure data pipelines with Amazon EMR.

Miranda Diaz is a Software Development Engineer for EMR at AWS. Miranda works to design and develop technologies that make it easy for customers across the world to automatically scale their computing resources to their needs, helping them achieve the best performance at the optimal cost.

Sajjan Bhattarai is a Senior Cloud Support Engineer at AWS, and specializes in BigData and Machine Learning workloads. He enjoys helping customers around the world to troubleshoot and optimize their data platforms.

Bezuayehu Wate is an Associate Big Data Specialist Solutions Architect at AWS. She works with customers to provide strategic and architectural guidance on designing, building, and modernizing their cloud-based analytics solutions using AWS.

Amazon EMR on EC2 cost optimization: How a global financial services provider reduced costs by 30%

Post Syndicated from Omar Gonzalez original https://aws.amazon.com/blogs/big-data/amazon-emr-on-ec2-cost-optimization-how-a-global-financial-services-provider-reduced-costs-by-30/

In this post, we highlight key lessons learned while helping a global financial services provider migrate their Apache Hadoop clusters to AWS and best practices that helped reduce their Amazon EMR, Amazon Elastic Compute Cloud (Amazon EC2), and Amazon Simple Storage Service (Amazon S3) costs by over 30% per month.

We outline cost-optimization strategies and operational best practices achieved through a strong collaboration with their DevOps teams. We also discuss a data-driven approach using a hackathon focused on cost optimization along with Apache Spark and Apache HBase configuration optimization.

Background

In early 2022, a business unit of a global financial services provider began their journey to migrate their customer solutions to AWS. This included web applications, Apache HBase data stores, Apache Solr search clusters, and Apache Hadoop clusters. The migration included over 150 server nodes and 1 PB of data. The on-premises clusters supported real-time data ingestion and batch processing.

Because of aggressive migration timelines driven by the closure of data centers, they implemented a lift-and-shift rehosting strategy of their Apache Hadoop clusters to Amazon EMR on EC2, as highlighted in the Amazon EMR migration guide.

Amazon EMR on EC2 provided the flexibility for the business unit to run their applications with minimal changes on managed Hadoop clusters with the required Spark, Hive, and HBase software and versions installed. Because the clusters are managed, they were able to decompose their large on-premises cluster and deploy purpose-built transient and persistent clusters for each use case on AWS without increasing operational overhead.

Challenge

Although the lift-and-shift strategy allowed the business unit to migrate with lower risk and allowed their engineering teams to focus on product development, this came with increased ongoing AWS costs.

The business unit deployed transient and persistent clusters for different use cases. Several application components relied on Spark Streaming for real-time analytics, which was deployed on persistent clusters. They also deployed the HBase environment on persistent clusters.

After the initial deployment, they discovered several configuration issues that led to suboptimal performance and increased cost. Despite using Amazon EMR managed scaling for persistent clusters, the configuration wasn’t efficient due to setting a minimum of 40 core nodes and task nodes, resulting in wasted resources. Core nodes were also misconfigured to auto scale. This led to scale-in events shutting down core nodes with shuffle data. The business unit also implemented Amazon EMR auto-termination policies. Because of shuffle data loss on the EMR on EC2 clusters running Spark applications, certain jobs ran five times longer than planned. Here, auto-termination policies didn’t mark a cluster as idle because a job was still running.

Lastly, there were separate environments for development (dev), user acceptance testing (UAT), and production (prod), which were also over-provisioned because the minimum capacity units for the managed scaling policies were configured too high, leading to higher costs as shown in the following figure.

Short-term cost-optimization strategy

The business unit completed the migration of applications, databases, and Hadoop clusters in 4 months. Their immediate goal was to get out of their data centers as quickly as possible, followed by cost optimization and modernization. Although they expected greater upfront costs because of the lift-and-shift approach, their costs were 40% higher than forecasted. This sped up their need to optimize.

They engaged with their shared services team and the AWS team to develop a cost-optimization strategy. The business unit began by focusing on cost-optimization best practices that they could implement immediately and that didn’t require product development team engagement or impact their productivity. A cost analysis determined that the largest contributors to cost were EMR on EC2 clusters running Spark, EMR on EC2 clusters running HBase, Amazon S3 storage, and EC2 instances running Solr.

The business unit started by enforcing auto-termination of EMR clusters in their dev environments by using automation. They considered using Amazon EMR isIdle Amazon CloudWatch metrics to build an event-driven solution with AWS Lambda, as described in Optimize Amazon EMR costs with idle checks and automatic resource termination using advanced Amazon CloudWatch metrics and AWS Lambda. They implemented a stricter policy to shut down clusters in their lower environments after 3 hours, regardless of usage. They also updated managed scaling policies in DEV and UAT and set the minimum cluster size to three instances to allow clusters to scale up as needed. This resulted in a 60% savings in monthly dev and UAT costs over 5 months, as shown in the following figure.
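The business unit's automation isn't shown in this post. As a minimal sketch of an age-based shutdown under stated assumptions, the following Python code selects clusters older than 3 hours from `list_clusters` output and terminates them with Boto3. It assumes the account or Region holds only lower-environment clusters, and it ignores pagination for brevity.

```python
from datetime import datetime, timedelta, timezone


def clusters_past_max_age(clusters: list, now: datetime,
                          max_age: timedelta = timedelta(hours=3)) -> list:
    """Return IDs of clusters created more than max_age before now."""
    return [
        c["Id"]
        for c in clusters
        if now - c["Status"]["Timeline"]["CreationDateTime"] > max_age
    ]


def terminate_old_clusters() -> list:
    """Terminate active clusters older than the limit (calls AWS; not run here)."""
    import boto3  # imported lazily so the pure helper above has no dependency

    emr = boto3.client("emr")
    active = emr.list_clusters(
        ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]
    )["Clusters"]
    old_ids = clusters_past_max_age(active, datetime.now(timezone.utc))
    if old_ids:
        emr.terminate_job_flows(JobFlowIds=old_ids)
    return old_ids
```

A function like `terminate_old_clusters` could run on a schedule, for example from AWS Lambda, with tag-based filtering added if dev and production clusters share an account.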

For the initial production deployment, they had a subset of Spark jobs running on a persistent cluster with an older Amazon EMR 5.x release. To optimize costs, they split smaller jobs and larger jobs to run on separate persistent clusters and configured the minimum number of core nodes required to support jobs in each cluster. Setting the core nodes to a constant size while using managed scaling for only task nodes is a recommended best practice and eliminated the issue of shuffle data loss. This also improved the time to scale in and out, because task nodes don’t store data in Hadoop Distributed File System (HDFS).
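One way to express "constant core nodes, scaling for task nodes only" is a managed scaling policy whose minimum capacity and maximum core capacity both equal the core node count. The sketch below builds such a policy; the function name and node counts are illustrative, not the values the business unit used.

```python
def task_only_scaling_policy(core_nodes: int, max_nodes: int) -> dict:
    """Build a managed scaling policy that keeps core nodes fixed.

    Minimum capacity and maximum core capacity both equal core_nodes,
    so any capacity added above the minimum lands on task nodes.
    """
    if core_nodes < 1 or max_nodes < core_nodes:
        raise ValueError("need 1 <= core_nodes <= max_nodes")
    return {
        "ComputeLimits": {
            "UnitType": "Instances",
            "MinimumCapacityUnits": core_nodes,
            "MaximumCapacityUnits": max_nodes,
            "MaximumCoreCapacityUnits": core_nodes,
        }
    }


if __name__ == "__main__":
    # Illustrative sizes: 5 fixed core nodes, up to 30 nodes in total.
    print(task_only_scaling_policy(core_nodes=5, max_nodes=30))
```

The resulting dictionary can be passed as the `ManagedScalingPolicy` argument of the Boto3 EMR client's `put_managed_scaling_policy` call.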

Solr clusters ran on EC2 instances. To optimize this environment, they ran performance tests to determine the best EC2 instances for their workload.

With over one petabyte of data, Amazon S3 contributed to over 15% of monthly costs. The business unit enabled the Amazon S3 Intelligent-Tiering storage class to optimize storage expenses for historical data and reduce their monthly Amazon S3 costs by over 40%, as shown in the following figure. They also migrated Amazon Elastic Block Store (Amazon EBS) volumes from gp2 to gp3 volume types.
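The post doesn't say how Intelligent-Tiering was enabled. One common approach, shown here only as an assumption, is an S3 Lifecycle rule that transitions objects under a prefix to the INTELLIGENT_TIERING storage class. The rule ID, prefix, and day count below are placeholders.

```python
def intelligent_tiering_lifecycle(prefix: str = "", after_days: int = 0) -> dict:
    """Build an S3 Lifecycle configuration that moves objects to Intelligent-Tiering."""
    if after_days < 0:
        raise ValueError("after_days must not be negative")
    return {
        "Rules": [
            {
                "ID": "move-to-intelligent-tiering",  # placeholder rule name
                "Status": "Enabled",
                "Filter": {"Prefix": prefix},
                "Transitions": [
                    {"Days": after_days, "StorageClass": "INTELLIGENT_TIERING"}
                ],
            }
        ]
    }


if __name__ == "__main__":
    # Hypothetical prefix holding historical data.
    print(intelligent_tiering_lifecycle(prefix="historical/", after_days=30))
```

The dictionary can be passed as `LifecycleConfiguration` to the Boto3 S3 client's `put_bucket_lifecycle_configuration` call for the bucket in question.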

Longer-term cost-optimization strategy

After the business unit realized initial cost savings, they engaged with the AWS team to organize a financial hackathon (FinHack) event. The goal of the hackathon was to reduce costs further by using a data-driven process to test cost-optimization strategies for Spark jobs. To prepare for the hackathon, they identified a set of jobs to test using different Amazon EMR deployment options (Amazon EC2, Amazon EMR Serverless) and configurations (Spot, AWS Graviton, Amazon EMR managed scaling, EC2 instance fleets) to arrive at the most cost-optimized solution for each job. A sample test plan for a job is shown in the following table. The AWS team also assisted with analyzing Spark configurations and job execution during the event.

Job | Test | Description | Configuration
Job 1 | 1 | Run an EMR on EC2 job with default Spark configurations | Non-Graviton, On-Demand Instances
Job 1 | 2 | Run an EMR Serverless job with default Spark configurations | Default configuration
Job 1 | 3 | Run an EMR on EC2 job with default Spark configuration and Graviton instances | Graviton, On-Demand Instances
Job 1 | 4 | Run an EMR on EC2 job with default Spark configuration and Graviton instances, with hybrid Spot Instance allocation | Graviton, On-Demand and Spot Instances

The business unit also performed extensive testing using Spot Instances before and during the FinHack. They initially used the Spot Instance advisor and Spot Blueprints to create optimal instance fleet configurations. They automated the process to select the most optimal Availability Zone to run jobs by querying for the Spot placement scores using the get_spot_placement_scores API before launching new jobs.
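Their automation isn't included in this post, but the Availability Zone selection step might look like the following sketch. It queries Spot placement scores per Availability Zone with Boto3 and picks the highest-scoring one. The function names, instance types, and target capacity are illustrative assumptions.

```python
from typing import Optional


def best_availability_zone(scores: list) -> Optional[str]:
    """Return the Availability Zone ID with the highest Spot placement score."""
    zoned = [s for s in scores if "AvailabilityZoneId" in s]
    if not zoned:
        return None
    return max(zoned, key=lambda s: s["Score"])["AvailabilityZoneId"]


def query_scores(instance_types: list, target_capacity: int, region: str) -> list:
    """Fetch per-AZ Spot placement scores (calls AWS; not run here)."""
    import boto3  # imported lazily so the pure helper above has no dependency

    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.get_spot_placement_scores(
        InstanceTypes=instance_types,
        TargetCapacity=target_capacity,
        SingleAvailabilityZone=True,
        RegionNames=[region],
    )
    return response["SpotPlacementScores"]
```

A launcher could call `best_availability_zone(query_scores(["m6g.4xlarge", "r6g.4xlarge"], 20, "us-east-1"))` and map the returned zone ID to the matching subnet before creating the transient cluster.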

During the FinHack, they also developed an EMR job tracking script and report to granularly track cost per job and measure ongoing improvements. They used the AWS SDK for Python (Boto3) to list the status of all transient clusters in their account and report on cluster-level configurations and instance hours per job.
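The tracking script itself isn't published here. A minimal sketch of the idea, assuming one job per transient cluster so that cluster-level hours approximate job-level usage, reads `NormalizedInstanceHours` from the `list_clusters` output. The function and field names in the report are hypothetical.

```python
def instance_hours_report(clusters: list) -> list:
    """Summarize clusters by normalized instance hours, largest first."""
    rows = [
        {
            "cluster_id": c["Id"],
            "name": c["Name"],
            "state": c["Status"]["State"],
            "normalized_instance_hours": c.get("NormalizedInstanceHours", 0),
        }
        for c in clusters
    ]
    return sorted(rows, key=lambda r: r["normalized_instance_hours"], reverse=True)


def fetch_clusters() -> list:
    """List all clusters in the account and Region (calls AWS; not run here)."""
    import boto3  # imported lazily so the pure helper above has no dependency

    emr = boto3.client("emr")
    clusters = []
    for page in emr.get_paginator("list_clusters").paginate():
        clusters.extend(page["Clusters"])
    return clusters
```

Calling `instance_hours_report(fetch_clusters())` gives a ranked list that can be joined with cost allocation tags to attribute spend per job.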

As they executed the test plan, they found several additional areas of enhancement:

  • One of the test jobs makes API calls to Solr clusters, which introduced a bottleneck in the design. To prevent Spark jobs from overwhelming the clusters, they fine-tuned the spark.executor.cores and spark.dynamicAllocation.maxExecutors properties.
  • Task nodes were over-provisioned with large EBS volumes. They reduced the size to 100 GB for additional cost savings.
  • They updated their instance fleet configuration by setting units/weights in proportion to the instance types selected.
  • During the initial migration, they set the spark.sql.shuffle.partitions configuration too high. The configuration was fine-tuned for their on-premises cluster but not updated to align with their EMR clusters. They optimized the configuration by setting the value to one or two times the number of vCores in the cluster.
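The shuffle partition guideline in the last item is simple arithmetic. As a minimal sketch, assuming a hypothetical cluster of 10 core nodes with 16 vCores each (the helper name is illustrative):

```python
def shuffle_partition_range(total_vcores: int) -> tuple:
    """Return the (low, high) range of one to two times the cluster's vCores."""
    if total_vcores <= 0:
        raise ValueError("total_vcores must be positive")
    return total_vcores, 2 * total_vcores


# Hypothetical cluster: 10 nodes x 16 vCores.
low, high = shuffle_partition_range(10 * 16)
print(f"--conf spark.sql.shuffle.partitions={high}  (range {low}-{high})")
```

The right value within that range still depends on data volume per partition, so treat it as a starting point for testing rather than a fixed rule.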

Following the FinHack, they enforced a cost allocation tagging strategy for persistent clusters that are deployed using Terraform and transient clusters deployed using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). They also deployed an EMR Observability dashboard using Amazon Managed Service for Prometheus and Amazon Managed Grafana.

Results

The business unit reduced monthly costs by 30% over 3 months. This allowed them to continue migration efforts of remaining on-premises workloads. Most of their 2,000 jobs per month now run on EMR transient clusters. They have also increased AWS Graviton usage to 40% of total usage hours per month and Spot usage to 10% in non-production environments.

Conclusion

Through a data-driven approach involving cost analysis, adherence to AWS best practices, configuration optimization, and extensive testing during a financial hackathon, the global financial services provider successfully reduced their AWS costs by 30% over 3 months. Key strategies included enforcing auto-termination policies, optimizing managed scaling configurations, using Spot Instances, adopting AWS Graviton instances, fine-tuning Spark and HBase configurations, implementing cost allocation tagging, and developing cost tracking dashboards. Their partnership with AWS teams and a focus on implementing short-term and longer-term best practices allowed them to continue their cloud migration efforts while optimizing costs for their big data workloads on Amazon EMR.

For additional cost-optimization best practices, we recommend visiting AWS Open Data Analytics.


About the Authors

Omar Gonzalez is a Senior Solutions Architect at Amazon Web Services in Southern California with more than 20 years of experience in IT. He is passionate about helping customers drive business value through the use of technology. Outside of work, he enjoys hiking and spending quality time with his family.

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit Shukla is the accomplished author of the book Data Wrangling on AWS, showcasing his expertise in the field. He also runs the YouTube channel Cloud and Coffee with Navnit, where he shares insights on cloud technologies and analytics. Connect with him on LinkedIn.

Amazon EMR Serverless observability, Part 1: Monitor Amazon EMR Serverless workers in near real time using Amazon CloudWatch

Post Syndicated from Kashif Khan original https://aws.amazon.com/blogs/big-data/amazon-emr-serverless-observability-part-1-monitor-amazon-emr-serverless-workers-in-near-real-time-using-amazon-cloudwatch/

Amazon EMR Serverless allows you to run open source big data frameworks such as Apache Spark and Apache Hive without managing clusters and servers. With EMR Serverless, you can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements.

We have launched job worker metrics in Amazon CloudWatch for EMR Serverless. This feature allows you to monitor vCPUs, memory, ephemeral storage, and disk I/O allocation and usage metrics at an aggregate worker level for your Spark and Hive jobs.

This post is part of a series about EMR Serverless observability. In this post, we discuss how to use these CloudWatch metrics to monitor EMR Serverless workers in near real time.

CloudWatch metrics for EMR Serverless

At the per-Spark job level, EMR Serverless emits the following new metrics to CloudWatch for both driver and executors. These metrics provide granular insights into job performance, bottlenecks, and resource utilization.

Metric | Description
WorkerCpuAllocated | The total number of vCPU cores allocated for workers in a job run
WorkerCpuUsed | The total number of vCPU cores utilized by workers in a job run
WorkerMemoryAllocated | The total memory in GB allocated for workers in a job run
WorkerMemoryUsed | The total memory in GB utilized by workers in a job run
WorkerEphemeralStorageAllocated | The number of bytes of ephemeral storage allocated for workers in a job run
WorkerEphemeralStorageUsed | The number of bytes of ephemeral storage used by workers in a job run
WorkerStorageReadBytes | The number of bytes read from storage by workers in a job run
WorkerStorageWriteBytes | The number of bytes written to storage by workers in a job run

The following are the benefits of monitoring your EMR Serverless jobs with CloudWatch:

  • Optimize resource utilization – You can gain insights into resource utilization patterns and optimize your EMR Serverless configurations for better efficiency and cost savings. For example, underutilization of vCPUs or memory can reveal resource wastage, allowing you to optimize worker sizes to achieve potential cost savings.
  • Diagnose common errors – You can identify root causes and mitigation for common errors without log diving. For example, you can monitor the usage of ephemeral storage and mitigate disk bottlenecks by preemptively allocating more storage per worker.
  • Gain near real-time insights – CloudWatch offers near real-time monitoring capabilities, allowing you to track the performance of your EMR Serverless jobs while they are running, for quick detection of any anomalies or performance issues.
  • Configure alerts and notifications – CloudWatch enables you to set up alarms using Amazon Simple Notification Service (Amazon SNS) based on predefined thresholds, allowing you to receive notifications through email or text message when specific metrics reach critical levels.
  • Conduct historical analysis – CloudWatch stores historical data, allowing you to analyze trends over time, identify patterns, and make informed decisions for capacity planning and workload optimization.

Solution overview

To further enhance this observability experience, we have created a solution that gathers all these metrics on a single CloudWatch dashboard for an EMR Serverless application. You need to launch one AWS CloudFormation template per EMR Serverless application. You can monitor all the jobs submitted to a single EMR Serverless application using the same CloudWatch dashboard. To learn more about this dashboard and deploy this solution into your own account, refer to the EMR Serverless CloudWatch Dashboard GitHub repository.

In the following sections, we walk you through how you can use this dashboard to perform the following actions:

  • Optimize your resource utilization to save costs without impacting job performance
  • Diagnose failures due to common errors without the need for log diving and resolve those errors optimally

Prerequisites

To run the sample jobs provided in this post, you need to create an EMR Serverless application with default settings using the AWS Management Console or AWS Command Line Interface (AWS CLI), and then launch the CloudFormation template from the GitHub repo with the EMR Serverless application ID provided as the input to the template.

You need to submit all the jobs in this post to the same EMR Serverless application. If you want to monitor a different application, you can deploy this template for your own EMR Serverless application ID.

Optimize resource utilization

When running Spark jobs, you often start with the default configurations. It can be challenging to optimize your workload without any visibility into actual resource utilization. Some of the most common configurations that we’ve seen customers adjust are spark.driver.cores, spark.driver.memory, spark.executor.cores, and spark.executor.memory.

To illustrate how the newly added CloudWatch dashboard worker-level metrics can help you fine-tune your job configurations for better price-performance and enhanced resource utilization, let’s run the following Spark job, which uses the NOAA Integrated Surface Database (ISD) dataset to run some transformations and aggregations.

Use the following command to run this job on EMR Serverless. Provide your Amazon Simple Storage Service (Amazon S3) bucket and EMR Serverless application ID for which you launched the CloudFormation template. Make sure to use the same application ID to submit all the sample jobs in this post. Additionally, provide an AWS Identity and Access Management (IAM) runtime role.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-1 \
 --application-id <APPLICATION_ID> \
 --execution-role-arn <JOB_ROLE_ARN> \
 --job-driver '{
 "sparkSubmit": {
 "entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
 "entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-1/"]
 } }'

Now let’s check the executor vCPUs and memory from the CloudWatch dashboard.

This job was submitted with default EMR Serverless Spark configurations. From the Executor CPU Allocated metric in the preceding screenshot, the job was allocated 396 vCPUs in total (99 executors * 4 vCPUs per executor). However, the job only used a maximum of 110 vCPUs based on Executor CPU Used. This indicates oversubscription of vCPU resources. Similarly, the job was allocated 1,584 GB memory in total based on Executor Memory Allocated. However, from the Executor Memory Used metric, we see that the job only used 176 GB of memory during the job, indicating memory oversubscription.
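To put those dashboard readings in perspective, the utilization ratios can be computed directly from the allocated and used values quoted above. The helper name is illustrative.

```python
def utilization_pct(used: float, allocated: float) -> float:
    """Return used/allocated as a percentage, rounded to one decimal place."""
    if allocated <= 0:
        raise ValueError("allocated must be positive")
    return round(100 * used / allocated, 1)


# Values read from the dashboard for the default-configuration run.
cpu_pct = utilization_pct(used=110, allocated=396)      # peak vCPUs used vs. allocated
memory_pct = utilization_pct(used=176, allocated=1584)  # peak GB used vs. allocated
print(f"vCPU utilization: {cpu_pct}%, memory utilization: {memory_pct}%")
```

At peak, the job used a little over a quarter of its allocated vCPUs and about a ninth of its allocated memory.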

Now let’s rerun this job with the following adjusted configurations.

Setting | Original Job (Default Configuration) | Rerun Job (Adjusted Configuration)
spark.executor.memory | 14 GB | 3 GB
spark.executor.cores | 4 | 2
spark.dynamicAllocation.maxExecutors | 99 | 30
Total Resource Utilization | 6.521 vCPU-hours, 26.084 memoryGB-hours, 32.606 storageGB-hours | 1.739 vCPU-hours, 3.688 memoryGB-hours, 17.394 storageGB-hours
Billable Resource Utilization | 7.046 vCPU-hours, 28.182 memoryGB-hours, 0 storageGB-hours | 1.739 vCPU-hours, 3.688 memoryGB-hours, 0 storageGB-hours

We use the following code:

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-2 \
 --application-id <APPLICATION_ID> \
 --execution-role-arn <JOB_ROLE_ARN> \
 --job-driver '{
 "sparkSubmit": {
 "entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
 "entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-2/"],
 "sparkSubmitParameters": "--conf spark.driver.cores=2 --conf spark.driver.memory=3g --conf spark.executor.memory=3g --conf spark.executor.cores=2 --conf spark.dynamicAllocation.maxExecutors=30"
 } }'
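If you submit jobs programmatically, building the request from a dictionary avoids hand-editing the JSON and the long --conf string. The sketch below only constructs the request; the function names are hypothetical, and the keys mirror the parameters of the EMR Serverless StartJobRun API, so the result could be passed to a Boto3 emr-serverless client as start_job_run(**request).

```python
import json


def spark_submit_parameters(conf: dict) -> str:
    """Render Spark properties as a space-separated list of --conf flags."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def build_start_job_run_request(name, application_id, execution_role_arn,
                                entry_point, arguments=None, conf=None) -> dict:
    """Build keyword arguments for an EMR Serverless StartJobRun request."""
    spark_submit = {"entryPoint": entry_point}
    if arguments:
        spark_submit["entryPointArguments"] = list(arguments)
    if conf:
        spark_submit["sparkSubmitParameters"] = spark_submit_parameters(conf)
    return {
        "name": name,
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "jobDriver": {"sparkSubmit": spark_submit},
    }


# Placeholders in angle brackets must be replaced with your own values.
request = build_start_job_run_request(
    name="emrs-cw-dashboard-test-2",
    application_id="<APPLICATION_ID>",
    execution_role_arn="<JOB_ROLE_ARN>",
    entry_point="s3://<BUCKET_NAME>/scripts/windycity.py",
    arguments=["s3://noaa-global-hourly-pds/2024/",
               "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-2/"],
    conf={
        "spark.driver.cores": 2,
        "spark.driver.memory": "3g",
        "spark.executor.memory": "3g",
        "spark.executor.cores": 2,
        "spark.dynamicAllocation.maxExecutors": 30,
    },
)
print(json.dumps(request["jobDriver"], indent=2))
```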

Let’s check the executor metrics from the CloudWatch dashboard again for this job run.

In the second job, we see lower allocation of both vCPUs (396 vs. 60) and memory (1,584 GB vs. 120 GB) as expected, resulting in better utilization of resources. The original job ran for 4 minutes, 41 seconds. The second job took 4 minutes, 54 seconds. This reconfiguration resulted in 79% lower cost without materially affecting job performance.
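The savings figure can be reproduced from the billable resource utilization of the two runs. The per-unit prices below are assumptions (x86 rates for one Region at the time of writing); check the EMR Serverless pricing page for your Region. Billable storage is 0 for both runs, so it is left out.

```python
# Assumed prices in USD; verify against current pricing for your Region.
VCPU_HOUR_USD = 0.052624
MEMORY_GB_HOUR_USD = 0.0057785


def job_cost(vcpu_hours: float, memory_gb_hours: float) -> float:
    """Estimate the cost of a job run from its billable vCPU and memory usage."""
    return vcpu_hours * VCPU_HOUR_USD + memory_gb_hours * MEMORY_GB_HOUR_USD


original_cost = job_cost(vcpu_hours=7.046, memory_gb_hours=28.182)
rerun_cost = job_cost(vcpu_hours=1.739, memory_gb_hours=3.688)
savings_pct = 100 * (1 - rerun_cost / original_cost)
print(f"Original: ${original_cost:.3f}, rerun: ${rerun_cost:.3f}, savings: {savings_pct:.0f}%")
```

Under these assumed prices, the result is consistent with the roughly 79% reduction reported above.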

You can use these metrics to further optimize your job by increasing or decreasing the number of workers or the allocated resources.

Diagnose and resolve job failures

Using the CloudWatch dashboard, you can diagnose job failures due to issues related to CPU, memory, and storage such as out of memory or no space left on the device. This enables you to identify and resolve common errors quickly without having to check the logs or navigate through Spark History Server. Additionally, because you can check the resource utilization from the dashboard, you can fine-tune the configurations by increasing the required resources only as much as needed instead of oversubscribing to the resources, which further saves costs.

Driver errors

To illustrate this use case, let’s run the following Spark job, which creates a large Spark data frame with a few million rows. Typically, this operation is done by the Spark driver. While submitting the job, we also configure spark.rpc.message.maxSize, because it’s required for task serialization of data frames with a large number of columns.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-3 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/create-large-disk.py",
"sparkSubmitParameters": "--conf spark.rpc.message.maxSize=2000"
} }'

After a few minutes, the job failed with the error message “Encountered errors when releasing containers,” as seen in the Job details section.

When you encounter non-descriptive error messages, the usual next step is to examine the driver and executor logs. But before log diving, let’s first check the CloudWatch dashboard, specifically the driver metrics, because releasing containers is generally performed by the driver.

We can see that the Driver CPU Used and Driver Storage Used are well within their respective allocated values. However, upon checking Driver Memory Allocated and Driver Memory Used, we can see that the driver was using all of the 16 GB memory allocated to it. By default, EMR Serverless drivers are assigned 16 GB memory.

Let’s rerun the job with more driver memory allocated. Let’s set driver memory to 27 GB as the starting point, because spark.driver.memory + spark.driver.memoryOverhead should be less than 30 GB for the default worker type. spark.rpc.message.maxSize will be unchanged.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-4 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/create-large-disk.py",
"sparkSubmitParameters": "--conf spark.driver.memory=27G --conf spark.rpc.message.maxSize=2000"
} }'
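The 27 GB starting point can be checked with a little arithmetic. This sketch assumes Spark's default driver memory overhead (10% of spark.driver.memory, with a 384 MiB floor) and takes the 30 GB limit from the text; the helper names are illustrative.

```python
MIN_OVERHEAD_GB = 384 / 1024  # Spark's minimum memory overhead (384 MiB)


def driver_total_memory_gb(driver_memory_gb: float, overhead_factor: float = 0.10) -> float:
    """Return spark.driver.memory plus the default memory overhead, in GB."""
    overhead = max(driver_memory_gb * overhead_factor, MIN_OVERHEAD_GB)
    return driver_memory_gb + overhead


def fits_worker(driver_memory_gb: float, worker_limit_gb: float = 30.0) -> bool:
    """Check that driver memory plus overhead stays below the worker limit."""
    return driver_total_memory_gb(driver_memory_gb) < worker_limit_gb


for candidate_gb in (26, 27, 28):
    total = driver_total_memory_gb(candidate_gb)
    print(f"{candidate_gb} GB + overhead = {total:.1f} GB, fits: {fits_worker(candidate_gb)}")
```

With the default overhead, 27 GB is the largest whole-GB setting that stays under the limit; 28 GB would exceed it.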

The job succeeded this time around. Let’s check the CloudWatch dashboard to observe driver memory utilization.

As we can see, the allocated memory is now 30 GB, but the actual driver memory utilization didn’t exceed 21 GB during the job run. Therefore, we can further optimize costs here by reducing the value of spark.driver.memory. We reran the same job with spark.driver.memory set to 22 GB, and the job still succeeded with better driver memory utilization.

Executor errors

Using CloudWatch for observability is ideal for diagnosing driver-related issues because there is only one driver per job, so the driver resources used reflect the actual resource usage of that single driver. Executor metrics, on the other hand, are aggregated across all the workers. However, you can still use this dashboard to provide just enough resources for your job to succeed, thereby avoiding oversubscription of resources.

To illustrate, let’s run the following Spark job, which simulates uniform disk over-utilization across all workers by processing very large NOAA datasets from several years. This job also transiently caches a very large data frame on disk.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-5 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py"
} }'

After a few minutes, we can see that the job failed with “No space left on device” error in the Job details section, which indicates that some of the workers have run out of disk space.

Checking the Running Executors metric from the dashboard, we can identify that there were 99 executor workers running. Each worker comes with 20 GB storage by default.

Because this is a Spark task failure, let’s check the Executor Storage Allocated and Executor Storage Used metrics from the dashboard (because the driver won’t run any tasks).

As we can see, the 99 executors have used up a total of 1,940 GB from the total allocated executor storage of 2,126 GB. This includes both the data shuffled by the executors and the storage used for caching the data frame. We don’t see the full 2,126 GB being utilized from this graph because there might be a few executors out of the 99 executors that weren’t holding much data when the job failed (before these executors could start processing tasks and store the data frame chunks).

Let’s rerun the same job but with increased executor disk size using the parameter spark.emr-serverless.executor.disk. Let’s try with 40 GB disk per executor as a starting point.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-6 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py",
"sparkSubmitParameters": "--conf spark.emr-serverless.executor.disk=40G"
}
}'

This time, the job ran successfully. Let’s check the Executor Storage Allocated and Executor Storage Used metrics.

Executor Storage Allocated is now 4,251 GB because we’ve doubled the value of spark.emr-serverless.executor.disk. Although there is now twice as much aggregate executor storage, the job still used a maximum of only 1,940 GB out of 4,251 GB. This indicates that our executors were likely running out of disk space by only a few GB. Therefore, we can try setting spark.emr-serverless.executor.disk to a lower value, such as 25 GB or 30 GB instead of 40 GB, to save storage costs, as we did in the previous scenario. In addition, you can monitor Executor Storage Read Bytes and Executor Storage Write Bytes to see if your job is I/O intensive. If it is, you can use the Shuffle-optimized disks feature of EMR Serverless to further enhance your job’s I/O performance.
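One way to arrive at a value like 25 GB or 30 GB is to divide the peak aggregate usage by the number of executors and add headroom. This is a rough sketch: the headroom percentages are arbitrary assumptions, and averaging is only reasonable when data is spread evenly across executors.

```python
import math


def suggest_executor_disk_gb(peak_used_gb: float, executors: int, headroom: float = 0.25) -> int:
    """Suggest a per-executor disk size from aggregate peak usage plus headroom."""
    if executors <= 0:
        raise ValueError("executors must be positive")
    average_gb = peak_used_gb / executors
    return math.ceil(average_gb * (1 + headroom))


# Peak Executor Storage Used and executor count taken from the dashboard.
print(suggest_executor_disk_gb(1940, 99, headroom=0.25))
print(suggest_executor_disk_gb(1940, 99, headroom=0.50))
```

With 25% and 50% headroom, the suggestions land on the 25 GB and 30 GB values mentioned above.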

The dashboard is also useful to capture information about transient storage used while caching or persisting the data frames, including spill-to-disk scenarios. The Storage tab of Spark History Server records any caching activities, as seen in the following screenshot. However, this data will be lost from Spark History Server after the cache is evicted or when the job finishes. Therefore, Executor Storage Used can be used to do an analysis of a failed job run due to transient storage issues.

In this particular example, the data was evenly distributed among the executors. However, if you have data skew (for example, only 1–2 executors out of 99 process most of the data, and as a result, your job runs out of disk space), the CloudWatch dashboard won’t accurately capture this scenario because the storage data is aggregated across all the executors for a job. For diagnosing issues at the individual executor level, we need to track per-executor-level metrics. We explore more advanced examples of how per-worker-level metrics can help you identify, mitigate, and resolve hard-to-find issues through EMR Serverless integration with Amazon Managed Service for Prometheus.

Conclusion

In this post, you learned how to effectively manage and optimize your EMR Serverless application using a single CloudWatch dashboard with enhanced EMR Serverless metrics. These metrics are available in all AWS Regions where EMR Serverless is available. For more details about this feature, refer to Job-level monitoring.


About the Authors

Kashif Khan is a Sr. Analytics Specialist Solutions Architect at AWS, specializing in big data services like Amazon EMR, AWS Lake Formation, AWS Glue, Amazon Athena, and Amazon DataZone. With over a decade of experience in the big data domain, he possesses extensive expertise in architecting scalable and robust solutions. His role involves providing architectural guidance and collaborating closely with customers to design tailored solutions using AWS analytics services to unlock the full potential of their data.

Veena Vasudevan is a Principal Partner Solutions Architect and Data & AI specialist at AWS. She helps customers and partners build highly optimized, scalable, and secure solutions; modernize their architectures; and migrate their big data, analytics, and AI/ML workloads to AWS.

Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments

Post Syndicated from Umair Nawaz original https://aws.amazon.com/blogs/big-data/use-batch-processing-gateway-to-automate-job-management-in-multi-cluster-amazon-emr-on-eks-environments/

AWS customers often process petabytes of data using Amazon EMR on EKS. In enterprise environments with diverse workloads or varying operational requirements, customers frequently choose a multi-cluster setup due to the following advantages:

  • Better resiliency and no single point of failure – If one cluster fails, other clusters can continue processing critical workloads, maintaining business continuity
  • Better security and isolation – Increased isolation between jobs enhances security and simplifies compliance
  • Better scalability – Distributing workloads across clusters enables horizontal scaling to handle peak demands
  • Performance benefits – Minimizing Kubernetes scheduling delays and network bandwidth contention improves job runtimes
  • Increased flexibility – You can enjoy straightforward experimentation and cost optimization through workload segregation to multiple clusters

However, one of the disadvantages of a multi-cluster setup is that there is no straightforward method to distribute workloads and support effective load balancing across multiple clusters. This post proposes a solution to this challenge by introducing the Batch Processing Gateway (BPG), a centralized gateway that automates job management and routing in multi-cluster environments.

Challenges with multi-cluster environments

In a multi-cluster environment, Spark jobs on Amazon EMR on EKS need to be submitted to different clusters from various clients. This architecture introduces several key challenges:

  • Endpoint management – Clients must maintain and update connections for each target cluster
  • Operational overhead – Managing multiple client connections individually increases the complexity and operational burden
  • Workload distribution – There is no built-in mechanism for job routing across multiple clusters, which impacts configuration, resource allocation, cost transparency, and resilience
  • Resilience and high availability – Without load balancing, the environment lacks fault tolerance and high availability

BPG addresses these challenges by providing a single point of submission for Spark jobs. BPG automates job routing to the appropriate EMR on EKS clusters, providing effective load balancing, simplified endpoint management, and improved resilience. The proposed solution is particularly beneficial for customers with multi-cluster Amazon EMR on EKS setups using the Spark Kubernetes Operator with or without the YuniKorn scheduler.

However, although BPG offers significant benefits, it is currently designed to work only with Spark Kubernetes Operator. Additionally, BPG has not been tested with the Volcano scheduler, and the solution is not applicable in environments using native Amazon EMR on EKS APIs.

Solution overview

Martin Fowler describes a gateway as an object that encapsulates access to an external system or resource. In this case, the resource is the EMR on EKS clusters running Spark. A gateway acts as a single point of access to this resource. Any code or connection interacts with the interface of the gateway only. The gateway then translates the incoming API request into the API offered by the resource.

BPG is a gateway specifically designed to provide a seamless interface to Spark on Kubernetes. It’s a REST API service that abstracts the details of the underlying Spark on EKS clusters from users. It runs in its own EKS cluster and communicates with the Kubernetes API servers of the different EKS clusters. Spark users submit an application to BPG through clients, and BPG then routes the application to one of the underlying EKS clusters.

The process for submitting Spark jobs using BPG for Amazon EMR on EKS is as follows:

  1. The user submits a job to BPG using a client.
  2. BPG parses the request, translates it into a custom resource definition (CRD), and submits the CRD to an EMR on EKS cluster according to predefined rules.
  3. The Spark Kubernetes Operator interprets the job specification and initiates the job on the cluster.
  4. The Kubernetes scheduler schedules and manages the run of the jobs.
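Step 2 of this flow can be sketched conceptually as follows. This is not BPG's implementation: the routing table, the weights, and the request fields are hypothetical, and only the apiVersion and kind of the SparkApplication resource come from the Spark Kubernetes Operator.

```python
import random

# Hypothetical routing rules: queue name -> list of (cluster, weight).
ROUTING_RULES = {
    "default": [("spark-cluster-a", 50), ("spark-cluster-b", 50)],
    "priority": [("spark-cluster-a", 100)],
}


def choose_cluster(queue: str, rules: dict = ROUTING_RULES, rng=random) -> str:
    """Pick a target cluster for a queue using weighted random selection."""
    candidates = rules.get(queue) or rules["default"]
    names = [name for name, _ in candidates]
    weights = [weight for _, weight in candidates]
    return rng.choices(names, weights=weights, k=1)[0]


def to_spark_application(name: str, request: dict) -> dict:
    """Translate a simplified job request into a SparkApplication resource."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "mainApplicationFile": request["mainApplicationFile"],
            "sparkVersion": request["sparkVersion"],
            "driver": {"cores": request["driverCores"], "memory": request["driverMemory"]},
            "executor": {
                "cores": request["executorCores"],
                "memory": request["executorMemory"],
                "instances": request["executorInstances"],
            },
        },
    }


# Hypothetical job request submitted by a client.
job_request = {
    "mainApplicationFile": "s3://<BUCKET_NAME>/scripts/job.py",
    "sparkVersion": "3.5.0",
    "driverCores": 1,
    "driverMemory": "2g",
    "executorCores": 2,
    "executorMemory": "4g",
    "executorInstances": 3,
}
target = choose_cluster("priority")
manifest = to_spark_application("example-job", job_request)
print(f"Submit {manifest['kind']} '{manifest['metadata']['name']}' to {target}")
```

Weighted selection is just one possible rule; a gateway could equally route by queue, tags, or cluster health.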

The following figure illustrates the high-level details of BPG. You can read more about BPG in the GitHub README.

Image showing the high-level details of Batch Processing Gateway

The proposed solution involves implementing BPG for multiple underlying EMR on EKS clusters, which effectively resolves the drawbacks discussed earlier. The following diagram illustrates the details of the solution.

Image showing the end-to-end architecture of Batch Processing Gateway

Source Code

You can find the code base in the AWS Samples and Batch Processing Gateway GitHub repositories.

In the following sections, we walk through the steps to implement the solution.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

Clone the repositories to your local machine

We assume that all repositories are cloned into the home directory (~/). All relative paths provided are based on this assumption. If you have cloned the repositories to a different location, adjust the paths accordingly.

  1. Clone the BPG on EMR on EKS GitHub repo with the following command:
cd ~/
git clone git@github.com:aws-samples/batch-processing-gateway-on-emr-on-eks.git

The BPG repository is currently under active development. To provide a stable deployment experience consistent with the provided instructions, we have pinned the repository to the stable commit hash aa3e5c8be973bee54ac700ada963667e5913c865.

Before cloning the repository, verify any security updates and adhere to your organization’s security practices.

  1. Clone the BPG GitHub repo with the following command:
git clone git@github.com:apple/batch-processing-gateway.git
cd batch-processing-gateway
git checkout aa3e5c8be973bee54ac700ada963667e5913c865

Create two EMR on EKS clusters

The creation of EMR on EKS clusters is not the primary focus of this post. For comprehensive instructions, refer to Running Spark jobs with the Spark operator. However, for your convenience, we have included the steps for setting up the EMR on EKS virtual clusters named spark-cluster-a-v and spark-cluster-b-v in the GitHub repo. Follow these steps to create the clusters.

After successfully completing the steps, you should have two EMR on EKS virtual clusters named spark-cluster-a-v and spark-cluster-b-v running on the EKS clusters spark-cluster-a and spark-cluster-b, respectively.

To verify the successful creation of the clusters, open the Amazon EMR console and choose Virtual clusters under EMR on EKS in the navigation pane.

Image showing the Amazon EMR on EKS setup

Set up BPG on Amazon EKS

To set up BPG on Amazon EKS, complete the following steps:

  1. Change to the appropriate directory:
cd ~/batch-processing-gateway-on-emr-on-eks/bpg/
  2. Set up the AWS Region:
export AWS_REGION="<AWS_REGION>"
  3. Create a key pair. Make sure you follow your organization’s best practices for key pair management.
aws ec2 create-key-pair \
--region "$AWS_REGION" \
--key-name ekskp \
--key-type ed25519 \
--key-format pem \
--query "KeyMaterial" \
--output text > ekskp.pem
chmod 400 ekskp.pem
ssh-keygen -y -f ekskp.pem > eks_publickey.pem
chmod 400 eks_publickey.pem

Now you’re ready to create the EKS cluster.

By default, eksctl creates each EKS cluster in its own dedicated virtual private cloud (VPC). To avoid reaching the default soft limit on the number of VPCs in an account, we use the --vpc-public-subnets parameter to create clusters in an existing VPC. For this post, we use the default VPC for deploying the solution. Modify the following code to deploy the solution in the appropriate VPC in accordance with your organization’s best practices. For official guidance, refer to Create a VPC.

  1. Get the public subnets for your VPC:
export DEFAULT_FOR_AZ_SUBNET=$(aws ec2 describe-subnets --region "$AWS_REGION" --filters "Name=default-for-az,Values=true" --query "Subnets[?AvailabilityZone != 'us-east-1e'].SubnetId" | jq -r '. | map(tostring) | join(",")')
  2. Create the cluster:
eksctl create cluster \
--name bpg-cluster \
--region "$AWS_REGION" \
--vpc-public-subnets "$DEFAULT_FOR_AZ_SUBNET" \
--with-oidc \
--ssh-access \
--ssh-public-key eks_publickey.pem \
--instance-types=m5.xlarge \
--managed
  3. On the Amazon EKS console, choose Clusters in the navigation pane and check for the successful provisioning of the bpg-cluster.

Image showing the Amazon EKS based BPG cluster setup

In the next steps, we make the following changes to the existing batch-processing-gateway code base:

For your convenience, we have provided the updated files in the batch-processing-gateway-on-emr-on-eks repository. You can copy these files into the batch-processing-gateway repository.

  1. Replace the POM XML file:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/pom.xml ~/batch-processing-gateway/pom.xml
  2. Replace the DAO Java file:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/LogDao.java ~/batch-processing-gateway/src/main/java/com/apple/spark/core/LogDao.java
  3. Replace the Dockerfile:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/Dockerfile ~/batch-processing-gateway/Dockerfile

Now you’re ready to build your Docker image.

  1. Create a private Amazon Elastic Container Registry (Amazon ECR) repository:
aws ecr create-repository --repository-name bpg --region "$AWS_REGION"
  2. Get the AWS account ID:
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

  3. Authenticate Docker to your ECR registry:
aws ecr get-login-password --region "$AWS_REGION" | docker login --username AWS --password-stdin "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com
  4. Build your Docker image:
cd ~/batch-processing-gateway/
docker build \
--platform linux/amd64 \
--build-arg VERSION="1.0.0" \
--build-arg BUILD_TIME=$(date -u +"%Y-%m-%dT%H:%M:%SZ") \
--build-arg GIT_COMMIT=$(git rev-parse HEAD) \
--progress=plain \
--no-cache \
-t bpg:1.0.0 .
  5. Tag your image:
docker tag bpg:1.0.0 "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com/bpg:1.0.0
  6. Push the image to your ECR repository:
docker push "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com/bpg:1.0.0

The ImagePullPolicy in the batch-processing-gateway GitHub repo is set to IfNotPresent. Update the image tag in case you need to update the image.

  1. To verify the successful creation and upload of the Docker image, open the Amazon ECR console, choose Repositories under Private registry in the navigation pane, and locate the bpg repository:

Image showing the Amazon ECR setup

Set up an Amazon Aurora MySQL database

Complete the following steps to set up an Amazon Aurora MySQL-Compatible Edition database:

  1. List the default subnets across Availability Zones as a compact JSON array:
DEFAULT_FOR_AZ_SUBNET_RFMT=$(aws ec2 describe-subnets --region "$AWS_REGION" --filters "Name=default-for-az,Values=true" --query "Subnets[*].SubnetId" | jq -c '.')
  2. Create a subnet group. Refer to create-db-subnet-group for more details.
aws rds create-db-subnet-group \
--db-subnet-group-name bpg-rds-subnetgroup \
--db-subnet-group-description "BPG Subnet Group for RDS" \
--subnet-ids "$DEFAULT_FOR_AZ_SUBNET_RFMT" \
--region "$AWS_REGION"
  3. List the default VPC:
export DEFAULT_VPC=$(aws ec2 describe-vpcs --region "$AWS_REGION" --filters "Name=isDefault,Values=true" --query "Vpcs[0].VpcId" --output text)
  4. Create a security group:
aws ec2 create-security-group \
--group-name bpg-rds-securitygroup \
--description "BPG Security Group for RDS" \
--vpc-id "$DEFAULT_VPC" \
--region "$AWS_REGION"
  5. List the bpg-rds-securitygroup security group ID:
export BPG_RDS_SG=$(aws ec2 describe-security-groups --region "$AWS_REGION" --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)
  6. Create the Aurora DB Regional cluster. Refer to create-db-cluster for more details.
aws rds create-db-cluster \
--database-name bpg \
--db-cluster-identifier bpg \
--engine aurora-mysql \
--engine-version 8.0.mysql_aurora.3.06.1 \
--master-username admin \
--manage-master-user-password \
--db-subnet-group-name bpg-rds-subnetgroup \
--vpc-security-group-ids "$BPG_RDS_SG" \
--region "$AWS_REGION"
  7. Create a DB Writer instance in the cluster. Refer to create-db-instance for more details.
aws rds create-db-instance \
--db-instance-identifier bpg \
--db-cluster-identifier bpg \
--db-instance-class db.r5.large \
--engine aurora-mysql \
--region "$AWS_REGION"

  8. To verify the successful creation of the RDS Regional cluster and Writer instance, on the Amazon RDS console, choose Databases in the navigation pane and check for the bpg database.

Image showing the RDS setup

Set up network connectivity

Security groups for EKS clusters are typically associated with the nodes and the control plane (if using managed nodes). In this section, we configure the networking to allow the node security group of the bpg-cluster to communicate with spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster.

  1. Identify the security groups of bpg-cluster, spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster:
# Identify Node Security Group of the bpg-cluster
BPG_CLUSTER_NODEGROUP_SG=$(aws ec2 describe-instances \
--filters Name=tag:eks:cluster-name,Values=bpg-cluster \
--query "Reservations[*].Instances[*].SecurityGroups[?contains(GroupName, 'eks-cluster-sg-bpg-cluster-')].GroupId" \
--region "$AWS_REGION" \
--output text | uniq)

# Identify Cluster security group of spark-cluster-a and spark-cluster-b
SPARK_A_CLUSTER_SG=$(aws eks describe-cluster --name spark-cluster-a --query "cluster.resourcesVpcConfig.clusterSecurityGroupId" --output text)
SPARK_B_CLUSTER_SG=$(aws eks describe-cluster --name spark-cluster-b --query "cluster.resourcesVpcConfig.clusterSecurityGroupId" --output text)

# Identify Cluster security group of bpg Aurora RDS cluster Writer Instance
BPG_RDS_WRITER_SG=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)

  1. Allow the node security group of the bpg-cluster to communicate with spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster:
# spark-cluster-a
aws ec2 authorize-security-group-ingress --group-id "$SPARK_A_CLUSTER_SG" --protocol tcp --port 443 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

# spark-cluster-b
aws ec2 authorize-security-group-ingress --group-id "$SPARK_B_CLUSTER_SG" --protocol tcp --port 443 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

# bpg-rds
aws ec2 authorize-security-group-ingress --group-id "$BPG_RDS_WRITER_SG" --protocol tcp --port 3306 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

Deploy BPG

We deploy BPG for weight-based cluster selection. spark-cluster-a-v and spark-cluster-b-v are configured with a queue named dev and weight=50. We expect statistically equal distribution of jobs between the two clusters. For more information, refer to Weight Based Cluster Selection.
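To make the "statistically equal distribution" expectation concrete, the following is a minimal simulation of weight-proportional selection. It is a sketch of the general technique only, not BPG's actual implementation; the function names and the seed are assumptions introduced for illustration.

```python
import random
from collections import Counter

# Weights mirroring the post: both virtual clusters serve the "dev" queue
# with weight=50. The dictionary itself is a hypothetical stand-in for the
# BPG configuration.
CLUSTER_WEIGHTS = {"spark-cluster-a-v": 50, "spark-cluster-b-v": 50}


def pick_cluster(weights, rng):
    """Pick one cluster name with probability proportional to its weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


def simulate(weights, submissions, seed=0):
    """Count how many of `submissions` simulated jobs land on each cluster."""
    rng = random.Random(seed)
    return Counter(pick_cluster(weights, rng) for _ in range(submissions))


if __name__ == "__main__":
    for name, count in sorted(simulate(CLUSTER_WEIGHTS, 10_000).items()):
        print(f"{name}: {count}")
```

With equal weights, each cluster should receive close to half of the submissions over many runs, although a handful of submissions can easily look lopsided.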

  1. Get the bpg-cluster context:
BPG_CLUSTER_CONTEXT=$(kubectl config view --output=json | jq -r '.contexts[] | select(.name | contains("bpg-cluster")) | .name')
kubectl config use-context "$BPG_CLUSTER_CONTEXT"

  1. Create a Kubernetes namespace for BPG:
kubectl create namespace bpg

The Helm chart for BPG requires a values.yaml file. This file includes various key-value pairs for each EMR on EKS cluster, each EKS cluster, and the Aurora cluster. Manually updating the values.yaml file can be cumbersome. To simplify this process, we’ve automated the creation of the values.yaml file.

  1. Run the following script to generate the values.yaml file:
cd ~/batch-processing-gateway-on-emr-on-eks/bpg
chmod 755 create-bpg-values-yaml.sh
./create-bpg-values-yaml.sh
  1. Use the following code to deploy the helm chart. Make sure the tag value in both values.template.yaml and values.yaml matches the Docker image tag specified earlier.
cp ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml.$(date +'%Y%m%d%H%M%S') \
&& cp ~/batch-processing-gateway-on-emr-on-eks/bpg/values.yaml ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml \
&& cd ~/batch-processing-gateway/helm/batch-processing-gateway/

kubectl config use-context "$BPG_CLUSTER_CONTEXT"

helm install batch-processing-gateway . --values values.yaml -n bpg

  1. Verify the deployment by listing the pods and viewing the pod logs:
kubectl get pods --namespace bpg
kubectl logs <BPG-PODNAME> --namespace bpg
  1. Exec into the BPG pod and verify the health check:
kubectl exec -it <BPG-PODNAME> -n bpg -- bash 
curl -u admin:admin localhost:8080/skatev2/healthcheck/status

We get the following output:

{"status":"OK"}

BPG is successfully deployed on the EKS cluster.

Test the solution

To test the solution, you can submit multiple Spark jobs by running the following sample code multiple times. The code submits the SparkPi Spark job to the BPG, which in turn submits the jobs to the EMR on EKS cluster based on the set parameters.

  1. Set the kubectl context to the bpg cluster:
kubectl config get-contexts | awk 'NR==1 || /bpg-cluster/'
kubectl config use-context "<CONTEXT_NAME>"
  1. Identify the bpg pod name:
kubectl get pods --namespace bpg
  1. Exec into the bpg pod:

kubectl exec -it "<BPG-PODNAME>" -n bpg -- bash

  1. Submit multiple Spark jobs using curl. Run the following curl command to submit jobs to spark-cluster-a and spark-cluster-b:
curl -u user:pass localhost:8080/skatev2/spark -i -X POST \
-H 'Content-Type: application/json' \
-d '{
"applicationName": "SparkPiDemo",
"queue": "dev",
"sparkVersion": "3.5.0",
"mainApplicationFile": "local:///usr/lib/spark/examples/jars/spark-examples.jar",
"mainClass":"org.apache.spark.examples.SparkPi",
"driver": {
"cores": 1,
"memory": "2g",
"serviceAccount": "emr-containers-sa-spark",
"labels":{
"version": "3.5.0"
}
},
"executor": {
"instances": 1,
"cores": 1,
"memory": "2g",
"labels":{
"version": "3.5.0"
}
}
}'

After each submission, BPG will inform you of the cluster to which the job was submitted. For example:

HTTP/1.1 200 OK
Date: Sat, 10 Aug 2024 16:17:15 GMT
Content-Type: application/json
Content-Length: 67
{"submissionId":"spark-cluster-a-f72a7ddcfde14f4390194d4027c1e1d6"}
{"submissionId":"spark-cluster-a-d1b359190c7646fa9d704122fbf8c580"}
{"submissionId":"spark-cluster-b-7b61d5d512bb4adeb1dd8a9977d605df"}
  1. Verify that the jobs are running in the EMR on EKS clusters spark-cluster-a and spark-cluster-b:
kubectl config get-contexts | awk 'NR==1 || /spark-cluster-(a|b)/'
kubectl get pods -n spark-operator --context "<CONTEXT_NAME>"

You can view the Spark Driver logs to find the value of Pi as shown below:

kubectl logs <SPARK-DRIVER-POD-NAME> --namespace spark-operator --context "<CONTEXT_NAME>"

After the job completes successfully, you should see the following message in the logs:

Pi is roughly 3.1452757263786317

We have successfully tested the weight-based routing of Spark jobs across multiple clusters.
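If you collect the submissionId values returned by BPG, a short script can tally how the jobs were spread across clusters. This sketch assumes that every ID has the shape `<cluster>-<suffix>`, which is inferred only from the sample output above; the helper names are hypothetical.

```python
from collections import Counter


def cluster_of(submission_id):
    """Return the cluster prefix of an ID shaped like '<cluster>-<suffix>'."""
    cluster, _, suffix = submission_id.rpartition("-")
    if not cluster or not suffix:
        raise ValueError(f"unexpected submission ID: {submission_id!r}")
    return cluster


def tally(submission_ids):
    """Count submissions per cluster."""
    return Counter(cluster_of(s) for s in submission_ids)


if __name__ == "__main__":
    # The three IDs from the example response shown earlier.
    sample = [
        "spark-cluster-a-f72a7ddcfde14f4390194d4027c1e1d6",
        "spark-cluster-a-d1b359190c7646fa9d704122fbf8c580",
        "spark-cluster-b-7b61d5d512bb4adeb1dd8a9977d605df",
    ]
    for cluster, count in sorted(tally(sample).items()):
        print(cluster, count)
```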

Clean up

To clean up your resources, complete the following steps:

  1. Delete the EMR on EKS virtual cluster:
VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --region="$AWS_REGION" --query "virtualClusters[?name=='spark-cluster-a-v' && state=='RUNNING'].id" --output text)
aws emr-containers delete-virtual-cluster --region="$AWS_REGION" --id "$VIRTUAL_CLUSTER_ID"
VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --region="$AWS_REGION" --query "virtualClusters[?name=='spark-cluster-b-v' && state=='RUNNING'].id" --output text)
aws emr-containers delete-virtual-cluster --region="$AWS_REGION" --id "$VIRTUAL_CLUSTER_ID"

  1. Delete the AWS Identity and Access Management (IAM) role:
aws iam delete-role-policy --role-name sparkjobrole --policy-name EMR-Spark-Job-Execution
aws iam delete-role --role-name sparkjobrole

  1. Delete the RDS DB instance and DB cluster:
aws rds delete-db-instance \
--db-instance-identifier bpg \
--skip-final-snapshot

aws rds delete-db-cluster \
--db-cluster-identifier bpg \
--skip-final-snapshot

  1. Delete the bpg-rds-securitygroup security group and bpg-rds-subnetgroup subnet group:
BPG_SG=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)
aws ec2 delete-security-group --group-id "$BPG_SG"
aws rds delete-db-subnet-group --db-subnet-group-name bpg-rds-subnetgroup

  1. Delete the EKS clusters:
eksctl delete cluster --region="$AWS_REGION" --name=bpg-cluster
eksctl delete cluster --region="$AWS_REGION" --name=spark-cluster-a
eksctl delete cluster --region="$AWS_REGION" --name=spark-cluster-b

  1. Delete the bpg ECR repository:
aws ecr delete-repository --repository-name bpg --region="$AWS_REGION" --force

  1. Delete the key pairs:
aws ec2 delete-key-pair --key-name ekskp
aws ec2 delete-key-pair --key-name emrkp

Conclusion

In this post, we explored the challenges associated with managing workloads on a single EMR on EKS cluster and demonstrated the advantages of adopting a multi-cluster deployment pattern. We introduced Batch Processing Gateway (BPG) as a solution to these challenges, showcasing how it simplifies job management, enhances resilience, and improves horizontal scalability in multi-cluster environments. By implementing BPG, we illustrated the practical application of the gateway architecture pattern for submitting Spark jobs on Amazon EMR on EKS. This post covers the problem, the benefits of the gateway architecture, and the steps to implement BPG.

We encourage you to evaluate your existing Spark on Amazon EMR on EKS implementation and consider adopting this solution. It allows users to submit, examine, and delete Spark applications on Kubernetes with intuitive API calls, without needing to worry about the underlying complexities.

For this post, we focused on the implementation details of BPG. As a next step, you can explore integrating BPG with clients such as Apache Airflow, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), or Jupyter notebooks. BPG works well with the Apache YuniKorn scheduler. You can also explore integrating BPG to use YuniKorn queues for job submission.


About the Authors

Umair Nawaz is a Senior DevOps Architect at Amazon Web Services. He works on building secure architectures and advises enterprises on agile software delivery. He is motivated to solve problems strategically by utilizing modern technologies.

Ravikiran Rao is a Data Architect at Amazon Web Services and is passionate about solving complex data challenges for various customers. Outside of work, he is a theater enthusiast and amateur tennis player.

Sri Potluri is a Cloud Infrastructure Architect at Amazon Web Services. He is passionate about solving complex problems and delivering well-structured solutions for diverse customers. His expertise spans across a range of cloud technologies, ensuring scalable and reliable infrastructure tailored to each project’s unique challenges.

Suvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

How CFM built a well-governed and scalable data-engineering platform using Amazon EMR for financial features generation

Post Syndicated from Julien Lafaye original https://aws.amazon.com/blogs/big-data/how-cfm-built-a-well-governed-and-scalable-data-engineering-platform-using-amazon-emr-for-financial-features-generation/

This post is co-written with Julien Lafaye from CFM.

Capital Fund Management (CFM) is an alternative investment management company based in Paris with staff in New York City and London. CFM takes a scientific approach to finance, using quantitative and systematic techniques to develop the best investment strategies. Over the years, CFM has received many awards for their flagship product Stratus, a multi-strategy investment program that delivers decorrelated returns through a diversified investment approach while seeking a risk profile that is less volatile than traditional market indexes. It was first opened to investors in 1995. CFM assets under management are now $13 billion.

A traditional approach to systematic investing involves analysis of historical trends in asset prices to anticipate future price fluctuations and make investment decisions. Over the years, the investment industry has grown in such a way that relying on historical prices alone is not enough to remain competitive: traditional systematic strategies progressively became public and inefficient, while the number of actors grew, making slices of the pie smaller—a phenomenon known as alpha decay. In recent years, driven by the commoditization of data storage and processing solutions, the industry has seen a growing number of systematic investment management firms switch to alternative data sources to drive their investment decisions. Publicly documented examples include the usage of satellite imagery of mall parking lots to estimate trends in consumer behavior and its impact on stock prices. Using social network data has also often been cited as a potential source of data to improve short-term investment decisions. To remain at the forefront of quantitative investing, CFM has put in place a large-scale data acquisition strategy.

As the CFM Data team, we constantly monitor new data sources and vendors to continue to innovate. The speed at which we can trial datasets and determine whether they are useful to our business is a key factor of success. Trials are short projects that usually take up to several months; the output of a trial is a buy (or not-buy) decision, depending on whether we detect information in the dataset that can help us in our investment process. Unfortunately, because datasets come in all shapes and sizes, planning our hardware and software requirements several months ahead has been very challenging. Some datasets require large or specific compute capabilities that we can’t afford to buy if the trial is a failure. The AWS pay-as-you-go model and the constant pace of innovation in data processing technologies enable CFM to maintain agility and facilitate a steady cadence of trials and experimentation.

In this post, we share how we built a well-governed and scalable data engineering platform using Amazon EMR for financial features generation.

AWS as a key enabler of CFM’s business strategy

We have identified the following as key enablers of this data strategy:

  • Managed services – AWS managed services reduce the setup cost of complex data technologies, such as Apache Spark.
  • Elasticity – Compute and storage elasticity removes the burden of having to plan and size hardware procurement. This allows us to be more focused on the business and more agile in our data acquisition strategy.
  • Governance – At CFM, our Data teams are split into autonomous teams that can use different technologies based on their requirements and skills. Each team is the sole owner of its AWS account. To share data with our internal consumers, we use AWS Lake Formation with LF-Tags to streamline the process of managing access rights across the organization.

Data integration workflow

A typical data integration process consists of ingestion, analysis, and production phases.

CFM usually negotiates with vendors a download method that is convenient for both parties. We see a lot of possibilities for exchanging data (HTTPS, FTP, SFTP), but we’re seeing a growing number of vendors standardizing around Amazon Simple Storage Service (Amazon S3).

CFM data scientists then look up the data and build features that can be used in our trading models. The bulk of our data scientists are heavy users of Jupyter Notebook. Jupyter notebooks are interactive computing environments that allow users to create and share documents containing live code, equations, visualizations, and narrative text. They provide a web-based interface where users can write and run code in different programming languages, such as Python, R, or Julia. Notebooks are organized into cells, which can be run independently, facilitating the iterative development and exploration of data analysis and computational workflows.

We invested a lot in polishing our Jupyter stack (see, for example, the open source project Jupytext, which was initiated by a former CFM employee), and we are proud of the level of integration with our ecosystem that we have reached. Although we explored the option of using AWS managed notebooks to streamline the provisioning process, we have decided to continue hosting these components on our on-premises infrastructure for now. CFM internal users appreciate the existing development environment, and switching to an AWS managed environment would mean changing their habits and accepting a temporary drop in productivity.

Exploration of small datasets is entirely feasible within this Jupyter environment, but for large datasets, we have identified Spark as the go-to solution. We could have deployed Spark clusters in our data centers, but we have found that Amazon EMR considerably reduces the time to deploy said clusters and provides many interesting features, such as ARM support through AWS Graviton processors, auto scaling capabilities, and the ability to provision transient clusters.

After a data scientist has written the feature, CFM deploys a script to the production environment that refreshes the feature as new data comes in. These scripts often run in a relatively short amount of time because they only require processing a small increment of data.

Interactive data exploration workflow

CFM’s data scientists’ preferred way of interacting with EMR clusters is through Jupyter notebooks. Having a long history of managing Jupyter notebooks on premises and customizing them, we opted to integrate EMR clusters into our existing stack. The user workflow is as follows:

  1. The user provisions an EMR cluster through the AWS Service Catalog and the AWS Management Console. Users can also use API calls to do this, but usually prefer using the Service Catalog interface. You can choose various instance types that include different combinations of CPU, memory, and storage, giving you the flexibility to choose the appropriate mix of resources for your applications.
  2. The user starts their Jupyter notebook instance and connects to the EMR cluster.
  3. The user interactively works on the data using the notebook.
  4. The user shuts down the cluster through the Service Catalog.

Solution overview

The connection between the notebook and the cluster is achieved by deploying the following open source components:

  • Apache Livy – This service provides a REST interface to a Spark driver running on an EMR cluster.
  • Sparkmagic – This set of Jupyter magics provides a straightforward way to connect to the cluster and send PySpark code to the cluster through the Livy endpoint.
  • Sagemaker-studio-analytics-extension – This library provides a set of magics to integrate analytics services (such as Amazon EMR) into Jupyter notebooks. It is used to integrate Amazon SageMaker Studio notebooks and EMR clusters (for more details, see Create and manage Amazon EMR Clusters from SageMaker Studio to run interactive Spark and ML workloads – Part 1). Having the requirement to use our own notebooks, we initially didn’t benefit from this integration. To help us, the Amazon EMR service team made this library available on PyPI and guided us in setting it up. We use this library to facilitate the connection between the notebook and the cluster and to forward the user permissions to the clusters through runtime roles. These runtime roles are then used to access the data instead of instance profile roles assigned to the Amazon Elastic Compute Cloud (Amazon EC2) instances that are part of the cluster. This allows more fine-grained access control on our data.
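To give a feel for what Sparkmagic sends over the Livy REST interface, the following sketch builds (but does not send) the two basic Livy requests: one that creates a PySpark session and one that submits a code statement to it. The host name and port are hypothetical, the helper names are introduced for illustration, and authentication is omitted entirely, so treat this as the shape of the calls rather than the connection CFM uses.

```python
import json
import urllib.request

# Hypothetical Livy endpoint; replace with the endpoint of your own cluster.
LIVY_URL = "http://livy.example.internal:8998"


def livy_request(path, payload):
    """Build a JSON POST request for a Livy REST path (nothing is sent)."""
    return urllib.request.Request(
        url=f"{LIVY_URL}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_session_request():
    """POST /sessions asks Livy to start an interactive PySpark session."""
    return livy_request("/sessions", {"kind": "pyspark"})


def run_statement_request(session_id, code):
    """POST /sessions/{id}/statements runs a code snippet in that session."""
    return livy_request(f"/sessions/{session_id}/statements", {"code": code})


if __name__ == "__main__":
    requests_to_show = (
        create_session_request(),
        run_statement_request(0, "spark.range(10).count()"),
    )
    for req in requests_to_show:
        print(req.get_method(), req.full_url, req.data.decode("utf-8"))
```

Passing one of these objects to `urllib.request.urlopen` would actually send it; the magics hide this exchange behind notebook cells.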

The following diagram illustrates the solution architecture.

Set up Amazon EMR on an EC2 cluster with the GetClusterSessionCredentials API

A runtime role is an AWS Identity and Access Management (IAM) role that you can specify when you submit a job or query to an EMR cluster. The EMR get-cluster-session-credentials API uses a runtime role to authenticate on EMR nodes, with access determined by the IAM policies attached to that role (we document the steps to enable this for the Spark terminal; a similar approach can be extended to Hive and Presto). This option is generally available in all AWS Regions, and the recommended release is emr-6.9.0 or later.
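As a rough illustration of the API call itself, the sketch below wraps the boto3 `get_cluster_session_credentials` operation. The function name is hypothetical, and the client is passed in so the logic can be exercised without AWS access; the role ARN in the usage comment is a placeholder.

```python
def fetch_session_credentials(emr_client, cluster_id, runtime_role_arn):
    """Exchange a runtime role for short-lived cluster credentials.

    `emr_client` is expected to behave like boto3.client("emr").
    Returns (username, password, expires_at).
    """
    response = emr_client.get_cluster_session_credentials(
        ClusterId=cluster_id,
        ExecutionRoleArn=runtime_role_arn,
    )
    creds = response["Credentials"]["UsernamePassword"]
    return creds["Username"], creds["Password"], response.get("ExpiresAt")


# Usage against a real cluster (requires boto3 and AWS credentials):
#
#   import boto3
#   username, password, expires_at = fetch_session_credentials(
#       boto3.client("emr"),
#       "j-XXXXXYYYYY",
#       "arn:aws:iam::111122223333:role/hypothetical-runtime-role",
#   )
```

The returned username and password are temporary, so callers should request fresh credentials once the expiry time has passed.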

Connect to Amazon EMR on the EC2 cluster from Jupyter Notebook with the GCSC API

Jupyter Notebook magic commands provide shortcuts and extra functionality to the notebooks in addition to what can be done with your kernel code. We use Jupyter magics to abstract the underlying connection from Jupyter to the EMR cluster; the analytics extension makes the connection through Livy using the GetClusterSessionCredentials (GCSC) API.

On your Jupyter instance, server, or notebook PySpark kernel, install the following extension, load the magics, and create a connection to the EMR cluster using your runtime role:

pip install sagemaker-studio-analytics-extension
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-XXXXXYYYYY --auth-type Basic_Access --language python --emr-execution-role-arn <YOUR_RUNTIME_ROLE_ARN>

Production with Amazon EMR Serverless

CFM has implemented an architecture based on dozens of pipelines: data is ingested from Amazon S3 and transformed using Amazon EMR Serverless with Spark; the resulting datasets are published back to Amazon S3.

Each pipeline runs as a separate EMR Serverless application to avoid resource contention between workloads. Individual IAM roles are assigned to each EMR Serverless application to apply least privilege access.

To control costs, CFM uses EMR Serverless automatic scaling combined with the maximum capacity feature (which defines the maximum total vCPU, memory, and disk capacity that can be consumed collectively by all the jobs running under this application). Finally, CFM uses the AWS Graviton architecture to further optimize cost and performance (as highlighted in the following screenshot).
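The maximum capacity and architecture settings map to parameters of the EMR Serverless `create_application` operation. The following sketch only builds the request arguments; the application name, release label, and capacity figures are hypothetical examples rather than CFM's actual configuration.

```python
def application_request(name, release_label, max_vcpu, max_memory_gb, max_disk_gb):
    """Build keyword arguments for emr-serverless create_application.

    maximumCapacity caps the total resources that all jobs in the
    application can consume together; architecture="ARM64" selects
    AWS Graviton.
    """
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
        "architecture": "ARM64",
        "maximumCapacity": {
            "cpu": f"{max_vcpu} vCPU",
            "memory": f"{max_memory_gb} GB",
            "disk": f"{max_disk_gb} GB",
        },
    }


if __name__ == "__main__":
    # Hypothetical pipeline name and limits, for illustration only.
    request = application_request("feature-pipeline-01", "emr-6.9.0", 200, 800, 2000)
    print(request)
    # To create the application (requires boto3 and AWS credentials):
    #   import boto3
    #   boto3.client("emr-serverless").create_application(**request)
```

Creating one such application per pipeline, each with its own limits and IAM role, matches the isolation described above.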

After some iterations, the user produces a final script that is put in production. For early deployments, we relied on Amazon EMR on EC2 to run those scripts. Based on user feedback, we looked for opportunities to reduce cluster startup times. Cluster startup could take up to 8 minutes for a job whose runtime was only a fraction of that, which hurt the user experience. We also wanted to reduce the operational overhead of starting and stopping EMR clusters.

Those are the reasons why we switched to EMR Serverless a few months after its initial release. This move was surprisingly straightforward because it didn’t require any tuning and worked instantly. The only drawback we have seen is the requirement to update AWS tools and libraries in our software stacks to incorporate all the EMR features (such as AWS Graviton); on the other hand, it led to reduced startup time, reduced costs, and better workload isolation.

At this stage, CFM data scientists can perform analytics and extract value from raw data. Resulting datasets are then published to our data mesh service across our organization to allow our scientists to work on prediction models. In the context of CFM, this requires a strong governance and security posture to apply fine-grained access control to this data. This data mesh approach allows CFM to have a clear view from an audit standpoint on dataset usage.

Data governance with Lake Formation

A data mesh on AWS is an architectural approach where data is treated as a product and owned by domain teams. Each team uses AWS services like Amazon S3, AWS Glue, AWS Lambda, and Amazon EMR to independently build and manage their data products, while tools like the AWS Glue Data Catalog enable discoverability. This decentralized approach promotes data autonomy, scalability, and collaboration across the organization:

  • Autonomy – At CFM, like at most companies, we have different teams with different skill sets and different technology needs. Enabling teams to work autonomously was a key parameter in our decision to move to a decentralized model where each domain would live in its own AWS account. Another advantage was improved security, particularly the ability to contain the potential impact area in the event of credential leaks or account compromises. Lake Formation is key in enabling this kind of model because it streamlines the process of managing access rights across accounts. In the absence of Lake Formation, administrators would have to make sure that resource policies and user policies align to grant access to data: this is usually considered complex, error-prone, and hard to debug. Lake Formation makes this process a lot less complicated.
  • Scalability – There are no blockers that prevent other organization units from joining the data mesh structure, and we expect more teams to join the effort of refining and sharing their data assets.
  • Collaboration – Lake Formation provides a sound foundation for making data products discoverable by CFM internal consumers. On top of Lake Formation, we developed our own Data Catalog portal. It provides a user-friendly interface where users can discover datasets, read through the documentation, and download code snippets (see the following screenshot). The interface is tailor-made for our work habits.

The Lake Formation documentation is extensive and describes many ways to build a data governance pattern that fits an organization’s requirements. We made the following choices:

  • LF-Tags – We use LF-Tags instead of named resource permissioning. Tags are associated to resources, and personas are given the permission to access all resources with a certain tag. This makes scaling the process of managing rights straightforward. Also, this is an AWS recommended best practice.
  • Centralization – Databases and LF-Tags are managed in a centralized account, which is managed by a single team.
  • Decentralization of permissions management – Data producers are allowed to associate tags to the datasets they are responsible for. Administrators of consumer accounts can grant access to tagged resources.
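The tag-based model above can be pictured with a small in-memory example. This is a deliberately simplified sketch, not Lake Formation's real evaluation engine: it assumes a grant is a tag expression in which every key must match (AND) and any listed value for a key is acceptable (OR). All table names, tag keys, and principals are hypothetical.

```python
def tags_match(resource_tags, expression):
    """True if the resource has an allowed value for every key in the expression."""
    return bool(expression) and all(
        resource_tags.get(key) in allowed_values
        for key, allowed_values in expression.items()
    )


def accessible_tables(catalog, grants, principal):
    """List tables whose tags satisfy at least one expression granted to the principal."""
    expressions = grants.get(principal, [])
    return sorted(
        table
        for table, tags in catalog.items()
        if any(tags_match(tags, expr) for expr in expressions)
    )


# Producers attach tags to the datasets they own (hypothetical values).
CATALOG = {
    "features.prices": {"domain": "market-data", "sensitivity": "internal"},
    "features.sentiment": {"domain": "alt-data", "sensitivity": "internal"},
    "raw.vendor_feed": {"domain": "alt-data", "sensitivity": "restricted"},
}

# Consumer account administrators grant access by tag, not by table name.
GRANTS = {
    "research-team": [{"sensitivity": ["internal"]}],
    "alt-data-team": [{"domain": ["alt-data"]}],
}

if __name__ == "__main__":
    for principal in GRANTS:
        print(principal, accessible_tables(CATALOG, GRANTS, principal))
```

Because grants refer to tags, a newly tagged dataset becomes visible to the right consumers without anyone editing per-table permissions, which is what makes the approach scale.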

Conclusions

In this post, we discussed how CFM built a well-governed and scalable data engineering platform for financial features generation.

Lake Formation provides a solid foundation for sharing datasets across accounts. It removes the operational burden of managing complex cross-account access through IAM and resource policies. For now, we only use it to share assets created by data scientists, but we plan to add new domains in the near future.

Lake Formation also seamlessly integrates with other analytics services like AWS Glue and Amazon Athena. The ability to provide a comprehensive and integrated suite of analytics tools to our users is a strong reason for adopting Lake Formation.

Last but not least, EMR Serverless reduced operational risk and complexity. EMR Serverless applications start in less than 60 seconds, whereas starting an EMR cluster on EC2 instances typically takes more than 5 minutes (as of this writing). Those saved minutes added up and effectively eliminated missed delivery deadlines.

If you’re looking to streamline your data analytics workflow, simplify cross-account data sharing, and reduce operational overhead, consider using Lake Formation and EMR Serverless in your organization. Check out the AWS Big Data Blog and reach out to your AWS team to learn more about how AWS can help you use managed services to drive efficiency and unlock valuable insights from your data!


About the Authors

Julien Lafaye is a director at Capital Fund Management (CFM) where he is leading the implementation of a data platform on AWS. He is also heading a team of data scientists and software engineers in charge of delivering intraday features to feed CFM trading strategies. Before that, he was developing low latency solutions for transforming and disseminating financial market data. He holds a PhD in computer science and graduated from Ecole Polytechnique Paris. During his spare time, he enjoys cycling, running and tinkering with electronic gadgets and computers.

Matthieu Bonville is a Solutions Architect in AWS France working with Financial Services Industry (FSI) customers. He leverages his technical expertise and knowledge of the FSI domain to help customers architect effective technology solutions that address their business challenges.

Joel Farvault is Principal Specialist SA Analytics for AWS with 25 years’ experience working on enterprise architecture, data governance and analytics, mainly in the financial services industry. Joel has led data transformation projects on fraud analytics, claims automation, and Master Data Management. He leverages his experience to advise customers on their data strategy and technology foundations.

Attribute Amazon EMR on EC2 costs to your end-users

Post Syndicated from Raj Patel original https://aws.amazon.com/blogs/big-data/attribute-amazon-emr-on-ec2-costs-to-your-end-users/

Amazon EMR on EC2 is a managed service that makes it straightforward to run big data processing and analytics workloads on AWS. It simplifies the setup and management of popular open source frameworks like Apache Hadoop and Apache Spark, allowing you to focus on extracting insights from large datasets rather than the underlying infrastructure. With Amazon EMR, you can take advantage of the power of these big data tools to process, analyze, and gain valuable business intelligence from vast amounts of data.

Cost optimization is one of the pillars of the Well-Architected Framework. It focuses on avoiding unnecessary costs, selecting the most appropriate resource types, analyzing spend over time, and scaling in and out to meet business needs without overspending. An optimized workload maximizes the use of all available resources, delivers the desired outcome at the most cost-effective price point, and meets your functional needs.

The current Amazon EMR pricing page shows the estimated cost of the cluster. You can also use AWS Cost Explorer to get more detailed information about your costs. These views give you an overall picture of your Amazon EMR costs. However, you may need to attribute costs at the individual Spark job level. For example, you might want to know the usage cost in Amazon EMR for the finance business unit. Or, for chargeback purposes, you might need to aggregate the cost of Spark applications by functional area. After you have allocated costs to individual Spark jobs, this data can help you make informed decisions to optimize your costs. For instance, you could choose to restructure your applications to utilize fewer resources. Alternatively, you might opt to explore different pricing models like Amazon EMR on EKS or Amazon EMR Serverless.

In this post, we share a chargeback model that you can use to track and allocate the costs of Spark workloads running on Amazon EMR on EC2 clusters. We describe an approach that assigns Amazon EMR costs to different jobs, teams, or lines of business. You can use this feature to distribute costs across various business units. This can assist you in monitoring the return on investment for your Spark-based workloads.

Solution overview

The solution is designed to help you track the cost of your Spark applications running on EMR on EC2. It can help you identify cost optimizations and improve the cost-efficiency of your EMR clusters.

The proposed solution uses a scheduled AWS Lambda function that runs daily. The function captures usage and cost metrics and stores them in Amazon Relational Database Service (Amazon RDS) tables. The data in the RDS tables is then queried to derive chargeback figures and generate reporting trends using Amazon QuickSight. Using these AWS services incurs additional cost. If you want to avoid the additional services and their associated costs, you can instead consider a cron-based agent script installed on your existing EMR cluster. This script stores the relevant metrics in an Amazon Simple Storage Service (Amazon S3) bucket, and Python Jupyter notebooks generate the chargeback numbers from the data files in Amazon S3 using AWS Glue tables.

The following diagram shows the current solution architecture.

Solution Architecture

The workflow consists of the following steps:

  1. A Lambda function gets the following parameters from Parameter Store, a capability of AWS Systems Manager:
    {
      "yarn_url": "http://dummy.compute-1.amazonaws.com:8088/ws/v1/cluster/apps",
      "tbl_applicationlogs_lz": "public.emr_applications_execution_log_lz",
      "tbl_applicationlogs": "public.emr_applications_execution_log",
      "tbl_emrcost": "public.emr_cluster_usage_cost",
      "tbl_emrinstance_usage": "public.emr_cluster_instances_usage",
      "emrcluster_id": "j-xxxxxxxxxx",
      "emrcluster_name": "EMR_Cost_Measure",
      "emrcluster_role": "dt-dna-shared",
      "emrcluster_linkedaccount": "xxxxxxxxxxx",
      "postgres_rds": {
        "host": "xxxxxxxxx.amazonaws.com",
        "dbname": "postgres",
        "user": "postgresadmin",
        "secretid": "postgressecretid"
      }
    }

  2. The Lambda function extracts Spark application run logs from the EMR cluster using the Resource Manager API. The following metrics are extracted as part of the process: vcore-seconds, memory MB-seconds, and storage GB-seconds.
  3. The Lambda function captures the daily cost of EMR clusters from Cost Explorer.
  4. The Lambda function also extracts EMR On-Demand and Spot Instance usage data using the Amazon Elastic Compute Cloud (Amazon EC2) Boto3 APIs.
  5. The Lambda function loads these datasets into an RDS database.
  6. The cost of running a Spark application is determined by the amount of CPU resources it uses, compared to the total CPU usage of all Spark applications. This information is used to distribute the overall cost among different teams, business lines, or EMR queues.

The extraction process runs daily, extracting the previous day’s data and storing it in an Amazon RDS for PostgreSQL table. The historical data in the table needs to be purged based on your use case.
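As an illustration of step 2, the following minimal Python sketch maps a YARN ResourceManager apps response to rows shaped like the emr_applications_execution_log table. The function names and the sample payload are hypothetical. The field names (id, name, queue, state, finalStatus, startedTime, finishedTime, elapsedTime, vcoreSeconds, memorySeconds, runningContainers) are those returned by the ResourceManager Cluster Applications API. The Lambda code in the GitHub repo may be structured differently.

```python
from datetime import datetime, timezone


def ms_to_utc(epoch_ms):
    """Convert the millisecond epoch timestamps used by YARN to UTC datetimes."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


def to_log_rows(yarn_response, cluster_id, collect_date):
    """Flatten a /ws/v1/cluster/apps response into one dict per application."""
    # YARN returns {"apps": null} when no application matches the query.
    apps = (yarn_response.get("apps") or {}).get("app", [])
    return [
        {
            "appdatecollect": collect_date,
            "app_id": app["id"],
            "app_name": app["name"],
            "queue": app["queue"],
            "job_state": app["state"],
            "job_status": app["finalStatus"],
            "starttime": ms_to_utc(app["startedTime"]),
            "endtime": ms_to_utc(app["finishedTime"]),
            "runtime_seconds": app["elapsedTime"] // 1000,  # elapsedTime is in ms
            "vcore_seconds": app["vcoreSeconds"],
            "memory_seconds": app["memorySeconds"],
            "running_containers": app["runningContainers"],
            "rm_clusterid": cluster_id,
        }
        for app in apps
    ]


# Hypothetical payload with a single finished application.
sample = {"apps": {"app": [{
    "id": "application_00001",
    "name": "HR_PAYROLL_PS_PSPROD_TAXDUDUCTION_DLY_LD",
    "queue": "default", "state": "FINISHED", "finalStatus": "SUCCEEDED",
    "startedTime": 1700000000000, "finishedTime": 1700000010000,
    "elapsedTime": 10000, "vcoreSeconds": 120, "memorySeconds": 491520,
    "runningContainers": -1,
}]}}
rows = to_log_rows(sample, "j-xxxxxxxxxx", "2023-11-14")
```

Keeping the mapping in a pure function like this makes it possible to test the extraction logic without a live cluster.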

The solution is open source and available on GitHub.

You can use the AWS Cloud Development Kit (AWS CDK) to deploy the Lambda function, RDS for PostgreSQL data model tables, and a QuickSight dashboard to track EMR cluster cost at the job, team, or business unit level.

The following schema shows the tables used in the solution, which QuickSight queries to populate the dashboard.

  • emr_applications_execution_log_lz or public.emr_applications_execution_log – Storage for daily run metrics for all jobs run on the EMR cluster:
    • appdatecollect – Log collection date
    • app_id – Spark job run ID
    • app_name – Run name
    • queue – EMR queue in which job was run
    • job_state – Job running state
    • job_status – Job run final status (Succeeded or Failed)
    • starttime – Job start time
    • endtime – Job end time
    • runtime_seconds – Runtime in seconds
    • vcore_seconds – Consumed vCore CPU in seconds
    • memory_seconds – Memory consumed
    • running_containers – Containers used
    • rm_clusterid – EMR cluster ID
  • emr_cluster_usage_cost – Captures Amazon EMR and Amazon EC2 daily cost consumption from Cost Explorer and loads the data into the RDS table:
    • costdatecollect – Cost collection date
    • startdate – Cost start date
    • enddate – Cost end date
    • emr_unique_tag – EMR cluster associated tag
    • net_unblendedcost – Total net unblended daily dollar cost
    • unblendedcost – Total unblended daily dollar cost
    • cost_type – Daily cost
    • service_name – AWS service for which the cost incurred (Amazon EMR and Amazon EC2)
    • emr_clusterid – EMR cluster ID
    • emr_clustername – EMR cluster name
    • loadtime – Table load date/time
  • emr_cluster_instances_usage – Captures the aggregated resource usage (vCores) and allocated resources for each EMR cluster node, and helps identify the idle time of the cluster:
    • instancedatecollect – Instance usage collect date
    • emr_instance_day_run_seconds – EMR instance active seconds in the day
    • emr_region – EMR cluster AWS Region
    • emr_clusterid – EMR cluster ID
    • emr_clustername – EMR cluster name
    • emr_cluster_fleet_type – EMR cluster fleet type
    • emr_node_type – Instance node type
    • emr_market – Market type (On-Demand or Spot)
    • emr_instance_type – Instance size
    • emr_ec2_instance_id – Corresponding EC2 instance ID
    • emr_ec2_status – Running status
    • emr_ec2_default_vcpus – Allocated vCPU
    • emr_ec2_memory – EC2 instance memory
    • emr_ec2_creation_datetime – EC2 instance creation date/time
    • emr_ec2_end_datetime – EC2 instance end date/time
    • emr_ec2_ready_datetime – EC2 instance ready date/time
    • loadtime – Table load date/time
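To make the emr_cluster_usage_cost columns more concrete, the following sketch builds the arguments for a Cost Explorer GetCostAndUsage call that would yield a daily unblended and net unblended cost per service for one tagged cluster. The function name, the choice of metrics, and the grouping by SERVICE are our assumptions, chosen to line up with the unblendedcost, net_unblendedcost, and service_name columns. The query in the GitHub repo may differ.

```python
from datetime import date, timedelta


def build_cost_request(tag_key, tag_value, run_date):
    """Build get_cost_and_usage arguments covering the day before run_date."""
    start = run_date - timedelta(days=1)
    return {
        # Cost Explorer treats End as exclusive, so this window is one day.
        "TimePeriod": {"Start": start.isoformat(), "End": run_date.isoformat()},
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost", "NetUnblendedCost"],
        # Restrict the result to resources carrying the cluster's unique tag.
        "Filter": {"Tags": {"Key": tag_key, "Values": [tag_value]}},
        # One result group per AWS service (for example Amazon EMR, Amazon EC2).
        "GroupBy": [{"Type": "DIMENSION", "Key": "SERVICE"}],
    }


request = build_cost_request(
    "cost-center", "EMRClusterUniqueTagValue", date(2024, 7, 2)
)
# With boto3 available and credentials configured, the request could be sent as:
#   boto3.client("ce").get_cost_and_usage(**request)
```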

Prerequisites

You must have the following prerequisites before implementing the solution:

  • An EMR on EC2 cluster.
  • The EMR cluster must have a unique tag value defined. You can assign the tag directly on the Amazon EMR console or using Tag Editor. The recommended tag key is cost-center along with a unique value for your EMR cluster. After you create and apply user-defined tags, it can take up to 24 hours for the tag keys to appear on your cost allocation tags page for activation.
  • Activate the tag in AWS Billing. It takes about 24 hours to activate the tag if not done before. To activate the tag, follow these steps:
    • On the AWS Billing and Cost Management console, choose Cost allocation tags in the navigation pane.
    • Select the tag key that you want to activate.
    • Choose Activate.
  • The Spark application’s name should follow the standardized naming convention. It consists of seven components separated by underscores: <business_unit>_<program>_<application>_<source>_<job_name>_<frequency>_<job_type>. These components are used to summarize the resource consumption and cost in the final report. For example: HR_PAYROLL_PS_PSPROD_TAXDUDUCTION_DLY_LD, FIN_CASHRECEIPT_GL_GLDB_MAIN_DLY_LD, or MKT_CAMPAIGN_CRM_CRMDB_TOPRATEDCAMPAIGN_DLY_LD. The application name must be supplied with the spark-submit command using the --name parameter and must follow the standardized naming convention. If any of these components don’t have a value, hardcode the values with the following suggested names:
    • frequency
    • job_type
    • Business_unit
  • The Lambda function should be able to connect to Cost Explorer, connect to the EMR cluster through the Resource Manager APIs, and load data into the RDS for PostgreSQL database. To do this, you need to configure the Lambda function as follows:
    • VPC configuration – The Lambda function should be able to access the EMR cluster, Cost Explorer, AWS Secrets Manager, and Parameter Store. If access is not already in place, create a virtual private cloud (VPC) that includes the EMR cluster, then create VPC endpoints for Parameter Store and Secrets Manager and attach them to the VPC. Because no VPC endpoint is available for Cost Explorer, Lambda needs a private subnet and a route table that sends VPC traffic to a public NAT gateway in order to connect to Cost Explorer. If your EMR cluster is in a public subnet, you must create a private subnet with a custom route table and a public NAT gateway, which allows the Cost Explorer connection to flow from the VPC private subnet. Refer to How do I set up a NAT gateway for a private subnet in Amazon VPC? for setup instructions, and attach the newly created private subnet to the Lambda function explicitly.
    • IAM role – The Lambda function needs to have an AWS Identity and Access Management (IAM) role with the following permissions: AmazonEC2ReadOnlyAccess, AWSCostExplorerFullAccess, and AmazonRDSDataFullAccess. This role will be created automatically during AWS CDK stack deployment; you don’t need to set it up separately.
  • The AWS CDK should be installed on AWS Cloud9 (preferred) or another development environment such as VS Code or PyCharm. For more information, refer to Prerequisites.
  • The RDS for PostgreSQL database (v10 or higher) credentials should be stored in Secrets Manager. For more information, refer to Storing database credentials in AWS Secrets Manager.
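The naming convention in the prerequisites is what lets the final report group costs by business unit, program, and so on. As a minimal sketch, assuming each of the seven components contains no underscore of its own, an application name could be split as follows. The SparkAppName type and parse_app_name function are hypothetical helpers for illustration and are not part of the solution’s code.

```python
from typing import NamedTuple


class SparkAppName(NamedTuple):
    """The seven underscore-separated components of a Spark application name."""
    business_unit: str
    program: str
    application: str
    source: str
    job_name: str
    frequency: str
    job_type: str


def parse_app_name(name):
    """Split a standardized application name; reject names with the wrong shape."""
    parts = name.split("_")
    if len(parts) != 7:
        raise ValueError(
            f"expected 7 underscore-separated components, got {len(parts)}: {name!r}"
        )
    return SparkAppName(*parts)


parsed = parse_app_name("HR_PAYROLL_PS_PSPROD_TAXDUDUCTION_DLY_LD")
print(parsed.business_unit, parsed.frequency, parsed.job_type)
```

Rejecting malformed names early keeps jobs that ignore the convention from silently landing in the wrong cost bucket.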

Create RDS tables

Create the data model tables defined in emr-cost-rds-tables-ddl.sql by manually logging in to the RDS for PostgreSQL instance and running the DDL in the public schema.

Use DBeaver or any compatible SQL client to connect to the RDS instance and validate that the tables have been created.

Deploy AWS CDK stacks

Complete the steps in this section to deploy the following resources using the AWS CDK:

  • Parameter Store to store required parameter values
  • IAM role for the Lambda function to help connect to Amazon EMR and underlying EC2 instances, Cost Explorer, CloudWatch, and Parameter Store
  • Lambda function
  1. Clone the GitHub repo:
    git clone git@github.com:aws-samples/attribute-amazon-emr-costs-to-your-end-users.git

  2. Update the following environment parameters in cdk.context.json (this file can be found in the main directory):
    1. yarn_url – YARN ResourceManager URL to read job run logs and metrics. This URL should be accessible within the VPC where Lambda would be deployed.
    2. tbl_applicationlogs_lz – RDS temp table to store EMR application run logs.
    3. tbl_applicationlogs – RDS table to store EMR application run logs.
    4. tbl_emrcost – RDS table to capture daily EMR cluster usage cost.
    5. tbl_emrinstance_usage – RDS table to store EMR cluster instance usage info.
    6. emrcluster_id – EMR cluster instance ID.
    7. emrcluster_name – EMR cluster name.
    8. emrcluster_tag – Tag key assigned to EMR cluster.
    9. emrcluster_tag_value – Unique value for EMR cluster tag.
    10. emrcluster_role – Service role for Amazon EMR (EMR role).
    11. emrcluster_linkedaccount – Account ID under which the EMR cluster is running.
    12. postgres_rds – RDS for PostgreSQL connection details.
    13. vpc_id – VPC ID in which the EMR cluster is configured and the cost metering Lambda function would be deployed.
    14. vpc_subnets – Comma-separated private subnet IDs associated with the VPC.
    15. sg_id – EMR security group ID.

The following is a sample cdk.context.json file after being populated with the parameters:

{
  "yarn_url": "http://dummy.compute-1.amazonaws.com:8088/ws/v1/cluster/apps",
  "tbl_applicationlogs_lz": "public.emr_applications_execution_log_lz",
  "tbl_applicationlogs": "public.emr_applications_execution_log",
  "tbl_emrcost": "public.emr_cluster_usage_cost",
  "tbl_emrinstance_usage": "public.emr_cluster_instances_usage",
  "emrcluster_id": "j-xxxxxxxxxx",
  "emrcluster_name": "EMRClusterName",
  "emrcluster_tag": "EMRClusterTag",
  "emrcluster_tag_value": "EMRClusterUniqueTagValue",
  "emrcluster_role": "EMRClusterServiceRole",
  "emrcluster_linkedaccount": "xxxxxxxxxxx",
  "postgres_rds": {
    "host": "xxxxxxxxx.amazonaws.com",
    "dbname": "dbname",
    "user": "username",
    "secretid": "DatabaseUserSecretID"
  },
  "vpc_id": "xxxxxxxxx",
  "vpc_subnets": "subnet-xxxxxxxxxxx",
  "sg_id": "xxxxxxxxxx"
}
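Before deploying, it can help to confirm that every parameter listed earlier is present in cdk.context.json. The following is a small, hypothetical pre-flight check and is not part of the solution’s repo. The key names are taken from the parameter list above, and the function name is an assumption.

```python
import json

# Top-level keys described in the parameter list for cdk.context.json.
REQUIRED_KEYS = {
    "yarn_url", "tbl_applicationlogs_lz", "tbl_applicationlogs", "tbl_emrcost",
    "tbl_emrinstance_usage", "emrcluster_id", "emrcluster_name", "emrcluster_tag",
    "emrcluster_tag_value", "emrcluster_role", "emrcluster_linkedaccount",
    "postgres_rds", "vpc_id", "vpc_subnets", "sg_id",
}
# Keys expected inside the nested postgres_rds object.
REQUIRED_RDS_KEYS = {"host", "dbname", "user", "secretid"}


def missing_context_keys(context):
    """Return a sorted list of required keys that are absent from the context."""
    missing = sorted(REQUIRED_KEYS - context.keys())
    rds = context.get("postgres_rds")
    if isinstance(rds, dict):
        missing += sorted(f"postgres_rds.{k}" for k in REQUIRED_RDS_KEYS - rds.keys())
    return missing


# Deliberately incomplete example to show what the check reports.
example = json.loads(
    '{"yarn_url": "http://dummy.compute-1.amazonaws.com:8088/ws/v1/cluster/apps",'
    ' "postgres_rds": {"host": "xxxxxxxxx.amazonaws.com"}, "vpc_id": "xxxxxxxxx"}'
)
print(missing_context_keys(example))
```

In practice you would load the real file with json.load(open("cdk.context.json")) and stop the deployment if the returned list is not empty.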

You can choose to deploy the AWS CDK stack using AWS Cloud9 or any other development environment according to your needs. For instructions to set up AWS Cloud9, refer to Getting started: basic tutorials for AWS Cloud9.

  1. Go to AWS Cloud9, choose File, then Upload Local Files, and upload the project folder.
  2. Deploy the AWS CDK stack with the following code:
    cd attribute-amazon-emr-costs-to-your-end-users/
    pip install -r requirements.txt
    cdk deploy --all

The deployed Lambda function requires two external libraries: psycopg2 and requests. The corresponding layer needs to be created and assigned to the Lambda function. For instructions to create a Lambda layer for the requests module, refer to Step-by-Step Guide to Creating an AWS Lambda Function Layer.

Creation of the psycopg2 package and layer is tied to the Python runtime version of the Lambda function. Provided that the Lambda function uses the Python 3.9 runtime, complete the following steps to create the corresponding layer package for psycopg2:

  1. Download psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl from https://pypi.org/project/psycopg2-binary/#files.
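  2. Unzip the wheel into a directory named python, then zip that directory (the archive name psycopg2-layer.zip is only an example):
    unzip psycopg2_binary-2.9.9-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl -d python
    zip -r psycopg2-layer.zip python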

  3. Create a Lambda layer for psycopg2 using the zip file.
  4. Assign the layer to the Lambda function by choosing Add a layer in the deployed function properties.
  5. Validate the AWS CDK deployment.

Your Lambda function details should look similar to the following screenshot.

Lambda Function Screenshot

On the Systems Manager console, validate the Parameter Store content for actual values.

The IAM role details should look similar to the following code, which allows the Lambda function access to Amazon EMR and underlying EC2 instances, Cost Explorer, CloudWatch, Secrets Manager, and Parameter Store:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "ce:GetCostAndUsage",
        "ce:ListCostAllocationTags",
        "ec2:AttachNetworkInterface",
        "ec2:CreateNetworkInterface",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeInstanceTypes",
        "ec2:DescribeInstances",
        "ec2:DescribeNetworkInterfaces",
        "elasticmapreduce:Describe*",
        "elasticmapreduce:List*",
        "ssm:Describe*",
        "ssm:Get*",
        "ssm:List*"
      ],
      "Resource": "*",
      "Effect": "Allow"
    },
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:DescribeLogStreams",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    },
    {
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:*:*:*",
      "Effect": "Allow"
    }
  ]
}

Test the solution

To test the solution, you can run a Spark job that combines multiple files in the EMR cluster. You can do this by adding the job as separate steps on the cluster. Refer to Optimize Amazon EMR costs for legacy and Spark workloads for more details on how to add jobs as steps to the EMR cluster.

  1. Use the following sample command to submit the Spark job (emr_union_job.py).
    It takes in three arguments:

    1. <input_full_path> – The Amazon S3 location of the data file that is read in by the Spark job. The path should not be changed. The input_full_path is s3://aws-blogs-artifacts-public/artifacts/BDB-2997/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet
    2. <output_path> – The S3 folder where the results are written to.
    3. <number of copies to be unioned> – By changing the input to the Spark job, you can make sure the job runs for different amounts of time and also change the number of Spot nodes used.
spark-submit --deploy-mode cluster --name HR_PAYROLL_PS_PSPROD_TAXDUDUCTION_DLY_LD s3://aws-blogs-artifacts-public/artifacts/BDB-2997/scripts/emr_union_job.py s3://aws-blogs-artifacts-public/artifacts/BDB-2997/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet s3://<output_bucket>/<output_path>/ 6

spark-submit --deploy-mode cluster --name FIN_CASHRECEIPT_GL_GLDB_MAIN_DLY_LD s3://aws-blogs-artifacts-public/artifacts/BDB-2997/scripts/emr_union_job.py s3://aws-blogs-artifacts-public/artifacts/BDB-2997/sample-data/input/part-00000-a0885743-e0cb-48b1-bc2b-05eb748ab898-c000.snappy.parquet s3://<output_bucket>/<output_path>/ 12

The following screenshot shows the log of the steps run on the Amazon EMR console.

EMR Steps Execution

  2. Run the deployed Lambda function from the Lambda console. This loads the daily application log, EMR dollar usage, and EMR instance usage details into their respective RDS tables.

The following screenshot of the Amazon RDS query editor shows the results for public.emr_applications_execution_log.

public.emr_applications_execution_log

The following screenshot shows the results for public.emr_cluster_usage_cost.

public.emr_cluster_usage_cost

The following screenshot shows the results for public.emr_cluster_instances_usage.

public.emr_cluster_instances_usage

Cost can be calculated using the preceding three tables based on your requirements. In the following SQL query, you calculate the cost based on relative usage of all applications in a day. You first identify the total vcore-seconds CPU consumed in a day and then find out the percentage share of an application. This drives the cost based on overall cluster cost in a day.

Consider the following example scenario, where 10 applications ran on the cluster for a given day. You would use the following sequence of steps to calculate the chargeback cost:

  1. Calculate the relative percentage usage of each application (consumed vcore-seconds CPU by app/total vcore-seconds CPU consumed).
  2. Now that you have the relative resource consumption of each application, distribute the cluster cost to each application. Let’s assume that the total EMR cluster cost for that date is $400.
| app_id | app_name | runtime_seconds | vcore_seconds | % Relative Usage | Amazon EMR Cost ($) |
| --- | --- | --- | --- | --- | --- |
| application_00001 | app1 | 10 | 120 | 5% | 19.83 |
| application_00002 | app2 | 5 | 60 | 2% | 9.91 |
| application_00003 | app3 | 4 | 45 | 2% | 7.43 |
| application_00004 | app4 | 70 | 840 | 35% | 138.79 |
| application_00005 | app5 | 21 | 300 | 12% | 49.57 |
| application_00006 | app6 | 4 | 48 | 2% | 7.93 |
| application_00007 | app7 | 12 | 150 | 6% | 24.78 |
| application_00008 | app8 | 52 | 620 | 26% | 102.44 |
| application_00009 | app9 | 12 | 130 | 5% | 21.48 |
| application_00010 | app10 | 9 | 108 | 4% | 17.84 |
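The two steps above amount to a proportional split of the daily cluster cost by vcore-seconds. The following Python sketch reproduces the figures in the example scenario. The function name is hypothetical, and the SQL query in the GitHub repo remains the reference implementation.

```python
def allocate_cost(vcore_seconds_by_app, cluster_cost):
    """Split cluster_cost across apps in proportion to their vcore-seconds."""
    total = sum(vcore_seconds_by_app.values())
    if total == 0:
        # No CPU was consumed, so there is nothing to attribute.
        return {app: 0.0 for app in vcore_seconds_by_app}
    return {
        app: cluster_cost * vcore_seconds / total
        for app, vcore_seconds in vcore_seconds_by_app.items()
    }


# vcore_seconds values from the example scenario.
usage = {
    "application_00001": 120, "application_00002": 60, "application_00003": 45,
    "application_00004": 840, "application_00005": 300, "application_00006": 48,
    "application_00007": 150, "application_00008": 620, "application_00009": 130,
    "application_00010": 108,
}
costs = allocate_cost(usage, 400.0)
for app, cost in costs.items():
    print(f"{app}: ${cost:.2f}")
```

Because the shares are computed from the same total, the per-application costs always sum back to the cluster cost for the day, up to floating-point rounding.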

A sample chargeback cost calculation SQL query is available on the GitHub repo.

You can use the SQL query to create a report dashboard to plot multiple charts for the insights. The following are two examples created using QuickSight.

The following is a daily bar chart.

Cost Daily Bar Chart

The following shows total dollars consumed.

Cost Pie chart

Solution cost

Let’s assume we’re calculating for an environment that runs 1,000 jobs daily, and we run this solution daily:

  • Lambda costs – One run per day requires 30 Lambda function invocations per month.
  • Amazon RDS cost – The total number of records in the public.emr_applications_execution_log table for a 30-day month would be 30,000 records, which translates to 5.72 MB of storage. If we consider the other two smaller tables and storage overhead, the overall monthly storage requirement would be approximately 12 MB.
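The storage figure can be checked with simple arithmetic. The sketch below assumes an average of 200 bytes per record, a value we inferred because it reproduces the 5.72 MB figure. The post does not state the row size explicitly, so treat it as an assumption.

```python
jobs_per_day = 1_000
days_per_month = 30
bytes_per_record = 200  # assumption: inferred from the 5.72 MB figure, not stated

records = jobs_per_day * days_per_month
size_mb = records * bytes_per_record / 1024**2  # bytes to MB (1 MB = 1024 * 1024 bytes)

print(f"{records:,} records, about {size_mb:.2f} MB per month")
```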

In summary, the solution cost according to the AWS Pricing Calculator is $34.20/year, which is negligible.

Clean up

To avoid ongoing charges for the resources that you created, complete the following steps:

  • Delete the AWS CDK stacks:
    cdk destroy --all

  • Delete the QuickSight report and dashboard, if created.
  • Run the following SQL to drop the tables:
    drop table public.emr_applications_execution_log_lz;
    drop table public.emr_applications_execution_log;
    drop table public.emr_cluster_usage_cost;
    drop table public.emr_cluster_instances_usage;

Conclusion

With this solution, you can deploy a chargeback model to attribute costs to users and groups using the EMR cluster. You can also identify options for optimization, scaling, and separation of workloads to different clusters based on usage and growth needs.

You can collect the metrics for a longer duration to observe trends on the usage of Amazon EMR resources and use that for forecasting purposes.

If you have any thoughts or questions, leave them in the comments section.


About the Authors

Raj Patel is AWS Lead Consultant for Data Analytics solutions based out of India. He specializes in building and modernising analytical solutions. His background is in data warehouse and data lake architecture, development, and administration. He has worked in the data and analytics field for over 14 years.

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Gaurav Jain is a Sr Data Architect with AWS Professional Services who specializes in big data and helps customers modernize their data platforms on the cloud. He is passionate about building the right analytics solutions to gain timely insights and make critical business decisions. Outside of work, he loves to spend time with his family and likes watching movies and sports.

Dipal Mahajan is a Lead Consultant with Amazon Web Services based out of India, where he guides global customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings extensive experience in software development, architecture, and analytics from industries like finance, telecom, retail, and healthcare.

Amazon EMR 7.1 runtime for Apache Spark and Iceberg can run Spark workloads 2.7 times faster than Apache Spark 3.5.1 and Iceberg 1.5.2

Post Syndicated from Hari Kishore Chaparala original https://aws.amazon.com/blogs/big-data/amazon-emr-7-1-runtime-for-apache-spark-and-iceberg-can-run-spark-workloads-2-7-times-faster-than-apache-spark-3-5-1-and-iceberg-1-5-2/

In this post, we explore the performance benefits of using the Amazon EMR runtime for Apache Spark and Apache Iceberg compared to running the same workloads with open source Spark 3.5.1 on Iceberg tables. Iceberg is a popular open source high-performance format for large analytic tables. Our benchmarks demonstrate that Amazon EMR can run TPC-DS 3 TB workloads 2.7 times faster, reducing the runtime from 1.548 hours to 0.564 hours. Additionally, the cost efficiency improves by 2.2 times, with the total cost decreasing from $16.09 to $7.23 when using Amazon Elastic Compute Cloud (Amazon EC2) On-Demand r5d.4xlarge instances, providing observable gains for data processing tasks.

The Amazon EMR runtime for Apache Spark offers a high-performance runtime environment while maintaining 100% API compatibility with open source Spark and the Iceberg table format. In Run Apache Spark 3.5.1 workloads 4.5 times faster with Amazon EMR runtime for Apache Spark, we detailed some of the optimizations, showing a 4.5 times faster runtime and 2.8 times better price-performance compared to open source Spark 3.5.1 on the TPC-DS 3 TB benchmark. However, many of the optimizations are geared towards DataSource V1, whereas Iceberg uses Spark DataSource V2. Recognizing this, we have focused on migrating some of the existing optimizations in the EMR runtime for Spark to DataSource V2 and introducing Iceberg-specific enhancements. These improvements are built on top of the Spark runtime enhancements on query planning, physical plan operator improvements, and optimizations with Amazon Simple Storage Service (Amazon S3) and the Java runtime. We have added eight new optimizations incrementally since the Amazon EMR 6.15 release in 2023, which are present in Amazon EMR 7.1 and turned on by default. Some of the improvements include the following:

  • Optimizing DataSource V2 in Spark:
    • Dynamic filtering on non-partitioned columns
    • Removing redundant broadcast hash joins
    • Partial hash aggregate pushdowns
    • Bloom filter-based joins
  • Iceberg-specific enhancements:
    • Data prefetch
    • Support for file size-based estimations

Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts all use the optimized runtimes. Refer to Working with Apache Iceberg in Amazon EMR and Best practices for optimizing Apache Iceberg workloads for more details.

Benchmark results for Amazon EMR 7.1 vs. open source Spark 3.5.1 and Iceberg 1.5.2

To assess the Spark engine’s performance with the Iceberg table format, we performed benchmark tests using the 3 TB TPC-DS dataset, version 2.13 (our results derived from the TPC-DS dataset are not directly comparable to the official TPC-DS results due to setup differences). Benchmark tests for the EMR runtime for Spark and Iceberg were conducted on Amazon EMR 7.1 clusters with Spark 3.5.0 and Iceberg 1.4.3-amzn-0 versions, and open source Spark 3.5.1 and Iceberg 1.5.2 were deployed on EC2 clusters designated for open source runs.

The setup instructions and technical details are available in our GitHub repository. To minimize the influence of external catalogs like AWS Glue and Hive, we used the Hadoop catalog for the Iceberg tables. This uses the underlying file system, specifically Amazon S3, as the catalog. We can define this setup by setting the property spark.sql.catalog.<catalog_name>.type to hadoop. The fact tables used the default partitioning by the date column, with the number of partitions varying from 200 to 2,100. No precalculated statistics were used for these tables.

We ran a total of 104 SparkSQL queries in three sequential rounds, and the average runtime of each query across these rounds was taken for comparison. The average runtime for the three rounds on Amazon EMR 7.1 with Iceberg enabled was 0.56 hours, demonstrating a 2.7-fold speed increase compared to open source Spark 3.5.1 and Iceberg 1.5.2. The following figure presents the total runtimes in seconds.

The following table summarizes the metrics.

| Metric | Amazon EMR 7.1 on EC2 | Open Source Spark 3.5.1 and Iceberg 1.5.2 |
| --- | --- | --- |
| Average runtime in seconds | 2033.17 | 5575.19 |
| Geometric mean over queries in seconds | 10.13153 | 20.34651 |
| Cost* | $7.23 | $16.09 |

*Detailed cost estimates are discussed later in this post.
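To show how the summary metrics relate to the per-query measurements, the following sketch averages each query across rounds, then derives the total runtime and the geometric mean over queries. The per-round numbers are made up for illustration, and the function name is hypothetical. Only the two average runtimes used in the final speedup line come from the table above.

```python
from statistics import geometric_mean, mean


def summarize(rounds):
    """rounds: one dict per round, mapping query name to runtime in seconds.

    Returns (total of per-query averages, geometric mean of per-query averages).
    """
    per_query = [mean([r[query] for r in rounds]) for query in rounds[0]]
    return sum(per_query), geometric_mean(per_query)


# Made-up runtimes for two queries over three rounds.
rounds = [
    {"q1": 10.0, "q2": 40.0},
    {"q1": 12.0, "q2": 38.0},
    {"q1": 8.0, "q2": 42.0},
]
total_seconds, geomean_seconds = summarize(rounds)

# Speedup from the average runtimes reported in the table.
speedup = 5575.19 / 2033.17
print(f"speedup: {speedup:.1f}x")
```

The geometric mean is less sensitive than the total to a few long-running queries, which is why both figures are reported.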

The following chart demonstrates the per-query performance improvement of Amazon EMR 7.1 relative to open source Spark 3.5.1 and Iceberg 1.5.2. The extent of the speedup varies from one query to another, ranging from 9.6 times faster for q93 to 1.04 times faster for q34, with Amazon EMR outperforming open source Spark with Iceberg tables. The horizontal axis arranges the TPC-DS 3 TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup as a ratio.

Cost comparison

Our benchmark provides the total runtime and geometric mean data to assess the performance of Spark and Iceberg in a complex, real-world decision support scenario. For additional insights, we also examine the cost aspect. We calculate cost estimates using formulas that account for EC2 On-Demand instances, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR expenses.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • r5d.4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • r5d.4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost
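The formulas above can be expressed as a short Python sketch. The EC2 and Amazon EMR hourly rates, instance count, and volume size come from this post. The Amazon EBS rate is not stated here, so the value below ($0.10 per GB-month spread over 730 hours) is an assumption, and the function name is hypothetical.

```python
def benchmark_cost(runtime_hours, instances=9, ec2_rate=1.152, emr_rate=0.27,
                   ebs_gb=20, ebs_gb_hour_rate=0.10 / 730):
    """Estimate the cost of one benchmark run using the formulas above.

    ebs_gb_hour_rate is an assumed value; it is not given in the post.
    """
    ec2 = instances * ec2_rate * runtime_hours
    ebs = instances * ebs_gb_hour_rate * ebs_gb * runtime_hours
    emr = instances * emr_rate * runtime_hours
    return {"ec2": ec2, "ebs": ebs, "emr": emr, "total": ec2 + ebs + emr}


emr_run = benchmark_cost(0.564)                # Amazon EMR 7.1
oss_run = benchmark_cost(1.548, emr_rate=0.0)  # open source Spark: no EMR charge

print(f"EMR 7.1 total: ${emr_run['total']:.2f}")
print(f"Open source total: ${oss_run['total']:.2f}")
print(f"Cost ratio: {oss_run['total'] / emr_run['total']:.1f}x")
```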

The calculations reveal that the Amazon EMR 7.1 benchmark yields a 2.2-fold cost efficiency improvement over open source Spark 3.5.1 and Iceberg 1.5.2 in running the benchmark job.

| Metric | Amazon EMR 7.1 | Open Source Spark 3.5.1 and Iceberg 1.5.2 |
| --- | --- | --- |
| Runtime in hours | 0.564 | 1.548 |
| Number of EC2 instances | 9 | 9 |
| Amazon EBS size | 20 GB | 20 GB |
| Amazon EC2 cost | $5.85 | $16.05 |
| Amazon EBS cost | $0.01 | $0.04 |
| Amazon EMR cost | $1.37 | $0 |
| Total cost | $7.23 | $16.09 |
| Cost savings | Amazon EMR 7.1 is 2.2 times better | Baseline |

In addition to the time-based metrics discussed so far, data from Spark event logs shows that Amazon EMR 7.1 scanned approximately 3.4 times less data from Amazon S3 and 4.1 times fewer records than the open source version in the TPC-DS 3 TB benchmark. This reduction in Amazon S3 data scanning contributes directly to cost savings for Amazon EMR workloads.

Run open source Spark benchmarks on Iceberg tables

We used separate EC2 clusters, each equipped with nine r5d.4xlarge instances, for testing both open source Spark 3.5.1 and Iceberg 1.5.2 and Amazon EMR 7.1. The primary node was equipped with 16 vCPU and 128 GB of memory, and the eight worker nodes together had 128 vCPU and 1024 GB of memory. We conducted tests using the Amazon EMR default settings to showcase the typical user experience and minimally adjusted the settings of Spark and Iceberg to maintain a balanced comparison.

The following table summarizes the Amazon EC2 configurations for the primary node and eight worker nodes of type r5d.4xlarge.

| EC2 Instance | vCPU | Memory (GiB) | Instance Storage (GB) | EBS Root Volume (GB) |
| --- | --- | --- | --- | --- |
| r5d.4xlarge | 16 | 128 | 2 x 300 NVMe SSD | 20 GB |

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repo, set up the TPC-DS source data in your S3 bucket and on your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.1.jar to your S3 bucket.
  3. Create Iceberg tables from the TPC-DS source data. Follow the instructions on GitHub to create Iceberg tables using the Hadoop catalog. For example, the following code uses an EMR 7.1 cluster with Iceberg enabled to create the tables:
aws emr add-steps --cluster-id <cluster-id> --steps Type=Spark,Name="Create Iceberg Tables",\
Args=[--class,com.amazonaws.eks.tpcds.CreateIcebergTables,\
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
--conf,spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog,\
--conf,spark.sql.catalog.hadoop_catalog.type=hadoop,\
--conf,spark.sql.catalog.hadoop_catalog.warehouse=s3://<bucket>/<warehouse_path>/,\
--conf,spark.sql.catalog.hadoop_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO,\
s3://<bucket>/<jar_location>/spark-benchmark-assembly-3.5.1.jar,\
s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/,\
/home/hadoop/tpcds-kit/tools,parquet,3000,true,<database_name>,true,true],ActionOnFailure=CONTINUE \
--region <AWS region>

Note the Hadoop catalog warehouse location and database name from the preceding step. We use the same tables to run benchmarks with Amazon EMR 7.1 and open source Spark and Iceberg.

This benchmark application is built from the branch tpcds-v2.13_iceberg. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repo.

Create and configure a YARN cluster on Amazon EC2

To compare Iceberg performance between Amazon EMR on Amazon EC2 and open source Spark on Amazon EC2, follow the instructions in the emr-spark-benchmark GitHub repo to create an open source Spark cluster on Amazon EC2 using Flintrock with eight worker nodes.

Based on the cluster selection for this test, the following configurations are used:

Run the TPC-DS benchmark with Apache Spark 3.5.1 and Iceberg 1.5.2

Complete the following steps to run the TPC-DS benchmark:

  1. Log in to the open source cluster primary using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables.
    2. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions   \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog    \
--conf spark.sql.catalog.local.type=hadoop  \
--conf spark.sql.catalog.local.warehouse=s3a://<YOUR_S3_BUCKET>/<warehouse_path>/ \
--conf spark.sql.defaultCatalog=local   \
--conf spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO   \
spark-benchmark-assembly-3.5.1.jar   \
s3://<YOUR_S3_BUCKET>/benchmark_run 3000 1 false  \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13    \
true <database> > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

After the Spark job finishes, retrieve the test result file from the output S3 bucket at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. You can do this either through the Amazon S3 console by navigating to the specified bucket location or by using the AWS Command Line Interface (AWS CLI). The Spark benchmark application organizes the data by creating a timestamp folder and placing a summary file within a folder labeled summary.csv. The output CSV files contain four columns without headers:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

With the data from three separate test runs with one iteration each time, we can calculate the average and geometric mean of the benchmark runtimes.
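
The averaging step can be sketched in a few lines of Python. This is a minimal illustration, not part of the benchmark application: the function names are hypothetical, the sample rows are made up, and it assumes each run's summary file has the four headerless columns listed above, with the median time in the second column.

```python
import csv
import io
import math


def per_query_average(run_csv_texts):
    """Average each query's median time (second column) across runs."""
    times = {}
    for text in run_csv_texts:
        for row in csv.reader(io.StringIO(text)):
            if not row:
                continue  # skip blank lines
            times.setdefault(row[0], []).append(float(row[1]))
    return {query: sum(vals) / len(vals) for query, vals in times.items()}


def geometric_mean(values):
    """Geometric mean computed through logarithms to avoid overflow."""
    values = list(values)
    return math.exp(sum(math.log(v) for v in values) / len(values))


# Made-up sample data standing in for two downloaded summary files.
run_1 = "q1-v2.13,1.0,0.9,1.1\nq2-v2.13,7.0,6.5,7.5\n"
run_2 = "q1-v2.13,3.0,2.8,3.2\nq2-v2.13,9.0,8.5,9.5\n"

averages = per_query_average([run_1, run_2])
total_runtime = sum(averages.values())
geomean = geometric_mean(averages.values())
print(averages, total_runtime, geomean)
```

The geometric mean is less sensitive than the total runtime to a handful of long-running queries, which is why both figures are usually reported together.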

Run the TPC-DS benchmark with the EMR runtime for Spark

Most of the instructions are similar to Steps to run Spark Benchmarking with a few Iceberg-specific details.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure the AWS CLI shell to point to the benchmarking AWS account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application JAR file to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR on EC2 Cluster and run benchmark job to spin up an EMR on EC2 cluster. Make sure to enable Iceberg. See Create an Iceberg cluster for more details. Choose the correct Amazon EMR version, root volume size, and same resource configuration as the open source Flintrock setup. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. We need this for the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps from the AWS CLI:
    1. Replace <cluster-id> with the cluster ID from Step 2.
    2. The benchmark application is at s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar.
    3. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables. This should be the same as the one used for the open source TPC-DS benchmark run.
    4. The results will be in s3://<your-bucket>/benchmark_run.
aws emr add-steps --cluster-id <cluster-id> \
--steps Type=Spark,Name="SPARK Iceberg EMR TPCDS Benchmark Job",\
Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,\
--conf,spark.driver.cores=4,\
--conf,spark.driver.memory=10g,\
--conf,spark.executor.cores=16,\
--conf,spark.executor.memory=100g,\
--conf,spark.executor.instances=8,\
--conf,spark.network.timeout=2000,\
--conf,spark.executor.heartbeatInterval=300s,\
--conf,spark.dynamicAllocation.enabled=false,\
--conf,spark.shuffle.service.enabled=false,\
--conf,spark.sql.iceberg.data-prefetch.enabled=true,\
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
--conf,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,\
--conf,spark.sql.catalog.local.type=hadoop,\
--conf,spark.sql.catalog.local.warehouse=s3://<your-bucket>/<warehouse-path>,\
--conf,spark.sql.defaultCatalog=local,\
--conf,spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO,\
s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar,\
s3://<your-bucket>/benchmark_run,3000,1,false,\
'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,'\
'q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,'\
'q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,'\
'q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,'\
'q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,'\
'q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,'\
'q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,'\
'q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,'\
'q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,'\
'q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,'\
'q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,'\
'q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,'\
'q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,'\
'q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,'\
'q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,'\
'q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,'\
'q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,'\
'q99-v2.13\,ss_max-v2.13',true,<database>],ActionOnFailure=CONTINUE \
--region <aws-region>

Summarize the results

After the step is complete, you can see the summarized benchmark result at s3://<your-bucket>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv in the same way as the previous run and compute the average and geometric mean of the query runtimes.

Clean up

To prevent any future charges, delete the resources you created by following the instructions provided in the Cleanup section of the GitHub repository.

Summary

Amazon EMR is consistently enhancing the EMR runtime for Spark when used with Iceberg tables, achieving a performance that is 2.7 times faster than open source Spark 3.5.1 and Iceberg 1.5.2 on TPC-DS 3 TB, v2.13. We encourage you to keep up to date with the latest Amazon EMR releases to fully benefit from ongoing performance improvements.

To stay informed, subscribe to the AWS Big Data Blog’s RSS feed, where you can find updates on the EMR runtime for Spark and Iceberg, as well as tips on configuration best practices and tuning recommendations.


About the authors

Hari Kishore Chaparala is a software development engineer for Amazon EMR at Amazon Web Services.

Udit Mehrotra is an Engineering Manager for EMR at Amazon Web Services.

Migrate data from an on-premises Hadoop environment to Amazon S3 using S3DistCp with AWS Direct Connect

Post Syndicated from Vicky Jacob original https://aws.amazon.com/blogs/big-data/migrate-data-from-an-on-premises-hadoop-environment-to-amazon-s3-using-s3distcp-with-aws-direct-connect/

This post demonstrates how to migrate nearly any amount of data from an on-premises Apache Hadoop environment to Amazon Simple Storage Service (Amazon S3) by using S3DistCp on Amazon EMR with AWS Direct Connect.

With traditional Hadoop DistCp, the copy must run on the source cluster to move data from one cluster to another. This invokes a MapReduce job on the source cluster and can consume a lot of cluster resources (depending on the data volume). To avoid this problem and minimize the load on the source cluster, you can use S3DistCp with Direct Connect to migrate terabytes of data from an on-premises Hadoop environment to Amazon S3. This process runs the job on the target EMR cluster, minimizing the burden on the source cluster.

This post provides instructions for using S3DistCp to migrate data to the AWS Cloud. Apache DistCp is an open source tool that you can use to copy large amounts of data. S3DistCp is similar to DistCp, but optimized to work with AWS, particularly Amazon S3. Compared to Hadoop DistCp, S3DistCp is more scalable, offers higher throughput, and is more efficient for parallel copying of large numbers of objects across S3 buckets and across AWS accounts.

Solution overview

The architecture for this solution includes the following components:

  • Source technology stack:
    • A Hadoop cluster with connectivity to the target EMR cluster over Direct Connect
  • Target technology stack:

The following architecture diagram shows how you can use S3DistCp from the target EMR cluster to migrate huge volumes of data from an on-premises Hadoop environment to Amazon S3 through a private network connection, such as Direct Connect.

This migration approach uses the following tools to perform the migration:

  • S3DistCp – S3DistCp is similar to DistCp, but optimized to work with AWS, particularly Amazon S3. The command for S3DistCp in Amazon EMR version 4.0 and later is s3-dist-cp, which you add as a step in a cluster or at the command line. With S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into Hadoop Distributed File System (HDFS), where it can be processed by subsequent steps in your EMR cluster. You can also use S3DistCp to copy data between S3 buckets or from HDFS to Amazon S3. S3DistCp is more scalable and efficient for parallel copying of large numbers of objects across buckets and across AWS accounts.
  • Amazon S3 – Amazon S3 is an object storage service. You can use Amazon S3 to store and retrieve any amount of data at any time, from anywhere on the web.
  • Amazon VPC – Amazon VPC provisions a logically isolated section of the AWS Cloud where you can launch AWS resources in a virtual network that you’ve defined. This virtual network closely resembles a traditional network that you would operate in your own data center, with the benefits of using the scalable infrastructure of AWS.
  • AWS Identity and Access Management (IAM) – IAM is a web service for securely controlling access to AWS services. With IAM, you can centrally manage users, security credentials such as access keys, and permissions that control which AWS resources users and applications can access.
  • Direct Connect – Direct Connect links your internal network to a Direct Connect location over a standard ethernet fiber-optic cable. One end of the cable is connected to your router, the other to a Direct Connect router. With this connection, you can create virtual interfaces directly to public AWS services (for example, to Amazon S3) or to Amazon VPC, bypassing internet service providers in your network path. A Direct Connect location provides access to AWS in the AWS Region with which it’s associated. You can use a single connection in a public Region or in AWS GovCloud (US) to access public AWS services in all other public Regions.

In the following sections, we discuss the steps to perform the data migration using S3DistCp.

Prerequisites

Before you begin, you should have the following prerequisites:

Get the active NameNode from the source Hadoop cluster

Sign in to any of the nodes on the source cluster and run the following commands in bash to get the active NameNode on the cluster.

In newer versions of Hadoop, run the following command to get the service status, which will list the active NameNode on the cluster:

[hadoop@hadoopcluster01 ~]$ hdfs haadmin -getAllServiceState
hadoopcluster01.test.amazon.local:8020                active
hadoopcluster02.test.amazon.local:8020                standby
hadoopcluster03.test.amazon.local:8020                standby
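
If you want to pick the active NameNode out of that output in a script, the parsing can be sketched as follows. This is a minimal illustration in Python under the assumption that each output line has the form shown above (address, whitespace, state). The function name is hypothetical, and the sample text is copied from the preceding output rather than read from a live cluster.

```python
def find_active_namenode(service_state_output):
    """Return the host:port of the NameNode reported as active, or None."""
    for line in service_state_output.splitlines():
        fields = line.split()
        # Expect "<host:port> <state>"; ignore anything else.
        if len(fields) >= 2 and fields[-1] == "active":
            return fields[0]
    return None


# Sample text copied from the command output shown above.
sample_output = """\
hadoopcluster01.test.amazon.local:8020                active
hadoopcluster02.test.amazon.local:8020                standby
hadoopcluster03.test.amazon.local:8020                standby
"""

active = find_active_namenode(sample_output)
print(active)
```

Returning None when no line reports the active state lets the caller decide whether to retry or stop, for example during a failover.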

On older versions of Hadoop, define and run the following function in bash to get the active NameNode on the cluster:

[hadoop@hadoopcluster01 ~]$ getActiveNameNode(){
    # Look up the HA nameservice and the NameNode IDs registered under it
    nameservice=$(hdfs getconf -confKey dfs.nameservices)
    ns=$(hdfs getconf -confKey "dfs.ha.namenodes.${nameservice}")
    IFS=',' read -ra ADDR <<< "$ns"
    activeNode=''
    # Ask each NameNode for its HA state and remember the active one
    for n in "${ADDR[@]}"; do
        state=$(hdfs haadmin -getServiceState "$n")
        if [ "$state" = "active" ]; then
            echo "$state ==>$n"
            activeNode=$n
        fi
    done
    # Resolve the active NameNode ID to its RPC address (host:port)
    activeNodeFQDN=$(hdfs getconf -confKey "dfs.namenode.rpc-address.${nameservice}.${activeNode}")
    echo "$activeNodeFQDN"
}

[hadoop@hadoopcluster01 ~]$ getActiveNameNode
active ==>namenode863
hadoopcluster01.test.amazon.local:8020

Validate connectivity from the EMR cluster to the source Hadoop cluster

As mentioned in the prerequisites, you should have an EMR cluster with a custom IAM role for Amazon EMR attached. Run the following command to validate the connectivity from the target EMR cluster to the source Hadoop cluster:

[hadoop@emrcluster01 ~]$ telnet hadoopcluster01.test.amazon.local 8020
Trying 192.168.0.1...
Connected to hadoopcluster01.test.amazon.local.
Escape character is '^]'.
^]

Alternatively, you can run the following command:

[hadoop@emrcluster01 ~]$ curl -v telnet://hadoopcluster01.test.amazon.local:8020
*   Trying 192.168.0.1:8020...
* Connected to hadoopcluster01.test.amazon.local (192.168.0.1) port 8020 (#0)

Validate if the source HDFS path exists

Check if the source HDFS path is valid. If the following command returns 0, indicating that it’s valid, you can proceed to the next step:

[hadoop@emrcluster01 ~]$ hdfs dfs -test -d hdfs://hadoopcluster01.test.amazon.local/user/hive/warehouse/test.db/test_table01
[hadoop@emrcluster01 ~]$ echo $?
0

Transfer data using S3DistCp

To transfer the source HDFS folder to the target S3 bucket, use the following command:

s3-dist-cp --src hdfs://hadoopcluster01.test.amazon.local/user/hive/warehouse/test.db/test_table01 --dest s3://<BUCKET_NAME>/user/hive/warehouse/test.db/test_table01

To transfer large files in multipart chunks, use the following command to set the chunk size (specified in MiB):

s3-dist-cp --src hdfs://hadoopcluster01.test.amazon.local/user/hive/warehouse/test.db/test_table01 --dest s3://<BUCKET_NAME>/user/hive/warehouse/test.db/test_table01 --multipartUploadChunkSize=1024

This will invoke a MapReduce job on the target EMR cluster. Depending on the volume of the data and the bandwidth speed, the job can take a few minutes up to a few hours to complete.
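
When choosing a chunk size, it can help to estimate how many parts a large file would be split into. The following Python sketch is a rough, hypothetical helper and not part of S3DistCp. It assumes each part is exactly the configured chunk size (the last part may be smaller) and uses the Amazon S3 limit of 10,000 parts per multipart upload. The function names and the sample file sizes are illustrative only.

```python
MIB = 1024 * 1024
S3_MAX_PARTS = 10_000  # Amazon S3 limit on parts in one multipart upload


def parts_needed(file_size_bytes, chunk_size_mib):
    """Estimate the number of parts, rounding up for a final partial part."""
    chunk_bytes = chunk_size_mib * MIB
    # Ceiling division on integers; an empty file still counts as one part.
    return max(1, -(-file_size_bytes // chunk_bytes))


def fits_part_limit(file_size_bytes, chunk_size_mib):
    """True if the estimated part count stays within the S3 part limit."""
    return parts_needed(file_size_bytes, chunk_size_mib) <= S3_MAX_PARTS


# Hypothetical 3 GiB file copied with --multipartUploadChunkSize=1024
three_gib = 3 * 1024 * MIB
print(parts_needed(three_gib, 1024))
```

For very large files, a small chunk size can push the estimate past the part limit, which is one reason to raise the chunk size.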

To get the list of running YARN applications on the cluster, run the following command:

yarn application -list

Validate the migrated data

After the preceding MapReduce job completes successfully, use the following steps to validate the data copied over:

source_size=$(hdfs dfs -du -s hdfs://hadoopcluster01.test.amazon.local/user/hive/warehouse/test.db/test_table01 | awk -F' ' '{print $1}')
target_size=$(aws s3 ls --summarize --recursive s3://<BUCKET_NAME>/user/hive/warehouse/test.db/test_table01 | grep "Total Size:" | awk -F' ' '{print $3}')

printf "Source HDFS folder Size in bytes: %s\n" "$source_size"
printf "Target S3 folder Size in bytes: %s\n" "$target_size"

If the source and target size aren’t equal, perform the cleanup step in the next section and repeat the preceding S3DistCp step.
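
The comparison itself can also be scripted. The following Python sketch parses the text printed by the two commands above and checks that the byte counts agree. It assumes the first column of the hdfs dfs -du -s output is the size in bytes and that the aws s3 ls --summarize output contains a "Total Size:" line, which are the same fields the awk commands above extract. The function names and sample outputs are made up for illustration.

```python
import re


def hdfs_du_bytes(du_output):
    """Size in bytes: the first column of `hdfs dfs -du -s` output."""
    return int(du_output.split()[0])


def s3_total_bytes(ls_output):
    """Size in bytes from the 'Total Size:' line of `aws s3 ls --summarize`."""
    match = re.search(r"Total Size:\s*(\d+)", ls_output)
    if match is None:
        raise ValueError("no 'Total Size:' line found")
    return int(match.group(1))


def sizes_match(du_output, ls_output):
    """True when the source and target byte totals are identical."""
    return hdfs_du_bytes(du_output) == s3_total_bytes(ls_output)


# Made-up sample outputs for illustration.
du_sample = "1048576  3145728  hdfs://hadoopcluster01.test.amazon.local/user/hive/warehouse/test.db/test_table01\n"
ls_sample = (
    "2024-01-01 00:00:00    1048576 user/hive/warehouse/test.db/test_table01/part-00000\n"
    "\n"
    "Total Objects: 1\n"
    "   Total Size: 1048576\n"
)

print(sizes_match(du_sample, ls_sample))
```

Matching totals are a quick sanity check rather than proof of integrity, so treat a mismatch as a signal to clean up and rerun the copy.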

Clean up partially copied or errored out partitions and files

S3DistCp doesn’t clean up partially copied files and partitions if it fails while copying. Clean up partially copied or errored out partitions and files before you reinitiate the S3DistCp process. To clean up objects on Amazon S3, use the following AWS CLI command to perform the delete operation:

aws s3 rm s3://<BUCKET_NAME>/path/to/the/object --recursive

Best practices

To avoid copy errors when using S3DistCp to copy a single file (instead of a directory) from Amazon S3 to HDFS, use Amazon EMR 5.33.0 or later, or Amazon EMR 6.3.0 or later.

Limitations

The following are limitations of this approach:

  • If S3DistCp is unable to copy some or all of the specified files, the cluster step fails and returns a non-zero error code. If this occurs, S3DistCp doesn’t clean up partially copied files.
  • S3DistCp doesn’t support concatenation for Parquet files. Use PySpark instead. For more information, see Concatenating Parquet files in Amazon EMR.
  • VPC limitations apply to Direct Connect for Amazon S3. For more information, see AWS Direct Connect quotas.

Conclusion

In this post, we demonstrated the power of S3DistCp to migrate huge volumes of data from a source Hadoop cluster to a target S3 bucket or HDFS on an EMR cluster. With S3DistCp, you can migrate terabytes of data without affecting the compute resources on the source cluster as compared to Hadoop DistCp.

For more information about using S3DistCp, see the following resources:


About the Author

Vicky Wilson Jacob is a Senior Data Architect with AWS Professional Services Analytics Practice. Vicky specializes in Big Data, Data Engineering, Machine Learning, Data Science and Generative AI. He is passionate about technology and solving customer challenges. At AWS, he works with companies helping customers implement big data, machine learning, analytics, and generative AI solutions on cloud. Outside of work, he enjoys spending time with family, singing, and playing guitar.

Run Apache Spark 3.5.1 workloads 4.5 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Ashok Chintalapati original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-5-1-workloads-4-5-times-faster-with-amazon-emr-runtime-for-apache-spark/

The Amazon EMR runtime for Apache Spark is a performance-optimized runtime that is 100% API compatible with open source Apache Spark. It offers faster out-of-the-box performance than Apache Spark through improved query plans, faster queries, and tuned defaults. Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts all use this optimized runtime, which is 4.5 times faster than Apache Spark 3.5.1 and has 2.8 times better price-performance based on an industry standard benchmark derived from TPC-DS at 3 TB scale (note that our TPC-DS derived benchmark results are not directly comparable with official TPC-DS benchmark results).

We added 35 optimizations since the EOY 2022 release, EMR 6.9, that are included in both EMR 7.0 and EMR 7.1. These improvements are turned on by default and are 100% API compatible with Apache Spark. Some of the improvements since our previous post, Amazon EMR on EKS widens the performance gap, include:

  • Spark physical plan operator improvements – We continue to improve Spark runtime performance by changing the operator algorithms:
    • Optimized data structures used in hash joins for performance and memory requirements, allowing the use of a more performant join algorithm in more cases
    • Optimized sorting for partial window
    • Optimized rollup operations
    • Improved sort algorithm for shuffle partitioning
    • Optimized hash aggregate operator
    • More efficient decimal arithmetic operations
    • Aggregates based on Parquet statistics
  • Spark query planning improvements – We introduced new rules in Spark’s Catalyst optimizer to improve efficiency:
    • Adaptively minimize redundant joins
    • Adaptively identify and disable unhelpful optimizations at runtime
    • Infer more advanced Bloom filters and dynamic partition pruning filters from complex query plans to reduce the amount of data shuffled and read from Amazon Simple Storage Service (Amazon S3)
  • Fewer requests to Amazon S3 – We reduced requests sent to Amazon S3 when reading Parquet files by minimizing unnecessary requests and introducing a cache for Parquet footers.
  • Java 17 as default Java runtime used in Amazon EMR 7.0 – Java 17 was extensively tested and tuned for optimal performance, allowing us to make it the default Java runtime for Amazon EMR 7.0.

For more details on EMR Spark performance optimizations, refer to Optimize Spark performance.

In this post, we share the testing methodology and benchmark results comparing the latest Amazon EMR versions (7.0 and 7.1) with the EOY 2022 release (version 6.9) and Apache Spark 3.5.1 to demonstrate the latest cost improvements Amazon EMR has achieved.

Benchmark results for Amazon EMR 7.1 vs. Apache Spark 3.5.1

To evaluate the Spark engine performance, we ran benchmark tests with the 3 TB TPC-DS dataset. We used EMR Spark clusters for benchmark tests on Amazon EMR and installed Apache Spark 3.5.1 on Amazon Elastic Compute Cloud (Amazon EC2) clusters designated for open source Spark (OSS) benchmark runs. We ran tests on separate EC2 clusters comprised of nine r5d.4xlarge instances for each of Apache Spark 3.5.1, Amazon EMR 6.9.0, and Amazon EMR 7.1. The primary node has 16 vCPU and 128 GB memory and eight worker nodes have a total of 128 vCPU and 1024 GB memory. We tested with Amazon EMR defaults to highlight the out-of-the-box experience and tuned Apache Spark with the minimal settings needed to provide a fair comparison.

For the source data, we chose the 3 TB scale factor, which contains 17.7 billion records, approximately 924 GB of compressed data in Parquet file format. The setup instructions and technical details can be found in the GitHub repository. We used Spark’s in-memory data catalog to store metadata for TPC-DS databases and tables. spark.sql.catalogImplementation is set to the default value in-memory. The fact tables are partitioned by the date column, which consists of partitions ranging from 200–2,100. No statistics were pre-calculated for these tables.

A total of 104 SparkSQL queries were run in three iterations sequentially and an average of each query’s runtime in these three iterations was used for comparison. The average of the three iterations’ runtime on Amazon EMR 7.1 was 0.51 hours, which is 1.9 times faster than Amazon EMR 6.9 and 4.5 times faster than Apache Spark 3.5.1. The following figure illustrates the total runtimes in seconds.

The per-query speedup on Amazon EMR 7.1 when compared to Apache Spark 3.5.1 is illustrated in the following chart. Although Amazon EMR is faster than Apache Spark on all TPC-DS queries, the speedup is much greater on some queries than on others. The horizontal axis represents queries in the TPC-DS 3 TB benchmark ordered by the Amazon EMR speedup descending and the vertical axis shows the speedup of queries due to the Amazon EMR runtime.

Cost comparison

Our benchmark outputs the total runtime and geometric mean figures to measure the Spark runtime performance by simulating a real-world complex decision support use case. The cost metric can provide us with additional insights. Cost estimates are computed using the following formulas. They factor in Amazon EC2, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR costs, but don’t include Amazon S3 GET and PUT costs.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • r5d.4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • r5d.4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

Based on the calculation, the Amazon EMR 7.1 benchmark result demonstrates a 2.8 times improvement in job cost compared to Apache Spark 3.5.1 and a 1.7 times improvement when compared to Amazon EMR 6.9.

Metric | Amazon EMR 7.1 | Amazon EMR 6.9 | Apache Spark 3.5.1
Runtime in hours | 0.51 | 0.87 | 1.76
Number of EC2 instances | 9 | 9 | 9
Amazon EBS size | 20 GB | 20 GB | 20 GB
Amazon EC2 cost | $5.29 | $9.02 | $18.25
Amazon EBS cost | $0.01 | $0.02 | $0.04
Amazon EMR cost | $1.24 | $2.11 | $0.00
Total cost | $6.54 | $11.15 | $18.29
Cost savings | Baseline | Amazon EMR 7.1 is 1.7 times better | Amazon EMR 7.1 is 2.8 times better
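
The cost formulas and the table above can be checked with a short calculation. The following Python sketch applies the formulas to the three runtimes in the table. The EC2 and Amazon EMR rates are the ones quoted in this post. The EBS rate is not stated in the post, so the sketch assumes $0.10 per GB-month spread over a 730-hour month. Each component is rounded to cents before summing, which is how the table's totals appear to be derived. All names in the code are illustrative.

```python
# Rates quoted in the post (USD per r5d.4xlarge instance-hour).
EC2_HOURLY_RATE = 1.152
EMR_HOURLY_RATE = 0.27
# ASSUMPTION: the post does not state the EBS rate. This value assumes
# $0.10 per GB-month spread over a 730-hour month.
EBS_GB_HOURLY_RATE = 0.10 / 730

INSTANCES = 9
ROOT_VOLUME_GB = 20


def job_cost(runtime_hours, uses_emr):
    """Return (ec2, ebs, emr, total) in USD, each rounded to cents."""
    ec2 = round(INSTANCES * EC2_HOURLY_RATE * runtime_hours, 2)
    ebs = round(INSTANCES * EBS_GB_HOURLY_RATE * ROOT_VOLUME_GB * runtime_hours, 2)
    # Open source Spark on EC2 carries no Amazon EMR charge.
    emr = round(INSTANCES * EMR_HOURLY_RATE * runtime_hours, 2) if uses_emr else 0.0
    total = round(ec2 + ebs + emr, 2)
    return ec2, ebs, emr, total


emr_71 = job_cost(0.51, uses_emr=True)
emr_69 = job_cost(0.87, uses_emr=True)
oss_spark = job_cost(1.76, uses_emr=False)

for name, cost in [("EMR 7.1", emr_71), ("EMR 6.9", emr_69), ("Spark 3.5.1", oss_spark)]:
    print(name, cost)

# Cost ratios relative to the Amazon EMR 7.1 baseline.
print(round(emr_69[3] / emr_71[3], 1), round(oss_spark[3] / emr_71[3], 1))
```

Because the EBS term is only a cent or two per run, the assumed EBS rate has little effect on the totals or on the resulting cost ratios.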

Run OSS Spark benchmarking

For running Apache Spark 3.5.1, we used the following configurations to set up an EC2 cluster. We used one primary node and eight worker nodes of type r5d.4xlarge.

EC2 Instance | vCPU | Memory (GiB) | Instance Storage (GB) | EBS Root Volume (GB)
r5d.4xlarge | 16 | 128 | 2 x 300 NVMe SSD | 20

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repo, set up the TPC-DS source data in your S3 bucket and on your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.1.jar to your S3 bucket.

This benchmark application is built from branch tpcds-v2.13. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repo.

Create and configure a YARN cluster on Amazon EC2

Follow the instructions in the emr-spark-benchmark GitHub repo to create an OSS Spark cluster on Amazon EC2 using Flintrock.

Based on the cluster selection for this test, the following are the configurations used:

Run the TPC-DS benchmark for Apache Spark 3.5.1

Complete the following steps to run the TPC-DS benchmark for Apache Spark 3.5.1:

  1. Log in to the OSS cluster primary using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. The TPC-DS source data is at s3a://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned. Check the prerequisites on how to set up the source data.
    2. The results are created in s3a://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4 \
spark-benchmark-assembly-3.5.1.jar \
s3a://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned \
s3a://<YOUR_S3_BUCKET>/benchmark_run \
/opt/tpcds-kit/tools parquet 3000 3 false \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13 \
true > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

When the Spark job is complete, download the test result file from the output S3 bucket s3a://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. You can use the Amazon S3 console and navigate to the output bucket location or use the AWS Command Line Interface (AWS CLI).

The Spark benchmark application creates a timestamp folder and writes a summary file inside a summary.csv prefix. Your timestamp and file name will be different from the one shown in the preceding example.

The output CSV files have four columns without header names:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

Because we have three runs, we can then compute the average and geometric mean of the runtimes.

Run the TPC-DS benchmark using Amazon EMR Spark

For detailed instructions, see Steps to run Spark Benchmarking.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure your AWS CLI shell to point to the benchmarking account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR Cluster and run benchmark job to spin up an EMR on EC2 cluster. Update the provided script with the correct Amazon EMR version and root volume size, and provide the values required. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. You need this in the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps in the AWS CLI:
    1. Replace <cluster ID> with the cluster ID from the create cluster response.
    2. The benchmark application is at s3://<YOUR_S3_BUCKET>/spark-benchmark-assembly-3.5.1.jar.
    3. The TPC-DS source data is at s3://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned.
    4. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
aws emr add-steps \
    --cluster-id <cluster ID>  \
    --steps Type=Spark,Name="TPCDS Benchmark Job",Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,s3://<YOUR_S3_BUCKET>/spark-benchmark-assembly-3.5.1.jar,s3://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned,s3://<YOUR_S3_BUCKET>/benchmark_run,/home/hadoop/tpcds-kit/tools,parquet,3000,3,false,'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,q99-v2.13\,ss_max-v2.13',true],ActionOnFailure=CONTINUE

Summarize the results

After the job is complete, retrieve the summary results from s3://<YOUR_S3_BUCKET>/benchmark_run in the same way as the OSS benchmark runs and compute the average and geomean for Amazon EMR runs.
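
As a minimal sketch of that last computation, assume the per-query runtimes have already been extracted from the summary files into a list. The numbers below are made-up placeholders, not benchmark results, and the function names are illustrative only.

```python
import math

def average(values):
    """Arithmetic mean of per-query runtimes."""
    return sum(values) / len(values)

def geomean(values):
    """Geometric mean, computed in log space so long lists do not overflow."""
    return math.exp(sum(math.log(v) for v in values) / len(values))

# Placeholder per-query runtimes in seconds (illustrative only).
runtimes = [2.0, 8.0, 4.0]
print(f"average: {average(runtimes):.2f}s, geomean: {geomean(runtimes):.2f}s")
```

The geometric mean is less sensitive than the average to a handful of very long queries, which is why benchmark summaries often report both.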

Clean up

To avoid incurring future charges, delete the resources you created using the instructions in the Cleanup section of the GitHub repo.

Summary

Amazon EMR continues to improve the EMR runtime for Apache Spark, leading to a performance improvement of 1.9x year-over-year and 4.5x faster performance than OSS Spark 3.5.1. We recommend that you stay up to date with the latest Amazon EMR release to take advantage of the latest performance benefits.

To keep up to date, subscribe to the Big Data Blog’s RSS feed to learn more about the EMR runtime for Apache Spark, configuration best practices, and tuning advice.


About the authors

Ashok Chintalapati is a software development engineer for Amazon EMR at Amazon Web Services.

Steve Koonce is an Engineering Manager for EMR at Amazon Web Services.

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary addresses the two concerns of vendor lock-in and cost-efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary’s infrastructure handles over 20 billion requests daily, with every request generating event logs. Various data pipelines process these logs, amounting to petabytes (PBs) of data per month; the data is first processed and stored on Amazon S3 and then loaded into Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

Data flows from Cloudinary log providers into files written to Amazon S3, and each new file is announced through an event pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running on Amazon EMR, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, help avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned used Athena because it provides a simple-to-use, fully serverless, cost-effective interface without the need to set up infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. For example, timestamps can be partitioned by year, month, day, or hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible by the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.
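
To make the idea of hidden partitioning concrete, here is a small sketch in plain Python rather than Iceberg itself. It assumes day and hour transforms that count whole days or hours since 1970-01-01 UTC; the function names are illustrative and not part of any Iceberg API. It shows how a filter on the timestamp column alone can be translated into the partitions that need to be read.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_transform(ts):
    """Whole days since 1970-01-01 UTC (day-granularity partition value)."""
    return (ts - EPOCH).days

def hour_transform(ts):
    """Whole hours since 1970-01-01 UTC (hour-granularity partition value)."""
    return int((ts - EPOCH).total_seconds() // 3600)

def day_partitions_for_filter(start, end):
    """Translate a timestamp range filter into the day partitions that can match."""
    return range(day_transform(start), day_transform(end) + 1)

event = datetime(2024, 5, 17, 13, 45, tzinfo=timezone.utc)
print(day_transform(event), hour_transform(event))

# The query filters only on the timestamp column; no partition column is named.
start = datetime(2024, 5, 17, 22, 0, tzinfo=timezone.utc)
end = datetime(2024, 5, 18, 3, 0, tzinfo=timezone.utc)
print(list(day_partitions_for_filter(start, end)))
```

Because the partition value is derived from the column, switching the table from the day transform to the hour transform changes only how new data is laid out; the query itself stays the same.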

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results both during writing and at query time. After careful analysis of the results, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions, as described in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using Amazon EventBridge to push a notification for each received file into Amazon SQS; the queued files were then processed in batches by Spark running on Amazon EMR. This allowed processing the incoming data at high throughput and scaling clusters according to queue size while keeping costs down.

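Example of fetching up to 100 messages (files) from Amazon SQS with Spark (queueUrl is assumed to be defined elsewhere):

import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import scala.collection.JavaConverters._

// Singleton, so each executor JVM lazily builds its own (non-serializable) client
object DistributedSQSReceiver {
  lazy val client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()
}

// One receive call returns at most 10 messages, the SQS per-request maximum
def getMessageBatch: List[Message] =
  DistributedSQSReceiver.client
    .receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10))
    .getMessages.asScala.toList

// Run 10 receive calls in parallel to fetch up to 100 messages
sparkSession.sparkContext.parallelize(1 to 10, 10).map(_ => getMessageBatch).collect().flatMap(_.toList).toList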

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.
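
The effect of a hash prefix can be illustrated with a short sketch. This is not Iceberg’s actual hash function or path layout: SHA-256 is used as a stand-in, and the bucket, table location, and file names are hypothetical. The point is only that files which would otherwise share one partition prefix end up spread across many prefixes.

```python
import hashlib
from collections import Counter

def hashed_path(base, partition, filename, prefix_len=4):
    """Place a short hash of the file name ahead of the partition path."""
    digest = hashlib.sha256(filename.encode("utf-8")).hexdigest()[:prefix_len]
    return f"{base}/{digest}/{partition}/{filename}"

base = "s3://example-bucket/warehouse/db/events/data"  # hypothetical location
partition = "day=2024-05-17"
files = [f"part-{i:05d}.parquet" for i in range(1000)]

# Without hashing, every file of the day lands under the same prefix.
plain = Counter(f"{base}/{partition}" for _ in files)
# With hashing, the leading path component differs from file to file.
hashed = Counter(hashed_path(base, partition, f).rsplit("/", 2)[0] for f in files)

print("distinct prefixes without hashing:", len(plain))
print("distinct prefixes with hashing:", len(hashed))
```

Spreading writes over many prefixes matters because Amazon S3 request limits apply per prefix, so a single hot partition prefix is the first place throttling shows up.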

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partitioning is a key factor in the number of files produced, and because Cloudinary’s data is time based and most queries use a time filter, Cloudinary decided to address the optimization of their data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy is the fastest: it combines small files until they reach the defined target file size. After running it, a significant improvement in query performance was observed.
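
The idea behind bin packing can be sketched in a few lines. The following is a generic first-fit-decreasing heuristic, not Iceberg’s actual implementation; the file sizes are invented, and the 512 MB target is the example size mentioned earlier.

```python
def binpack(file_sizes_mb, target_mb=512):
    """Greedy first-fit-decreasing: group files into bins of at most target_mb."""
    bins = []
    for size in sorted(file_sizes_mb, reverse=True):
        for group in bins:
            if sum(group) + size <= target_mb:
                group.append(size)
                break
        else:
            # No existing group has room, so start a new output file.
            bins.append([size])
    return bins

small_files = [40, 12, 300, 75, 8, 120, 64, 200, 16, 90, 30, 5]  # sizes in MB
groups = binpack(small_files)
print(f"{len(small_files)} input files -> {len(groups)} output files")
for group in groups:
    print(sum(group), "MB from", group)
```

Note that nothing here looks at the contents of the files, which is why this strategy is fast but does not improve how rows are clustered.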

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.
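
Why sorting reduces the data scanned can be shown with a small simulation. It assumes that the query engine skips a file when the file’s min/max statistics for the filter column do not overlap the query range; the data is synthetic, and the numbers it prints are not Cloudinary’s measurements.

```python
import random

def to_files(values, rows_per_file):
    """Split rows into files and record each file's min/max column statistics."""
    chunks = [values[i:i + rows_per_file] for i in range(0, len(values), rows_per_file)]
    return [(min(c), max(c)) for c in chunks]

def files_scanned(files, lo, hi):
    """Count files whose [min, max] range overlaps the query range [lo, hi]."""
    return sum(1 for fmin, fmax in files if fmin <= hi and fmax >= lo)

random.seed(7)
# One day of events: requested_at as seconds within the day, in arrival order.
arrival_order = [random.randrange(86_400) for _ in range(10_000)]

unsorted_files = to_files(arrival_order, 1_000)
sorted_files = to_files(sorted(arrival_order), 1_000)

# Query a single hour of the day: 10:00 to 11:00.
lo, hi = 10 * 3600, 11 * 3600
print("unsorted:", files_scanned(unsorted_files, lo, hi), "of", len(unsorted_files))
print("sorted:  ", files_scanned(sorted_files, lo, hi), "of", len(sorted_files))
```

In the unsorted layout every file spans almost the whole day, so none can be skipped; after sorting, each file covers a narrow time window and only the overlapping ones are read.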

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations, Cloudinary was able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data, which sometimes arrives from external sources and isn’t identified upon arrival. Because expireOlderThan takes an absolute timestamp in milliseconds, the 7-day cutoff is computed relative to the current time:

SparkActions.get().expireSnapshots(iceTable).expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)).execute()
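
The retention rule can be sketched independently of Spark. The snippet below is an illustration only: the function name is hypothetical, and it assumes that the most recent snapshot is always kept, whatever its age. The cutoff is an absolute point in time (now minus 7 days), not a duration.

```python
from datetime import datetime, timedelta, timezone

def snapshots_to_expire(snapshot_times, now, retention_days=7, retain_last=1):
    """Return snapshots older than the cutoff, always keeping the newest ones."""
    cutoff = now - timedelta(days=retention_days)  # absolute point in time
    ordered = sorted(snapshot_times)
    protected = set(ordered[-retain_last:]) if retain_last > 0 else set()
    return [t for t in ordered if t < cutoff and t not in protected]

now = datetime(2024, 6, 30, tzinfo=timezone.utc)
snapshots = [now - timedelta(days=d) for d in (0, 1, 3, 6, 8, 15, 40)]
for t in snapshots_to_expire(snapshots, now):
    print("expire", t.date())
```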

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.
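
Conceptually, orphan detection is a set difference between what exists in storage and what the table metadata references. The sketch below uses made-up paths and timestamps and a hypothetical function name. It also applies an age threshold so that files from writes still in progress are left alone, which is an assumption of this example rather than something stated above.

```python
def find_orphan_files(listed, referenced, last_modified, older_than):
    """Files present in storage but not reachable from table metadata.

    Only files last modified before `older_than` are returned, so files
    written by jobs that have not committed yet are not touched.
    """
    return sorted(
        path for path in set(listed) - set(referenced)
        if last_modified[path] < older_than
    )

# Result of listing the table location (the expensive part at scale).
listed = ["data/a.parquet", "data/b.parquet", "data/c.parquet", "data/tmp-1.parquet"]
# Files reachable from any valid snapshot.
referenced = ["data/a.parquet", "data/b.parquet"]
last_modified = {
    "data/a.parquet": 100,
    "data/b.parquet": 120,
    "data/c.parquet": 130,
    "data/tmp-1.parquet": 990,  # recent file, possibly from an in-flight write
}

print(find_orphan_files(listed, referenced, last_modified, older_than=900))
```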

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. It turned out that in certain cases, Cloudinary reached over 300 manifest files, which were small (often under 8 MB in size), and due to late-arriving data, manifest files were pointing to data in different partitions. This caused query planning to run for 12 seconds for each query.

Cloudinary initiated a separate scheduled process that runs rewriteManifests. After it ran, the number of manifest files was reduced to approximately 170. Because manifests were now better aligned with query filters (based on partitions), query planning became three times faster, dropping to approximately 4 seconds.

Choosing the right query engine

As part of Cloudinary’s exploration aimed at testing various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

| Engine | Details |
| --- | --- |
| Snowflake native | XL data warehouse on top of data stored within Snowflake |
| Snowflake with Apache Iceberg support | XL data warehouse on top of data stored in S3 in Apache Iceberg tables |
| Athena | On-demand mode |
| Amazon EMR Trino | Opensource Trino on top of eight nodes (m6g.12xl) cluster |

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity, from the simplest to the heaviest and most complex.

| Query | Description | Data scanned | Returned results set |
| --- | --- | --- | --- |
| Q1 | Multi-day aggregation on a single tenant | Single digit GBs | <10 rows |
| Q2 | Single-day aggregation by tenant across multiple tenants | Dozens of GBs | 100 thousand rows |
| Q3 | Multi-day aggregation across multiple tenants | Hundreds of GBs | <10 rows |
| Q4 | Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics | Single digit TBs | >1 billion rows |

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was fastest in only two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data lake. The following table shows the cost and time for each query and product.

| Query | Amazon EMR Trino | Snowflake (XL) | Snowflake (XL) Iceberg | Athena |
| --- | --- | --- | --- | --- |
| Query1 | $0.01, 5 seconds | $0.08, 8 seconds | $0.07, 8 seconds | $0.02, 11 seconds |
| Query2 | $0.12, 107 seconds | $0.25, 28 seconds | $0.35, 39 seconds | $0.18, 94 seconds |
| Query3 | $0.17, 147 seconds | $1.07, 120 seconds | $1.88, 211 seconds | $1.22, 26 seconds |
| Query4 | $6.43, 1,237 seconds | $11.73, 1,324 seconds | $12.71, 1,430 seconds | N/A |
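
The normalization used for the charts can be reproduced from the table above. The following sketch divides each engine’s cost and duration by the Amazon EMR Trino value for the same query; the figures are copied from the table, while the data structure and function name are only illustrative.

```python
# (cost in USD, duration in seconds) per query and engine, from the table above.
RESULTS = {
    "Query1": {"Amazon EMR Trino": (0.01, 5), "Snowflake (XL)": (0.08, 8),
               "Snowflake (XL) Iceberg": (0.07, 8), "Athena": (0.02, 11)},
    "Query2": {"Amazon EMR Trino": (0.12, 107), "Snowflake (XL)": (0.25, 28),
               "Snowflake (XL) Iceberg": (0.35, 39), "Athena": (0.18, 94)},
    "Query3": {"Amazon EMR Trino": (0.17, 147), "Snowflake (XL)": (1.07, 120),
               "Snowflake (XL) Iceberg": (1.88, 211), "Athena": (1.22, 26)},
    # Athena was not measured for Query4 (N/A in the table).
    "Query4": {"Amazon EMR Trino": (6.43, 1237), "Snowflake (XL)": (11.73, 1324),
               "Snowflake (XL) Iceberg": (12.71, 1430)},
}
BASELINE = "Amazon EMR Trino"

def normalize(results, baseline=BASELINE):
    """Express each engine's cost and duration as a multiple of the baseline."""
    normalized = {}
    for query, engines in results.items():
        base_cost, base_secs = engines[baseline]
        normalized[query] = {
            engine: (round(cost / base_cost, 2), round(secs / base_secs, 2))
            for engine, (cost, secs) in engines.items()
        }
    return normalized

for query, engines in normalize(RESULTS).items():
    for engine, (cost_x, time_x) in engines.items():
        print(f"{query} {engine}: cost x{cost_x}, duration x{time_x}")
```

A value above 1 means the engine was more expensive or slower than Trino on Amazon EMR for that query, and a value below 1 means it was cheaper or faster.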

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. Together, they enable organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination is the most refined way to have an enterprise-grade open data environment. The availability of managed services and open source software helps companies to implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in Athena engine version 3, which is based on Trino. The ability to retain data for longer and provide it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Design a data mesh pattern for Amazon EMR-based data lakes using AWS Lake Formation with Hive metastore federation

Post Syndicated from Sudipta Mitra original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-pattern-for-amazon-emr-based-data-lakes-using-aws-lake-formation-with-hive-metastore-federation/

In this post, we delve into the key aspects of using Amazon EMR for modern data management, covering topics such as data governance, data mesh deployment, and streamlined data discovery.

One of the key challenges in modern big data management is facilitating efficient data sharing and access control across multiple EMR clusters. Organizations have multiple Hive data warehouses across EMR clusters, where the metadata gets generated. To address this challenge, organizations can deploy a data mesh using AWS Lake Formation that connects the multiple EMR clusters. With the AWS Glue Data Catalog federation to external Hive metastore feature, you can now apply data governance to the metadata residing across those EMR clusters and analyze them using AWS analytics services such as Amazon Athena, Amazon Redshift Spectrum, AWS Glue ETL (extract, transform, and load) jobs, EMR notebooks, EMR Serverless using Lake Formation for fine-grained access control, and Amazon SageMaker Studio. For detailed information on managing your Apache Hive metastore using Lake Formation permissions, refer to Query your Apache Hive metastore with AWS Lake Formation permissions.

In this post, we present a methodology for deploying a data mesh consisting of multiple Hive data warehouses across EMR clusters. This approach enables organizations to take advantage of the scalability and flexibility of EMR clusters while maintaining control and integrity of their data assets across the data mesh.

Use cases for Hive metastore federation for Amazon EMR

Hive metastore federation for Amazon EMR is applicable to the following use cases:

  • Governance of Amazon EMR-based data lakes – Producers generate data within their AWS accounts using an Amazon EMR-based data lake supported by EMRFS on Amazon Simple Storage Service (Amazon S3) and HBase. These data lakes require governance for access without the necessity of moving data to consumer accounts. The data resides on Amazon S3, which reduces the storage costs significantly.
  • Centralized catalog for published data – Multiple producers release data currently governed by their respective entities. For consumer access, a centralized catalog is necessary where producers can publish their data assets.
  • Consumer personas – Consumers include data analysts who run queries on the data lake, data scientists who prepare data for machine learning (ML) models and conduct exploratory analysis, as well as downstream systems that run batch jobs on the data within the data lake.
  • Cross-producer data access – Consumers may need to access data from multiple producers within the same catalog environment.
  • Data access entitlements – Data access entitlements involve implementing restrictions at the database, table, and column levels to provide appropriate data access control.

Solution overview

The following diagram shows how data from producers with their own Hive metastores (left) can be made available to consumers (right) using Lake Formation permissions enforced in a central governance account.

Producer and consumer are logical concepts used to indicate the production and consumption of data through a catalog. An entity can act both as a producer of data assets and as a consumer of data assets. The onboarding of producers is facilitated by sharing metadata, whereas the onboarding of consumers is based on granting permission to access this metadata.

The solution consists of multiple steps in the producer, catalog, and consumer accounts:

  1. Deploy the AWS CloudFormation templates and set up the producer, central governance and catalog, and consumer accounts.
  2. Test access to the producer cataloged Amazon S3 data using EMR Serverless in the consumer account.
  3. Test access using Athena queries in the consumer account.
  4. Test access using SageMaker Studio in the consumer account.

Producer

Producers create data within their AWS accounts using an Amazon EMR-based data lake and Amazon S3. Multiple producers then publish this data into a central catalog (data lake technology) account. Each producer account, along with the central catalog account, has either VPC peering or AWS Transit Gateway enabled to facilitate AWS Glue Data Catalog federation with the Hive metastore.

For each producer, an AWS Glue Hive metastore connector AWS Lambda function is deployed in the catalog account. This enables the Data Catalog to access Hive metastore information at runtime from the producer. The data lake locations (the S3 bucket location of the producers) are registered in the catalog account.

Central catalog

A catalog offers governed and secure data access to consumers. Federated databases are established within the catalog account’s Data Catalog using the Hive connection, managed by the catalog Lake Formation admin (LF-Admin). These federated databases in the catalog account are then shared by the data lake LF-Admin with the consumer LF-Admin of the external consumer account.

Data access entitlements are managed by applying access controls as needed at various levels, such as the database or table.

Consumer

The consumer LF-Admin grants the necessary permissions or restricted permissions to roles such as data analysts, data scientists, and downstream batch processing engine AWS Identity and Access Management (IAM) roles within its account.

Data access entitlements are managed by applying access control based on requirements at various levels, such as databases and tables.

Prerequisites

You need three AWS accounts with admin access to implement this solution. It is recommended to use test accounts. The producer account will host the EMR cluster and S3 buckets. The catalog account will host Lake Formation and AWS Glue. The consumer account will host EMR Serverless, Athena, and SageMaker notebooks.

Set up the producer account

Before you launch the CloudFormation stack, gather the following information from the catalog account:

  • Catalog AWS account ID (12-digit account ID)
  • Catalog VPC ID (for example, vpc-xxxxxxxx)
  • VPC CIDR (catalog account VPC CIDR; it should not overlap 10.0.0.0/16)

The VPC CIDRs of the producer and catalog accounts can’t overlap because of VPC peering and Transit Gateway requirements. The catalog VPC CIDR should belong to the VPC in the catalog account where the AWS Glue Hive metastore connector Lambda function will eventually be deployed.
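The non-overlap requirement can be checked before launching the stacks. The following is a minimal sketch using Python's standard `ipaddress` module; the function name and the candidate CIDRs are illustrative assumptions, and only the producer CIDR 10.0.0.0/16 comes from this post.

```python
import ipaddress

# CIDR that the producer CloudFormation stack assigns to its VPC (from this post).
PRODUCER_CIDR = ipaddress.ip_network("10.0.0.0/16")


def overlaps_producer(catalog_cidr: str) -> bool:
    """Return True if the catalog VPC CIDR overlaps the producer VPC CIDR."""
    return ipaddress.ip_network(catalog_cidr).overlaps(PRODUCER_CIDR)


if __name__ == "__main__":
    # Illustrative candidate CIDRs for the catalog VPC (assumptions, not from the post).
    for cidr in ("10.0.128.0/20", "10.1.0.0/16", "172.31.0.0/16"):
        status = "overlaps the producer VPC" if overlaps_producer(cidr) else "ok"
        print(f"{cidr}: {status}")
```

A catalog VPC CIDR that reports an overlap should be replaced with a different range before you set up VPC peering.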

The CloudFormation stack for the producer creates the following resources:

  • S3 bucket to host data for the Hive metastore of the EMR cluster.
  • VPC with the CIDR 10.0.0.0/16. Make sure there is no existing VPC with this CIDR in use.
  • VPC peering connection between the producer and catalog account.
  • Amazon Elastic Compute Cloud (Amazon EC2) security groups for the EMR cluster.
  • IAM roles required for the solution.
  • EMR 6.10 cluster launched with Hive.
  • Sample data downloaded to the S3 bucket.
  • A database and external tables, pointing to the downloaded sample data, in its Hive metastore.

Complete the following steps:

  1. Launch the template PRODUCER.yml. It’s recommended to use an IAM role that has administrator privileges.
  2. Gather the values for the following on the CloudFormation stack’s Outputs tab:
    1. VpcPeeringConnectionId (for example, pcx-xxxxxxxxx)
    2. DestinationCidrBlock (10.0.0.0/16)
    3. S3ProducerDataLakeBucketName

Set up the catalog account

The CloudFormation stack for the catalog account creates the Lambda function for federation. Before you launch the template, on the Lake Formation console, add the IAM role and user deploying the stack as the data lake admin.

Then complete the following steps:

  1. Launch the template CATALOG.yml.
  2. For the RouteTableId parameter, use the catalog account VPC RouteTableId. This is the VPC where the AWS Glue Hive metastore connector Lambda function will be deployed.
  3. On the stack’s Outputs tab, copy the value for LFRegisterLocationServiceRole (arn:aws:iam::account-id:role/role-name).
  4. Confirm that the Data Catalog settings have the IAM access control options unchecked and that the current cross-account version is set to 4.

  1. Log in to the producer account and add the following bucket policy to the producer S3 bucket that was created during the producer account setup. Add the ARN of LFRegisterLocationServiceRole to the Principal section and provide the S3 bucket name under the Resource section.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::account-id:role/role-name"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::s3-bucket-name/*",
                "arn:aws:s3:::s3-bucket-name"
            ]
        }
    ]
}
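If you prefer to generate the bucket policy rather than edit it by hand, the following minimal sketch builds the same document from the two values you need to fill in. The function name, the example role ARN, and the bucket name are hypothetical placeholders; the sketch only prints the JSON and does not call AWS.

```python
import json


def build_bucket_policy(role_arn: str, bucket_name: str) -> dict:
    """Build the read-only bucket policy shown above for the given role and bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}/*",  # objects, for s3:GetObject
                    f"arn:aws:s3:::{bucket_name}",  # the bucket itself, for s3:ListBucket
                ],
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical placeholder values; substitute your own stack outputs.
    policy = build_bucket_policy(
        role_arn="arn:aws:iam::111122223333:role/example-lf-register-role",
        bucket_name="example-producer-bucket",
    )
    print(json.dumps(policy, indent=4))
```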
  1. In the producer account, on the Amazon EMR console, navigate to the primary node EC2 instance to get the value for Private IP DNS name (IPv4 only) (for example, ip-xx-x-x-xx.us-west-1.compute.internal).

  1. Switch to the catalog account and deploy the AWS Glue Data Catalog federation Lambda function (GlueDataCatalogFederation-HiveMetastore).

The default Region is set to us-east-1. Change it to your desired Region before deploying the function.

Use the VPC that was used as the CloudFormation input for the VPC CIDR. You can use the VPC’s default security group ID. If you use another security group, make sure its outbound rules allow traffic to 0.0.0.0/0.

Next, you create a federated database in Lake Formation.

  1. On the Lake Formation console, choose Data sharing in the navigation pane.
  2. Choose Create database.

  1. Provide the following information:
    1. For Connection name, choose your connection.
    2. For Database name, enter a name for your database.
    3. For Database identifier, enter emrhms_salesdb (this is the database created on the EMR Hive metastore).
  2. Choose Create database.

  1. On the Databases page, select the database and on the Actions menu, choose Grant to grant describe permissions to the consumer account.

  1. Under Principals, select External accounts and choose your account ARN.
  2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database and table.
  3. Under Table permissions, provide the following information:
    1. For Table permissions, select Select and Describe.
    2. For Grantable permissions, select Select and Describe.
  4. Under Data permissions, select All data access.
  5. Choose Grant.

  1. On the Tables page, select your table and on the Actions menu, choose Grant to grant select and describe permissions.

  1. Under Principals, select External accounts and choose your account ARN.
  2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database.
  3. Under Database permissions, provide the following information:
    1. For Database permissions, select Create table and Describe.
    2. For Grantable permissions, select Create table and Describe.
  4. Choose Grant.

Set up the consumer account

Consumers include data analysts who run queries on the data lake, data scientists who prepare data for ML models and conduct exploratory analysis, as well as downstream systems that run batch jobs on the data within the data lake.

The consumer account setup in this section shows how you can query the shared Hive metastore data using Athena for the data analyst persona, EMR Serverless to run batch scripts, and SageMaker Studio for the data scientist to further use data in the downstream model building process.

For EMR Serverless and SageMaker Studio, if you’re using the default IAM service role, add the required Data Catalog and Lake Formation IAM permissions to the role and use Lake Formation to grant table permission access to the role’s ARN.

Data analyst use case

In this section, we demonstrate how a data analyst can query the Hive metastore data using Athena. Before you get started, on the Lake Formation console, add the IAM role or user deploying the CloudFormation stack as the data lake admin.

Then complete the following steps:

  1. Run the CloudFormation template CONSUMER.yml.
  2. If the catalog and consumer accounts are not part of the same organization in AWS Organizations, navigate to the AWS Resource Access Manager (AWS RAM) console and manually accept the resources shared from the catalog account.
  3. On the Lake Formation console, on the Databases page, select your database and on the Actions menu, choose Create resource link.

  1. Under Database resource link details, provide the following information:
    1. For Resource link name, enter a name.
    2. For Shared database’s region, choose a Region.
    3. For Shared database, choose your database.
    4. For Shared database’s owner ID, enter the account ID.
  2. Choose Create.

Now you can use Athena to query the table on the consumer side, as shown in the following screenshot.

Batch job use case

Complete the following steps to set up EMR Serverless to run a sample Spark job to query the existing table:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Get started.

  1. Choose Create and launch EMR Studio.

  1. Under Application settings, provide the following information:
    1. For Name, enter a name.
    2. For Type, choose Spark.
    3. For Release version, choose the current version.
    4. For Architecture, select x86_64.
  2. Under Application setup options, select Use custom settings.

  1. Under Additional configurations, for Metastore configuration, select Use AWS Glue Data Catalog as metastore, then select Use Lake Formation for fine-grained access control.
  2. Choose Create and start application.

  1. On the application details page, on the Job runs tab, choose Submit job run.

  1. Under Job details, provide the following information:
    1. For Name, enter a name.
    2. For Runtime role, choose Create new role.
    3. Note the IAM role that gets created.
    4. For Script location, enter the S3 bucket location created by the CloudFormation template (the script is emr-serverless-query-script.py).
  2. Choose Submit job run.

  1. Add the following AWS Glue access policy to the IAM role created in the previous step (provide your Region and the account ID of your catalog account):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetDatabases",
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:BatchCreatePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:1234567890:catalog",
                "arn:aws:glue:us-east-1:1234567890:database/*",
                "arn:aws:glue:us-east-1:1234567890:table/*/*"
            ]
        }
    ]
}
  1. Add the following Lake Formation access policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "lakeformation:GetDataAccess",
            "Resource": "*"
        }
    ]
}
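The Resource entries in the AWS Glue policy follow a fixed pattern built from your Region and the catalog account ID. The following minimal sketch generates them and rejects account IDs that are not 12 digits; the function name and the example account ID are hypothetical.

```python
import re


def glue_resource_arns(region: str, account_id: str) -> list:
    """Return the catalog, database, and table ARNs used in the AWS Glue policy."""
    # AWS account IDs are 12 digits; fail early on anything else.
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("account_id must be a 12-digit string")
    prefix = f"arn:aws:glue:{region}:{account_id}"
    return [
        f"{prefix}:catalog",
        f"{prefix}:database/*",
        f"{prefix}:table/*/*",
    ]


if __name__ == "__main__":
    # Hypothetical Region and catalog account ID; substitute your own.
    for arn in glue_resource_arns("us-east-1", "111122223333"):
        print(arn)
```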
  1. On the Databases page, select the database and on the Actions menu, choose Grant to grant Lake Formation access to the EMR Serverless runtime role.
  2. Under Principals, select IAM users and roles and choose your role.
  3. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database.
  4. Under Resource link permissions, for Resource link permissions, select Describe.
  5. Choose Grant.

  1. On the Databases page, select the database and on the Actions menu, choose Grant on target.

  1. Provide the following information:
    1. Under Principals, select IAM users and roles and choose your role.
    2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database and table.
    3. Under Table permissions, for Table permissions, select Select.
    4. Under Data permissions, select All data access.
  2. Choose Grant.

  1. Submit the job again by cloning it.
  2. When the job is complete, choose View logs.

The output should look like the following screenshot.
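The two console grants to the EMR Serverless runtime role (Describe on the resource link, then Select on the target table) can also be scripted. The following is a minimal sketch that only builds request dictionaries in the shape accepted by the Lake Formation GrantPermissions API; it makes no AWS calls. The function names, account IDs, role ARN, and database, table, and resource link names are hypothetical, and the choice of catalog ID for each grant is an assumption you should verify against your own setup.

```python
import json


def describe_resource_link_request(role_arn: str, consumer_account_id: str,
                                   resource_link_name: str) -> dict:
    """Request for DESCRIBE on the resource link in the consumer account's catalog."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "Database": {"CatalogId": consumer_account_id, "Name": resource_link_name}
        },
        "Permissions": ["DESCRIBE"],
    }


def select_on_target_table_request(role_arn: str, catalog_account_id: str,
                                   database: str, table: str) -> dict:
    """Request for SELECT on the shared (target) table owned by the catalog account."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "Table": {
                "CatalogId": catalog_account_id,
                "DatabaseName": database,
                "Name": table,
            }
        },
        "Permissions": ["SELECT"],
    }


if __name__ == "__main__":
    # Hypothetical identifiers; substitute the values from your accounts.
    role = "arn:aws:iam::444455556666:role/example-emr-serverless-runtime-role"
    requests = [
        describe_resource_link_request(role, "444455556666", "example_resource_link"),
        select_on_target_table_request(role, "111122223333",
                                       "example_federated_db", "example_table"),
    ]
    for request in requests:
        # With boto3 installed and credentials configured, each dict could be passed as
        # boto3.client("lakeformation").grant_permissions(**request)
        print(json.dumps(request, indent=2))
```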

Data scientist use case

For this use case, a data scientist queries the data through SageMaker Studio. Complete the following steps:

  1. Set up SageMaker Studio.
  2. Confirm that the domain user role has been granted permission by Lake Formation to SELECT data from the table.
  3. Follow steps similar to the batch run use case to grant access.

The following screenshot shows an example notebook.

Clean up

We recommend deleting the CloudFormation stack after use, because the deployed resources will incur costs. There are no prerequisites to delete the producer, catalog, and consumer CloudFormation stacks. To delete the Hive metastore connector stack on the catalog account (serverlessrepo-GlueDataCatalogFederation-HiveMetastore), first delete the federated database you created.

Conclusion

In this post, we explained how to create a federated Hive metastore for deploying a data mesh architecture with multiple Hive data warehouses across EMR clusters.

By using Data Catalog metadata federation, organizations can construct a sophisticated data architecture. This approach not only seamlessly extends your Hive data warehouse but also consolidates access control and fosters integration with various AWS analytics services. Through effective data governance and meticulous orchestration of the data mesh architecture, organizations can provide data integrity, regulatory compliance, and enhanced data sharing across EMR clusters.

We encourage you to check out the features of the AWS Glue Hive metastore federation connector and explore how to implement a data mesh architecture across multiple EMR clusters. To learn more and get started, refer to the following resources:


About the Authors

Sudipta Mitra is a Senior Data Architect for AWS who is passionate about helping customers build modern data analytics applications by making innovative use of the latest AWS services and their constantly evolving features. He is a pragmatic architect who works backwards from customer needs, makes customers comfortable with the proposed solution, and helps them achieve tangible business outcomes. His main areas of work are Data Mesh, Data Lake, Knowledge Graph, Data Security and Data Governance.

Deepak Sharma is a Senior Data Architect with the AWS Professional Services team, specializing in big data and analytics solutions. With extensive experience in designing and implementing scalable data architectures, he collaborates closely with enterprise customers to build robust data lakes and advanced analytical applications on the AWS platform.

Nanda Chinnappa is a Cloud Infrastructure Architect with AWS Professional Services at Amazon Web Services. Nanda specializes in Infrastructure Automation, Cloud Migration, Disaster Recovery and Databases, including Amazon RDS and Amazon Aurora. He helps AWS customers adopt the AWS Cloud and realize their business outcomes by executing cloud computing initiatives.

Introducing Amazon EMR on EKS with Apache Flink: A scalable, reliable, and efficient data processing platform

Post Syndicated from Kinnar Kumar Sen original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-with-apache-flink-a-scalable-reliable-and-efficient-data-processing-platform/

AWS recently announced that Apache Flink is generally available for Amazon EMR on Amazon Elastic Kubernetes Service (EKS). Apache Flink is a scalable, reliable, and efficient data processing framework that handles real-time streaming and batch workloads (but is most commonly used for real-time streaming). Amazon EMR on EKS is a deployment option for Amazon EMR that allows you to run open source big data frameworks such as Apache Spark and Flink on Amazon Elastic Kubernetes Service (Amazon EKS) clusters with the EMR runtime. With the addition of Flink support in EMR on EKS, you can now run your Flink applications on Amazon EKS using the EMR runtime and benefit from both services to deploy, scale, and operate Flink applications more efficiently and securely.

In this post, we introduce the features of EMR on EKS with Apache Flink, discuss their benefits, and highlight how to get started.

EMR on EKS for data workloads

AWS customers deploying large-scale data workloads are adopting the EMR runtime with Amazon EKS as the underlying orchestrator to benefit from their complementary features. This also enables multi-tenancy and allows data engineers and data scientists to focus on building the data applications, while the platform engineering and site reliability engineering (SRE) teams manage the infrastructure. Some key benefits of Amazon EKS for these customers are:

  • The AWS-managed control plane, which improves resiliency and removes undifferentiated heavy lifting
  • Features like multi-tenancy and role-based access control (RBAC), which allow you to build cost-efficient platforms and enforce organization-wide governance policies
  • The extensibility of Kubernetes, which allows you to install open source add-ons (observability, security, notebooks) to meet your specific needs

The EMR runtime offers the following benefits:

  • Takes care of the undifferentiated heavy lifting of managing installations, configuration, patching, and backups
  • Simplifies scaling
  • Optimizes performance and cost
  • Implements security and compliance by integrating with other AWS services and tools

Benefits of EMR on EKS with Apache Flink

The flexibility to choose instance types, price, and AWS Region and Availability Zone according to the workload specification is often the main driver of reliability, availability, and cost-optimization. Amazon EMR on EKS natively integrates tools and functionalities to enable these—and more.

Integration with existing tools and processes, such as continuous integration and continuous delivery (CI/CD), observability, and governance policies, helps unify the tools used and decreases the time to launch new services. Many customers already have these tools and processes for their Amazon EKS infrastructure, which you can now easily extend to your Flink applications running on EMR on EKS. If you’re interested in building your Kubernetes and Amazon EKS capabilities, we recommend using EKS Blueprints, which provides a starting place to compose complete EKS clusters that are bootstrapped with the operational software that is needed to deploy and operate workloads.

Another benefit of running Flink applications with Amazon EMR on EKS is improving your applications’ scalability. The volume and complexity of data processed by Flink apps can vary significantly based on factors like the time of the day, day of the week, seasonality, or being tied to a specific marketing campaign or other activity. This volatility makes customers trade off between over-provisioning, which leads to inefficient resource usage and higher costs, or under-provisioning, where you risk missing latency and throughput SLAs or even service outages. When running Flink applications with Amazon EMR on EKS, the Flink auto scaler will increase the applications’ parallelism based on the data being ingested, and Amazon EKS auto scaling with Karpenter or Cluster Autoscaler will scale the underlying capacity required to meet those demands. In addition to scaling up, Amazon EKS can also scale your applications down when the resources aren’t needed so your Flink apps are more cost-efficient.

Running EMR on EKS with Flink allows you to run multiple versions of Flink on the same cluster. With traditional Amazon Elastic Compute Cloud (Amazon EC2) instances, each version of Flink needs to run on its own virtual machine to avoid challenges with resource management or conflicting dependencies and environment variables. However, containerizing Flink applications allows you to isolate versions and avoid conflicting dependencies, and running them on Amazon EKS allows you to use Kubernetes as the unified resource manager. This means that you have the flexibility to choose which version of Flink is best suited for each job, and also improves your agility to upgrade a single job to the next version of Flink rather than having to upgrade an entire cluster, or spin up a dedicated EC2 instance for a different Flink version, which would increase your costs.

Key EMR on EKS differentiations

In this section, we discuss the key EMR on EKS differentiations.

Faster restart of the Flink job during scaling or failure recovery

This is enabled by task local recovery via Amazon Elastic Block Store (Amazon EBS) volumes and fine-grained recovery support in Adaptive Scheduler.

Task local recovery via EBS volumes for TaskManager pods is available with Amazon EMR 6.15.0 and higher. The default overlay mount comes with 10 GB, which is sufficient for jobs with a small state. Jobs with large states can enable the automatic EBS volume mount option. The EBS volumes are automatically created and mounted during TaskManager pod creation and removed during pod deletion.

Fine-grained recovery support in the adaptive scheduler is available with Amazon EMR 6.15.0 and higher. When a task fails during its run, fine-grained recovery restarts only the pipeline-connected component of the failed task, instead of resetting the entire graph and triggering a complete rerun from the last completed checkpoint, which is more expensive than just rerunning the failed tasks. To enable fine-grained recovery, set the following configurations in your Flink configuration:

jobmanager.execution.failover-strategy: region
restart-strategy: exponential-delay or fixed-delay
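To illustrate the idea of a pipeline-connected component, the following minimal sketch treats a job as a graph of tasks linked by pipelined data exchanges and computes which tasks would be restarted when one of them fails. It is a simplified model under stated assumptions, not Flink's implementation: the task names and edges are hypothetical, and the scheduler's actual failover logic considers more than graph connectivity.

```python
from collections import defaultdict


def failover_region(pipelined_edges, failed_task):
    """Return the set of tasks connected to failed_task through pipelined edges."""
    neighbors = defaultdict(set)
    for upstream, downstream in pipelined_edges:
        # A pipelined exchange ties producer and consumer together in both directions.
        neighbors[upstream].add(downstream)
        neighbors[downstream].add(upstream)

    # Depth-first traversal collecting every task reachable from the failed one.
    region, stack = {failed_task}, [failed_task]
    while stack:
        task = stack.pop()
        for other in neighbors[task]:
            if other not in region:
                region.add(other)
                stack.append(other)
    return region


if __name__ == "__main__":
    # Hypothetical job with two independent pipelines: source -> map -> sink.
    edges = [
        ("source-1", "map-1"), ("map-1", "sink-1"),
        ("source-2", "map-2"), ("map-2", "sink-2"),
    ]
    # Only the first pipeline is restarted; the second keeps running.
    print(sorted(failover_region(edges, "map-1")))
```

In this model, a failure in map-1 restarts only the three tasks of its own pipeline, which is the saving that fine-grained recovery aims for compared with restarting the whole graph.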

Logging and monitoring support with customer managed keys

Monitoring and observability are key constructs of the AWS Well-Architected framework because they help you learn, measure, and adapt to operational changes. You can enable monitoring of launched Flink jobs while using EMR on EKS with Apache Flink. Amazon Managed Service for Prometheus is deployed automatically, if enabled while installing the Flink operator, and it helps analyze Prometheus metrics emitted for the Flink operator, job, and TaskManager.

You can use the Flink UI to monitor health and performance of Flink jobs through a browser using port-forwarding. We have also enabled collection and archival of operator and application logs to Amazon Simple Storage Service (Amazon S3) or Amazon CloudWatch using a FluentD sidecar. This can be enabled through a monitoringConfiguration block in the deployment custom resource definition (CRD):

monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
      encryptionKeyArn: CMK ARN FOR S3 BUCKET ENCRYPTION
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2Gb
        maxFilesToKeep: 10

Cost-optimization using Amazon EC2 Spot Instances

Amazon EC2 Spot Instances are an Amazon EC2 pricing option that provides steep discounts of up to 90% over On-Demand prices. It’s the preferred choice to run big data workloads because it helps improve throughput and optimize Amazon EC2 spend. Spot Instances are spare EC2 capacity and can be interrupted with notification if Amazon EC2 needs the capacity for On-Demand requests. Flink streaming jobs running on EMR on EKS can now respond to Spot Instance interruption, perform a just-in-time (JIT) checkpoint of the running jobs, and prevent scheduling further tasks on these Spot Instances. When restarting the job, not only will the job restart from the checkpoint, but a combined restart mechanism will provide a best-effort service to restart the job either after reaching target resource parallelism or the end of the current configured window. This can also prevent consecutive job restarts caused by Spot Instances stopping in a short interval and help reduce cost and improve performance.

To minimize the impact of Spot Instance interruptions, you should adopt Spot Instance best practices. The combined restart mechanism and JIT checkpoint is offered only in Adaptive Scheduler.

Integration with the AWS Glue Data Catalog as a metadata store for Flink applications

The AWS Glue Data Catalog is a centralized metadata repository for data assets across various data sources, and provides a unified interface to store and query information about data formats, schemas, and sources. Amazon EMR on EKS with Apache Flink releases 6.15.0 and higher support using the Data Catalog as a metadata store for streaming and batch SQL workflows. This further enables data understanding and makes sure that it is transformed correctly.

Integration with Amazon S3, enabling resiliency and operational efficiency

Amazon S3 is the preferred cloud object store for AWS customers to store not only data but also application JARs and scripts. EMR on EKS with Apache Flink can fetch application JARs and scripts (PyFlink) through deployment specification, which eliminates the need to build custom images in Flink’s Application Mode. When checkpointing on Amazon S3 is enabled, a managed state is persisted to provide consistent recovery in case of failures. Retrieval and storage of files using Amazon S3 is enabled by two different Flink connectors. We recommend using Presto S3 (s3p) for checkpointing and s3 or s3a for reading and writing files including JARs and scripts. See the following code:

...
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
...
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
...
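The scheme recommendation (s3p for checkpoints, s3 for JARs and scripts) can be encoded in a small helper so the two are not mixed up when specs are generated. The following is a minimal sketch; the function names and bucket names are hypothetical, and the resulting dictionary mirrors only the fragment of the deployment spec shown above.

```python
import json


def checkpoint_dir(bucket: str, prefix: str = "flink-checkpoint/") -> str:
    """Checkpoint location using the Presto S3 scheme (s3p)."""
    return f"s3p://{bucket}/{prefix}"


def artifact_uri(bucket: str, key: str) -> str:
    """Location of a JAR or PyFlink script using the s3 scheme."""
    return f"s3://{bucket}/{key}"


def flink_spec_fragment(checkpoint_bucket: str, artifact_bucket: str,
                        script_key: str) -> dict:
    """Build the flinkConfiguration and job portions of the deployment spec."""
    return {
        "flinkConfiguration": {
            "taskmanager.numberOfTaskSlots": "2",
            "state.checkpoints.dir": checkpoint_dir(checkpoint_bucket),
        },
        "job": {
            "jarURI": artifact_uri(artifact_bucket, script_key),
            "entryClass": "org.apache.flink.client.python.PythonDriver",
        },
    }


if __name__ == "__main__":
    # Hypothetical bucket names; substitute your own.
    fragment = flink_spec_fragment(
        "example-checkpoint-bucket", "example-artifact-bucket", "scripts/pyflink.py"
    )
    print(json.dumps({"spec": fragment}, indent=2))
```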

Role-based access control using IRSA

IAM Roles for Service Accounts (IRSA) is the recommended way to implement role-based access control (RBAC) for deploying and running applications on Amazon EKS. EMR on EKS with Apache Flink creates two roles (IRSA) by default for Flink operator and Flink jobs. The operator role is used for JobManager and Flink services, and the job role is used for TaskManagers and ConfigMaps. This helps limit the scope of AWS Identity and Access Management (IAM) permission to a service account, helps with credential isolation, and improves auditability.

Get started with EMR on EKS with Apache Flink

If you want to run a Flink application on recently launched EMR on EKS with Apache Flink, refer to Running Flink jobs with Amazon EMR on EKS, which provides step-by-step guidance to deploy, run, and monitor Flink jobs.

We have also created an IaC (Infrastructure as Code) template for EMR on EKS with Flink Streaming as part of Data on EKS (DoEKS), an open-source project aimed at streamlining and accelerating the process of building, deploying, and scaling data and ML workloads on Amazon Elastic Kubernetes Service (Amazon EKS). This template helps you provision an EMR on EKS with Flink cluster and evaluate the features mentioned in this post. The template comes with best practices built in, so you can use it as a foundation for deploying EMR on EKS with Flink in your own environment if you decide to use it as part of your application.

Conclusion

In this post, we explored the features of the recently launched EMR on EKS with Flink to help you understand how you might run Flink workloads on a managed, scalable, resilient, and cost-optimized EMR on EKS cluster. If you are planning to run or explore Flink workloads on Kubernetes, consider running them on EMR on EKS with Apache Flink. Your AWS Solutions Architects can assist you along your innovation journey.


About the Authors

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.

Alex Lines is a Principal Containers Specialist at AWS helping customers modernize their Data and ML applications on Amazon EKS.

Mengfei Wang is a Software Development Engineer specializing in building large-scale, robust software infrastructure to support big data demands on containers and Kubernetes within the EMR on EKS team. Beyond work, Mengfei is an enthusiastic snowboarder and a passionate home cook.

Jerry Zhang is a Software Development Manager in AWS EMR on EKS. His team focuses on helping AWS customers to solve their business problems using cutting-edge data analytics technology on AWS infrastructure.

Understanding Apache Iceberg on AWS with the new technical guide

Post Syndicated from Carlos Rodrigues original https://aws.amazon.com/blogs/big-data/understanding-apache-iceberg-on-aws-with-the-new-technical-guide/

We’re excited to announce the launch of the Apache Iceberg on AWS technical guide. Whether you are new to Apache Iceberg on AWS or already running production workloads on AWS, this comprehensive technical guide offers detailed guidance, from foundational concepts to advanced optimizations, to help you build your transactional data lake with Apache Iceberg on AWS.

Apache Iceberg is an open source table format that simplifies data processing on large datasets stored in data lakes. It does so by bringing the familiarity of SQL tables to big data and capabilities such as ACID transactions, row-level operations (merge, update, delete), partition evolution, data versioning, incremental processing, and advanced query scanning. Apache Iceberg seamlessly integrates with popular open source big data processing frameworks like Apache Spark, Apache Hive, Apache Flink, Presto, and Trino. It is natively supported by AWS analytics services such as AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift.

The following diagram illustrates a reference architecture of a transactional data lake with Apache Iceberg on AWS.

AWS customers and data engineers use the Apache Iceberg table format for its many benefits, as well as for its high performance and reliability at scale to build transactional data lakes and write-optimized solutions with Amazon EMR, AWS Glue, Athena, and Amazon Redshift on Amazon Simple Storage Service (Amazon S3).

We believe Apache Iceberg adoption on AWS will continue to grow rapidly, and you can benefit from this technical guide that delivers productive guidance on working with Apache Iceberg on supported AWS services, best practices on cost-optimization and performance, and effective monitoring and maintenance policies.

Related resources


About the Authors

Carlos Rodrigues is a Big Data Specialist Solutions Architect at AWS. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Iceberg and Apache Hudi. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He is an expert on data engineering and enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Shana Schipers is an Analytics Specialist Solutions Architect at AWS, focusing on big data. She supports customers worldwide in building transactional data lakes using open table formats like Apache Hudi, Apache Iceberg, and Delta Lake on AWS.

AWS Weekly Roundup: New capabilities in Amazon Bedrock, AWS Amplify Gen 2, Amazon RDS and more (May 13, 2024)

Post Syndicated from Abhishek Gupta original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-new-capabilities-in-amazon-bedrock-aws-amplify-gen-2-amazon-rds-and-more-may-13-2024/

AWS Summit is in full swing around the world, with the most recent one being AWS Summit Singapore! Here is a sneak peek of the AWS staff and ASEAN community members at the Developer Lounge booth. It featured AWS Community speakers giving lightning talks on serverless, Amazon Elastic Kubernetes Service (Amazon EKS), security, generative AI, and more.

Last week’s launches
Here are some launches that caught my attention. Not surprisingly, a lot of interesting generative AI features!

Amazon Titan Text Premier is now available in Amazon Bedrock – This is the latest addition to the Amazon Titan family of large language models (LLMs) and offers optimized performance for key features like Retrieval Augmented Generation (RAG) on Knowledge Bases for Amazon Bedrock, and function calling on Agents for Amazon Bedrock.

Amazon Bedrock Studio is now available in public preview – Amazon Bedrock Studio offers a web-based experience to accelerate the development of generative AI applications by providing a rapid prototyping environment with key Amazon Bedrock features, including Knowledge Bases, Agents, and Guardrails.

Agents for Amazon Bedrock now supports Provisioned Throughput pricing model – As agentic applications scale, they require higher input and output model throughput compared to on-demand limits. The Provisioned Throughput pricing model makes it possible to purchase model units for the specific base model.

MongoDB Atlas is now available as a vector store in Knowledge Bases for Amazon Bedrock – With MongoDB Atlas vector store integration, you can build RAG solutions to securely connect your organization’s private data sources to foundation models (FMs) in Amazon Bedrock.

Amazon RDS for PostgreSQL supports pgvector 0.7.0 – You can use the open-source PostgreSQL extension for storing vector embeddings and add retrieval-augmented generation (RAG) capability in your generative AI applications. This release includes features that increase the number of dimensions of vectors you can index, reduce index size, and add support for using CPU SIMD in distance computations. Also, Amazon RDS Performance Insights now supports the Oracle Multitenant configuration on Amazon RDS for Oracle.

Amazon EC2 Inf2 instances are now available in new regions – These instances are optimized for generative AI workloads and are generally available in the Asia Pacific (Sydney), Europe (London), Europe (Paris), Europe (Stockholm), and South America (Sao Paulo) Regions.

New Generative Engine in Amazon Polly is now generally available – The generative engine in Amazon Polly is its most advanced text-to-speech (TTS) model and currently includes two American English voices, Ruth and Matthew, and one British English voice, Amy.

AWS Amplify Gen 2 is now generally available – AWS Amplify offers a code-first developer experience for building full-stack apps using TypeScript and enables developers to express app requirements like the data models, business logic, and authorization rules in TypeScript. AWS Amplify Gen 2 has added a number of features since the preview, including a new Amplify console with features such as custom domains, data management, and pull request (PR) previews.

Amazon EMR Serverless now includes performance monitoring of Apache Spark jobs with Amazon Managed Service for Prometheus – This lets you analyze, monitor, and optimize your jobs using job-specific engine metrics and information about Spark event timelines, stages, tasks, and executors. Also, Amazon EMR Studio is now available in the Asia Pacific (Melbourne) and Israel (Tel Aviv) Regions.

Amazon MemoryDB launched two new condition keys for IAM policies – The new condition keys let you create AWS Identity and Access Management (IAM) policies or Service Control Policies (SCPs) to enhance security and meet compliance requirements. Also, Amazon ElastiCache has updated its minimum TLS version to 1.2.

Amazon Lightsail now offers a larger instance bundle – This includes 16 vCPUs and 64 GB memory. You can now scale your web applications and run more compute and memory-intensive workloads in Lightsail.

Amazon Elastic Container Registry (ECR) adds pull through cache support for GitLab Container Registry – ECR customers can create a pull through cache rule that maps an upstream registry to a namespace in their private ECR registry. Once the rule is configured, images can be pulled through ECR from GitLab Container Registry. ECR automatically creates new repositories for cached images and keeps them in sync with the upstream registry.

AWS Resilience Hub expands application resilience drift detection capabilities – This new enhancement detects changes, such as the addition or deletion of resources within the application’s input sources.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS news
Here are some additional projects and blog posts that you might find interesting.

Building games with LLMs – Check out this fun experiment by Banjo Obayomi to generate Super Mario levels using different LLMs on Amazon Bedrock!

Troubleshooting with Amazon Q – Ricardo Ferreira walks us through how he solved a nasty data serialization problem while working with Apache Kafka, Go, and Protocol Buffers.

Getting started with Amazon Q in VS Code – Check out this excellent step-by-step guide by Rohini Gaonkar that covers installing the extension for features like code completion, chat, and productivity-boosting capabilities powered by generative AI.

AWS open source news and updates – My colleague Ricardo writes about open source projects, tools, and events from the AWS Community. Check out Ricardo’s page for the latest updates.

Upcoming AWS events
Check your calendars and sign up for upcoming AWS events:

AWS Summits – Join free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Register in your nearest city: Bengaluru (May 15–16), Seoul (May 16–17), Hong Kong (May 22), Milan (May 23), Stockholm (June 4), and Madrid (June 5).

AWS re:Inforce – Explore 2.5 days of immersive cloud security learning in the age of generative AI at AWS re:Inforce, June 10–12 in Pennsylvania.

AWS Community Days – Join community-led conferences that feature technical discussions, workshops, and hands-on labs led by expert AWS users and industry leaders from around the world: Turkey (May 18), Midwest | Columbus (June 13), Sri Lanka (June 27), Cameroon (July 13), Nigeria (August 24), and New York (August 28).

Browse all upcoming AWS-led in-person and virtual events and developer-focused events.

That’s all for this week. Check back next Monday for another Weekly Roundup!

— Abhishek

This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Use your corporate identities for analytics with Amazon EMR and AWS IAM Identity Center

Post Syndicated from Pradeep Misra original https://aws.amazon.com/blogs/big-data/use-your-corporate-identities-for-analytics-with-amazon-emr-and-aws-iam-identity-center/

To enable your workforce users for analytics with fine-grained data access controls and audit data access, you might have to create multiple AWS Identity and Access Management (IAM) roles with different data permissions and map the workforce users to one of those roles. Multiple users are often mapped to the same role where they need similar privileges to enable data access controls at the corporate user or group level and audit data access.

AWS IAM Identity Center enables centralized management of workforce user access to AWS accounts and applications using a local identity store or by connecting corporate directories via identity providers (IdPs). IAM Identity Center now supports trusted identity propagation, a streamlined experience for users who require access to data with AWS analytics services.

Amazon EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to build data engineering and data science applications. With trusted identity propagation, data access management can be based on a user’s corporate identity and can be propagated seamlessly as they access data with single sign-on to build analytics applications with Amazon EMR (EMR Studio and Amazon EMR on EC2).

AWS Lake Formation allows data administrators to centrally govern, secure, and share data for analytics and machine learning (ML). With trusted identity propagation, data administrators can directly provide granular access to corporate users using their identity attributes and simplify the traceability of end-to-end data access across AWS services. Because access is managed based on a user’s corporate identity, they don’t need to use database local user credentials or assume an IAM role to access data.

In this post, we show how to bring your workforce identity to EMR Studio for analytics use cases, directly manage fine-grained permissions for the corporate users and groups using Lake Formation, and audit their data access.

Solution overview

For our use case, we want to enable a data analyst user named analyst1 to use their own enterprise credentials to query data they have been granted permissions to and audit their data access. We use Okta as the IdP for this demonstration. The following diagram illustrates the solution architecture.

This architecture is based on the following components:

  • Okta is responsible for maintaining the corporate user identities, related groups, and user authentication.
  • IAM Identity Center connects Okta users and centrally manages their access across AWS accounts and applications.
  • Lake Formation provides fine-grained access controls on data directly to corporate users using trusted identity propagation.
  • EMR Studio is an IDE for users to build and run applications. It allows users to log in directly with their corporate credentials without signing in to the AWS Management Console.
  • AWS Service Catalog provides a product template to create EMR clusters.
  • EMR cluster is integrated with IAM Identity Center using a security configuration.
  • AWS CloudTrail captures user data access activities.

The following are the high-level steps to implement the solution:

  1. Integrate Okta with IAM Identity Center.
  2. Set up Amazon EMR Studio.
  3. Create an IAM Identity Center enabled security configuration for EMR clusters.
  4. Create a Service Catalog product template to create the EMR clusters.
  5. Use Lake Formation to grant permissions to users to access data.
  6. Test the solution by accessing data with a corporate identity.
  7. Audit user data access.

Prerequisites

You should have the following prerequisites:

Integrate Okta with IAM Identity Center

For more information about configuring Okta with IAM Identity Center, refer to Configure SAML and SCIM with Okta and IAM Identity Center.

For this setup, we have created two users, analyst1 and engineer1, and assigned them to the corresponding Okta application. You can validate the integration is working by navigating to the Users page on the IAM Identity Center console, as shown in the following screenshot. Both enterprise users from Okta are provisioned in IAM Identity Center.

These exact users will not be listed in your account. You can either create similar users or use an existing user.

Each provisioned user in IAM Identity Center has a unique user ID. This ID does not originate from Okta; it’s created in IAM Identity Center to uniquely identify this user. With trusted identity propagation, this user ID will be propagated across services and also used for traceability purposes in CloudTrail. The following screenshot shows the IAM Identity Center user matching the provisioned Okta user analyst1.

Choose the link under AWS access portal URL and log in with the analyst1 Okta user credentials that are already assigned to this application.

If you are able to log in and see the landing page, then all your configurations up to this step are set correctly. You will not see any applications on this page yet.

Set up EMR Studio

In this step, we demonstrate the actions needed from the data lake administrator to set up EMR Studio enabled for trusted identity propagation and with IAM Identity Center integration. This allows users to directly access EMR Studio with their enterprise credentials.

Note: All Amazon S3 buckets (created after January 5, 2023) have encryption configured by default (Amazon S3 managed keys (SSE-S3)), and all new objects that are uploaded to an S3 bucket are automatically encrypted at rest. To use a different type of encryption, to meet your security needs, please update the default encryption configuration for the bucket. See Protecting data for server-side encryption for further details.

  • On the Amazon EMR console, choose Studios in the navigation pane under EMR Studio.
  • Choose Create Studio.

  • For Setup options, select Custom.
  • For Studio name, enter a name (for this post, emr-studio-with-tip).
  • For S3 location for Workspace storage, select Select existing location and enter an existing S3 bucket (if you have one). Otherwise, select Create new bucket.

  • For Service role to let Studio access your AWS resources, choose View permissions details to get the trust and IAM policy information that is needed and create a role with those specific policies in IAM. In this case, we create a new role called emr_tip_role.

  • For Service role to let Studio access your AWS resources, choose the IAM role you created.
  • For Workspace name, enter a name (for this post, studio-workspace-with-tip).

  • For Authentication, select IAM Identity Center.
  • For User role, you can create a new role or choose an existing role. For this post, we choose the role we created (emr_tip_role).
  • To use the same role, add the following statement to the trust policy of the service role:
{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "elasticmapreduce.amazonaws.com",
        "AWS": "arn:aws:iam::xxxxxx:role/emr_tip_role"
      },
      "Action": [
        "sts:AssumeRole",
        "sts:SetContext"
      ]
    }
  ]
}
  • Select Enable trusted identity propagation to allow you to control and log user access across connected applications.

  • For Choose who can access your application, select All users and groups.

Later, we restrict access to resources using Lake Formation. However, there is an option here to restrict access to only assigned users and groups.

  • In the Networking and security section, you can provide optional details for your VPC, subnets, and security group settings.
  • Choose Create Studio.

  • On the Studios page of the Amazon EMR console, locate your Studio enabled with IAM Identity Center.
  • Copy the link for Studio Access URL.

  • Enter the URL into a web browser and log in using Okta credentials.

You should be able to successfully sign in to the EMR Studio console.
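
The trust policy statement added to the service role earlier in this section allows both sts:AssumeRole and sts:SetContext. The following is a minimal Python sketch, not part of the original walkthrough, that checks a trust policy document for those two actions before you save it. The function name missing_trust_actions is hypothetical, and the check only looks for the literal action names (it does not expand wildcards such as sts:*).

```python
import json

# Actions that the trust policy shown in this post grants.
REQUIRED_ACTIONS = {"sts:AssumeRole", "sts:SetContext"}


def missing_trust_actions(policy_json: str) -> set:
    """Return the required actions that no Allow statement lists literally."""
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be a bare object
        statements = [statements]
    granted = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):  # "Action" may be a string or a list
            actions = [actions]
        granted.update(actions)
    return REQUIRED_ACTIONS - granted


# Trust policy from this post; the account ID is the post's own placeholder.
SAMPLE_TRUST_POLICY = """
{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "elasticmapreduce.amazonaws.com",
        "AWS": "arn:aws:iam::xxxxxx:role/emr_tip_role"
      },
      "Action": ["sts:AssumeRole", "sts:SetContext"]
    }
  ]
}
"""

if __name__ == "__main__":
    missing = missing_trust_actions(SAMPLE_TRUST_POLICY)
    print("missing actions:", sorted(missing) or "none")
```

An empty result means both actions are present; anything else names the action to add.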

Create an IAM Identity Center enabled security configuration for EMR clusters

EMR security configurations allow you to configure data encryption, Kerberos authentication, and Amazon S3 authorization for the EMR File System (EMRFS) on the clusters. The security configuration is available to use and reuse when you create clusters.

To integrate Amazon EMR with IAM Identity Center, you need to first create an IAM role that authenticates with IAM Identity Center from the EMR cluster. Amazon EMR uses IAM credentials to relay the IAM Identity Center identity to downstream services such as Lake Formation. The IAM role should also have the respective permissions to invoke the downstream services.

  1. Create a role (for this post, called emr-idc-application) with the following trust policy and permission policy, shown in that order. The role referenced in the trust policy is the instance profile role for the EMR clusters. This allows the EC2 instance profile to assume this role and act as an identity broker on behalf of the federated users.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::xxxxxxxxxxn:role/service-role/AmazonEMR-InstanceProfile-20240127T102444"
            },
            "Action": [
                "sts:AssumeRole",
                "sts:SetContext"
            ]
        }
    ]
}
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "IdCPermissions",
            "Effect": "Allow",
            "Action": [
                "sso-oauth:*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "GlueandLakePermissions",
            "Effect": "Allow",
            "Action": [
                "glue:*",
                "lakeformation:GetDataAccess"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3Permissions",
            "Effect": "Allow",
            "Action": [
                "s3:GetDataAccess",
                "s3:GetAccessGrantsInstanceForPrefix"
            ],
            "Resource": "*"
        }
    ]
}
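
Because the trust policy above differs between accounts only in the instance profile role ARN, it can be generated rather than hand-edited. The following Python sketch illustrates one way to do that; the function name build_broker_trust_policy and the example ARN are hypothetical placeholders, not values from this post.

```python
import json


def build_broker_trust_policy(instance_profile_role_arn: str) -> dict:
    """Build the trust policy shown above for a given instance profile role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AssumeRole",
                "Effect": "Allow",
                # The EMR cluster's EC2 instance profile role is the trusted principal.
                "Principal": {"AWS": instance_profile_role_arn},
                "Action": ["sts:AssumeRole", "sts:SetContext"],
            }
        ],
    }


# Hypothetical ARN for illustration only; substitute your own role.
EXAMPLE_ROLE_ARN = (
    "arn:aws:iam::111122223333:role/service-role/ExampleEmrInstanceProfileRole"
)

if __name__ == "__main__":
    print(json.dumps(build_broker_trust_policy(EXAMPLE_ROLE_ARN), indent=4))
```

The printed JSON can be pasted into the IAM console or saved to a file for use with your usual role-creation tooling.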

Next, you create certificates for encrypting data in transit with Amazon EMR.

  • For this post, we use OpenSSL to generate a self-signed X.509 certificate with a 2048-bit RSA private key.

The key allows access to the issuer’s EMR cluster instances in the us-west-2 Region, as specified by the *.us-west-2.compute.internal domain name used as the common name. You can change this to the Region your cluster is in. For a complete guide on creating and providing a certificate, refer to Providing certificates for encrypting data in transit with Amazon EMR encryption.

$ openssl req -x509 -newkey rsa:2048 -keyout privateKey.pem -out certificateChain.pem -days 365 -nodes -subj '/CN=*.us-west-2.compute.internal'
$ cp certificateChain.pem trustedCertificates.pem
$ zip -r -X my-certs.zip certificateChain.pem privateKey.pem trustedCertificates.pem

  • Upload my-certs.zip to an S3 location that will be used to create the security configuration.

The EMR service role should have access to the S3 location.
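
The OpenSSL and zip commands in this step produce three PEM files and package them into my-certs.zip. As an optional sanity check, the following Python sketch lists which of those three file names are absent from an archive. The function name missing_cert_files is hypothetical, and the check only compares file names at the root of the archive; it does not validate the certificate contents.

```python
import os
import zipfile

# File names produced by the commands in this step.
EXPECTED_FILES = {
    "certificateChain.pem",
    "privateKey.pem",
    "trustedCertificates.pem",
}


def missing_cert_files(zip_path: str) -> set:
    """Return the expected PEM file names that are not in the archive."""
    with zipfile.ZipFile(zip_path) as archive:
        names = set(archive.namelist())
    return EXPECTED_FILES - names


if __name__ == "__main__":
    archive_path = "my-certs.zip"
    if os.path.exists(archive_path):
        missing = missing_cert_files(archive_path)
        print("missing files:", sorted(missing) or "none")
    else:
        print(f"{archive_path} not found in the current directory")
```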
  • Create an EMR security configuration with IAM Identity Center enabled from the AWS Command Line Interface (AWS CLI) with the following code:
aws emr create-security-configuration --name "IdentityCenterConfiguration-with-lf-tip" --region "us-west-2" --endpoint-url https://elasticmapreduce.us-west-2.amazonaws.com --security-configuration '{
    "AuthenticationConfiguration":{
        "IdentityCenterConfiguration":{
            "EnableIdentityCenter":true,
            "IdentityCenterApplicationAssigmentRequired":false,
            "IdentityCenterInstanceARN": "arn:aws:sso:::instance/ssoins-7907b0d7d77e3e0d",
            "IAMRoleForEMRIdentityCenterApplicationARN": "arn:aws:iam::1xxxxxxxxx0:role/emr-idc-application"
        }
    },
    "AuthorizationConfiguration": {
        "LakeFormationConfiguration": {
            "EnableLakeFormation": true
        }
    },
    "EncryptionConfiguration": {
        "EnableInTransitEncryption": true,
        "EnableAtRestEncryption": false,
        "InTransitEncryptionConfiguration": {
            "TLSCertificateConfiguration": {
                "CertificateProviderType": "PEM",
                "S3Object": "s3://<<Bucket Name>>/emr-transit-encry-certs/my-certs.zip"
            }
        }
    }
}' 

You can view the security configuration on the Amazon EMR console.
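
Embedding a multi-line JSON document in a shell command is easy to get wrong because of quoting. As an alternative, the following Python sketch assembles the same security configuration document from three inputs and serializes it, so it can be written to a file and supplied to the CLI from there. The function name build_security_configuration, the bucket name, and the account ID are hypothetical; the key names, including the spelling of IdentityCenterApplicationAssigmentRequired, are copied as-is from the command in this post.

```python
import json


def build_security_configuration(
    idc_instance_arn: str, idc_role_arn: str, certs_s3_uri: str
) -> dict:
    """Assemble the security configuration document used in this post."""
    return {
        "AuthenticationConfiguration": {
            "IdentityCenterConfiguration": {
                "EnableIdentityCenter": True,
                # Key spelling matches the command shown in this post.
                "IdentityCenterApplicationAssigmentRequired": False,
                "IdentityCenterInstanceARN": idc_instance_arn,
                "IAMRoleForEMRIdentityCenterApplicationARN": idc_role_arn,
            }
        },
        "AuthorizationConfiguration": {
            "LakeFormationConfiguration": {"EnableLakeFormation": True}
        },
        "EncryptionConfiguration": {
            "EnableInTransitEncryption": True,
            "EnableAtRestEncryption": False,
            "InTransitEncryptionConfiguration": {
                "TLSCertificateConfiguration": {
                    "CertificateProviderType": "PEM",
                    "S3Object": certs_s3_uri,
                }
            },
        },
    }


if __name__ == "__main__":
    config = build_security_configuration(
        # Instance ARN as shown in this post; replace with your own.
        idc_instance_arn="arn:aws:sso:::instance/ssoins-7907b0d7d77e3e0d",
        # Hypothetical account ID and bucket name for illustration only.
        idc_role_arn="arn:aws:iam::111122223333:role/emr-idc-application",
        certs_s3_uri="s3://example-bucket/emr-transit-encry-certs/my-certs.zip",
    )
    print(json.dumps(config, indent=4))
```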

Create a Service Catalog product template to create EMR clusters

EMR Studio with trusted identity propagation enabled can only work with clusters created from a template. Complete the following steps to create a product template in Service Catalog:

  • On the Service Catalog console, choose Portfolios under Administration in the navigation pane.
  • Choose Create portfolio.

  • Enter a name for your portfolio (for this post, EMR Clusters Template) and an optional description.
  • Choose Create.

  • On the Portfolios page, choose the portfolio you just created to view its details.

  • On the Products tab, choose Create product.

  • For Product type, select CloudFormation.
  • For Product name, enter a name (for this post, EMR-7.0.0).
  • Use the security configuration IdentityCenterConfiguration-with-lf-tip you created in previous steps with the appropriate Amazon EMR service roles.
  • Choose Create product.

The following is an example CloudFormation template. Update the account-specific values for SecurityConfiguration, JobFlowRole, ServiceRole, LogUri, Ec2KeyName, and Ec2SubnetId. We provide a sample Amazon EMR service role and trust policy in Appendix A at the end of this post.

'Parameters':
  'ClusterName':
    'Type': 'String'
    'Default': 'EMR_TIP_Cluster'
  'EmrRelease':
    'Type': 'String'
    'Default': 'emr-7.0.0'
    'AllowedValues':
    - 'emr-7.0.0'
  'ClusterInstanceType':
    'Type': 'String'
    'Default': 'm5.xlarge'
    'AllowedValues':
    - 'm5.xlarge'
    - 'm5.2xlarge'
'Resources':
  'EmrCluster':
    'Type': 'AWS::EMR::Cluster'
    'Properties':
      'Applications':
      - 'Name': 'Spark'
      - 'Name': 'Livy'
      - 'Name': 'Hadoop'
      - 'Name': 'JupyterEnterpriseGateway'       
      'SecurityConfiguration': 'IdentityCenterConfiguration-with-lf-tip'
      'EbsRootVolumeSize': '20'
      'Name':
        'Ref': 'ClusterName'
      'JobFlowRole': <Instance Profile Role>
      'ServiceRole': <EMR Service Role>
      'ReleaseLabel':
        'Ref': 'EmrRelease'
      'VisibleToAllUsers': !!bool 'true'
      'LogUri':
        'Fn::Sub': <S3 LOG Path>
      'Instances':
        "Ec2KeyName" : <Key Pair Name>
        'TerminationProtected': !!bool 'false'
        'Ec2SubnetId': <subnet-id>
        'MasterInstanceGroup':
          'InstanceCount': !!int '1'
          'InstanceType':
            'Ref': 'ClusterInstanceType'
        'CoreInstanceGroup':
          'InstanceCount': !!int '2'
          'InstanceType':
            'Ref': 'ClusterInstanceType'
          'Market': 'ON_DEMAND'
          'Name': 'Core'
'Outputs':
  'ClusterId':
    'Value':
      'Ref': 'EmrCluster'
    'Description': 'The ID of the  EMR cluster'
'Metadata':
  'AWS::CloudFormation::Designer': {}
'Rules': {}

Trusted identity propagation is supported from Amazon EMR 6.15 onwards. For Amazon EMR 6.15, add the following bootstrap action to the CloudFormation script:

'BootstrapActions':
- 'Name': 'spark-config'
  'ScriptBootstrapAction':
    'Path': 's3://emr-data-access-control-<aws-region>/customer-bootstrap-actions/idc-fix/replace-puppet.sh'
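
The release rule stated above (supported from Amazon EMR 6.15 onwards, with the extra bootstrap action for 6.15) can be captured in a small helper when templates are generated for several releases. The following Python sketch is illustrative only: the function names are hypothetical, it assumes release labels of the form emr-X.Y.Z, and it treats every 6.15.z patch release as needing the bootstrap action, which is an interpretation of this post rather than a documented rule.

```python
import re


def parse_release_label(label: str) -> tuple:
    """Parse a label such as 'emr-7.0.0' into (7, 0, 0)."""
    match = re.fullmatch(r"emr-(\d+)\.(\d+)\.(\d+)", label)
    if match is None:
        raise ValueError(f"unexpected release label: {label!r}")
    return tuple(int(part) for part in match.groups())


def supports_trusted_identity_propagation(label: str) -> bool:
    """True for release 6.15 and later, per the statement in this post."""
    return parse_release_label(label)[:2] >= (6, 15)


def needs_idc_bootstrap_action(label: str) -> bool:
    """True for 6.15.z releases (assumption: applies to all 6.15 patches)."""
    return parse_release_label(label)[:2] == (6, 15)


if __name__ == "__main__":
    for release in ("emr-6.14.0", "emr-6.15.0", "emr-7.0.0"):
        print(
            release,
            supports_trusted_identity_propagation(release),
            needs_idc_bootstrap_action(release),
        )
```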

The portfolio now should have the EMR cluster creation product added.

  • Grant the EMR Studio role emr_tip_role access to the portfolio.

Grant Lake Formation permissions to users to access data

In this step, we enable Lake Formation integration with IAM Identity Center and grant permissions to the Identity Center user analyst1. If Lake Formation is not already enabled, refer to Getting started with Lake Formation.

To use Lake Formation with Amazon EMR, create a custom role to register S3 locations. You need to create a new custom role with Amazon S3 access and not use the default role AWSServiceRoleForLakeFormationDataAccess. Additionally, enable external data filtering in Lake Formation. For more details, refer to Enable Lake Formation with Amazon EMR.

Complete the following steps to manage access permissions in Lake Formation:

  • On the Lake Formation console, choose IAM Identity Center integration under Administration in the navigation pane.

Lake Formation will automatically specify the correct IAM Identity Center instance.

  • Choose Create.

You can now view the IAM Identity Center integration details.

For this post, we have a Marketing database and a customer table on which we grant access to our enterprise user analyst1. You can use an existing database and table in your account or create a new one. For more examples, refer to Tutorials.

The following screenshot shows the details of our customer table.

Complete the following steps to grant analyst1 permissions. For more information, refer to Granting table permissions using the named resource method.

  • On the Lake Formation console, choose Data lake permissions under Permissions in the navigation pane.
  • Choose Grant.

  • Select Named Data Catalog resources.
  • For Databases, choose your database (marketing).
  • For Tables, choose your table (customer).

  • For Table permissions, select Select and Describe.
  • For Data permissions, select All data access.
  • Choose Grant.

The following screenshot shows a summary of permissions that user analyst1 has. They have Select access on the table and Describe permissions on the databases.
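
The console grant described in this section can also be expressed as data. The following Python sketch builds a dictionary shaped like the parameters of the Lake Formation GrantPermissions API for an IAM Identity Center user. The function name build_table_grant is hypothetical, and the principal identifier format (arn:aws:identitystore:::user/ followed by the Identity Center user ID) is an assumption to verify against the Lake Formation documentation before use.

```python
def build_table_grant(identity_center_user_id: str, database: str, table: str) -> dict:
    """Build GrantPermissions-style parameters for Select and Describe on a table."""
    return {
        "Principal": {
            # Assumed identifier format for an IAM Identity Center user.
            "DataLakePrincipalIdentifier": (
                f"arn:aws:identitystore:::user/{identity_center_user_id}"
            )
        },
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": ["SELECT", "DESCRIBE"],
    }


if __name__ == "__main__":
    # User ID of analyst1 as it appears in the audit example in this post.
    grant = build_table_grant(
        "c8c11390-00a1-706e-0c7a-bbcc5a1c9a7f", "marketing", "customer"
    )
    print(grant)
```

If you use the AWS SDK for Python, a dictionary of this shape could be unpacked as keyword arguments to the Lake Formation client's grant_permissions call.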

Test the solution

To test the solution, we log in to EMR Studio as enterprise user analyst1, create a new Workspace, create an EMR cluster using a template, and use that cluster to perform an analysis. You could also use the Workspace that was created during the Studio setup. In this demonstration, we create a new Workspace.

You need additional permissions in the EMR Studio role to create and list Workspaces, use a template, and create EMR clusters. For more details, refer to Configure EMR Studio user permissions for Amazon EC2 or Amazon EKS. Appendix B at the end of this post contains a sample policy.

When the cluster is available, we attach the cluster to the Workspace and run queries on the customer table, which the user has access to.

User analyst1 is now able to run queries for business use cases using their corporate identity. To open a PySpark notebook, we choose PySpark under Notebook.

When the notebook is open, we run a Spark SQL query to list the databases:

%%sql 
show databases

In this case, we query the customer table in the marketing database. We should be able to access the data.

%%sql
select * from marketing.customer

Audit data access

Lake Formation API actions are logged by CloudTrail. The GetDataAccess action is logged whenever a principal or integrated AWS service requests temporary credentials to access data in a data lake location that is registered with Lake Formation. With trusted identity propagation, CloudTrail also logs the IAM Identity Center user ID of the corporate identity who requested access to the data.

The following screenshot shows the details for the analyst1 user.

Choose View event to view the event logs.

The following is an example of the GetDataAccess event log. We can trace that user analyst1, Identity Center user ID c8c11390-00a1-706e-0c7a-bbcc5a1c9a7f, has accessed the customer table.

{
    "eventVersion": "1.09",
    
….
        "onBehalfOf": {
            "userId": "c8c11390-00a1-706e-0c7a-bbcc5a1c9a7f",
            "identityStoreArn": "arn:aws:identitystore::xxxxxxxxx:identitystore/d-XXXXXXXX"
        }
    },
    "eventTime": "2024-01-28T17:56:25Z",
    "eventSource": "lakeformation.amazonaws.com",
    "eventName": "GetDataAccess",
    "awsRegion": "us-west-2",
….
        "requestParameters": {
        "tableArn": "arn:aws:glue:us-west-2:xxxxxxxxxx:table/marketing/customer",
        "supportedPermissionTypes": [
            "TABLE_PERMISSION"
        ]
    },
    …..
    }
}
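
When many GetDataAccess events need to be reviewed, the fields highlighted above can be pulled out programmatically. The following Python sketch extracts the Identity Center user ID and the table ARN from a single event record. The function name summarize_data_access is hypothetical, the sample event is an abbreviated stand-in built from the values in the log above, and the sketch assumes onBehalfOf sits under the event's userIdentity element.

```python
import json


def summarize_data_access(event: dict) -> dict:
    """Extract who accessed which table from a GetDataAccess event record."""
    on_behalf_of = event.get("userIdentity", {}).get("onBehalfOf", {})
    request_parameters = event.get("requestParameters") or {}
    return {
        "event_name": event.get("eventName"),
        "event_time": event.get("eventTime"),
        "identity_center_user_id": on_behalf_of.get("userId"),
        "table_arn": request_parameters.get("tableArn"),
    }


# Abbreviated event using the values shown above; elided fields are omitted.
SAMPLE_EVENT = json.loads("""
{
    "eventVersion": "1.09",
    "userIdentity": {
        "onBehalfOf": {
            "userId": "c8c11390-00a1-706e-0c7a-bbcc5a1c9a7f",
            "identityStoreArn": "arn:aws:identitystore::xxxxxxxxx:identitystore/d-XXXXXXXX"
        }
    },
    "eventTime": "2024-01-28T17:56:25Z",
    "eventSource": "lakeformation.amazonaws.com",
    "eventName": "GetDataAccess",
    "awsRegion": "us-west-2",
    "requestParameters": {
        "tableArn": "arn:aws:glue:us-west-2:xxxxxxxxxx:table/marketing/customer",
        "supportedPermissionTypes": ["TABLE_PERMISSION"]
    }
}
""")

if __name__ == "__main__":
    print(summarize_data_access(SAMPLE_EVENT))
```

Events without an onBehalfOf element (for example, access made directly by an IAM role) yield None for the user ID, which makes them easy to separate from propagated-identity access.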

The following video provides an end-to-end demonstration of the steps for enabling trusted identity propagation in your analytics flow in Amazon EMR.

Clean up

Clean up the following resources when you’re done using this solution:

Conclusion

In this post, we demonstrated how to set up and use trusted identity propagation using IAM Identity Center, EMR Studio, and Lake Formation for analytics. With trusted identity propagation, a user’s corporate identity is seamlessly propagated as they access data using single sign-on across AWS analytics services to build analytics applications. Data administrators can provide fine-grained data access directly to corporate users and groups and audit usage. To learn more, see Integrate Amazon EMR with AWS IAM Identity Center.


About the Authors

Pradeep Misra is a Principal Analytics Solutions Architect at AWS. He works across Amazon to architect and design modern distributed analytics and AI/ML platform solutions. He is passionate about solving customer challenges using data, analytics, and AI/ML. Outside of work, Pradeep likes exploring new places, trying new cuisines, and playing board games with his family. He also likes doing science experiments with his daughters.

Deepmala Agarwal works as an AWS Data Specialist Solutions Architect. She is passionate about helping customers build out scalable, distributed, and data-driven solutions on AWS. When not at work, Deepmala likes spending time with family, walking, listening to music, watching movies, and cooking!

Abhilash Nagilla is a Senior Specialist Solutions Architect at Amazon Web Services (AWS), helping public sector customers on their cloud journey with a focus on AWS analytics services. Outside of work, Abhilash enjoys learning new technologies, watching movies, and visiting new places.


Appendix A

Sample Amazon EMR service role and trust policy:

Note: This is a sample service role. Fine-grained access control is done using Lake Formation. Modify the permissions according to your enterprise guidance and your security team’s requirements.

Trust policy:

{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": "elasticmapreduce.amazonaws.com",
                "AWS": "arn:aws:iam::xxxxxx:role/emr_tip_role"
            },
            "Action": [
                "sts:AssumeRole",
                "sts:SetContext"
            ]
        }
    ]
}

Permission policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ResourcesToLaunchEC2",
            "Effect": "Allow",
            "Action": [
                "ec2:RunInstances",
                "ec2:CreateFleet",
                "ec2:CreateLaunchTemplate",
                "ec2:CreateLaunchTemplateVersion"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*::image/ami-*",
                "arn:aws:ec2:*:*:key-pair/*",
                "arn:aws:ec2:*:*:capacity-reservation/*",
                "arn:aws:ec2:*:*:placement-group/pg-*",
                "arn:aws:ec2:*:*:fleet/*",
                "arn:aws:ec2:*:*:dedicated-host/*",
                "arn:aws:resource-groups:*:*:group/*"
            ]
        },
        {
            "Sid": "TagOnCreateTaggedEMRResources",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:instance/*",
                "arn:aws:ec2:*:*:volume/*",
                "arn:aws:ec2:*:*:launch-template/*"
            ],
            "Condition": {
                "StringEquals": {
                    "ec2:CreateAction": [
                        "RunInstances",
                        "CreateFleet",
                        "CreateLaunchTemplate",
                        "CreateNetworkInterface"
                    ]
                }
            }
        },
        {
            "Sid": "ListActionsForEC2Resources",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeAccountAttributes",
                "ec2:DescribeCapacityReservations",
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeImages",
                "ec2:DescribeInstances",
                "ec2:DescribeLaunchTemplates",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribePlacementGroups",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVolumes",
                "ec2:DescribeVolumeStatus",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcs"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AutoScaling",
            "Effect": "Allow",
            "Action": [
                "application-autoscaling:DeleteScalingPolicy",
                "application-autoscaling:DeregisterScalableTarget",
                "application-autoscaling:DescribeScalableTargets",
                "application-autoscaling:DescribeScalingPolicies",
                "application-autoscaling:PutScalingPolicy",
                "application-autoscaling:RegisterScalableTarget"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AutoScalingCloudWatch",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricAlarm",
                "cloudwatch:DeleteAlarms",
                "cloudwatch:DescribeAlarms"
            ],
            "Resource": "arn:aws:cloudwatch:*:*:alarm:*_EMR_Auto_Scaling"
        },
        {
            "Sid": "PassRoleForAutoScaling",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::*:role/EMR_AutoScaling_DefaultRole",
            "Condition": {
                "StringLike": {
                    "iam:PassedToService": "application-autoscaling.amazonaws.com*"
                }
            }
        },
        {
            "Sid": "PassRoleForEC2",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::xxxxxxxxxxx:role/service-role/<Instance-Profile-Role>",
            "Condition": {
                "StringLike": {
                    "iam:PassedToService": "ec2.amazonaws.com*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:*",
                "s3-object-lambda:*"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket>/*",
                "arn:aws:s3:::*logs*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": "*",
            "Action": [
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CancelSpotInstanceRequests",
                "ec2:CreateFleet",
                "ec2:CreateLaunchTemplate",
                "ec2:CreateNetworkInterface",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DeleteLaunchTemplate",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteSecurityGroup",
                "ec2:DeleteTags",
                "ec2:DescribeAvailabilityZones",
                "ec2:DescribeAccountAttributes",
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeImages",
                "ec2:DescribeInstanceStatus",
                "ec2:DescribeInstances",
                "ec2:DescribeKeyPairs",
                "ec2:DescribeLaunchTemplates",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribePrefixLists",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSpotInstanceRequests",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeSubnets",
                "ec2:DescribeTags",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcEndpointServices",
                "ec2:DescribeVpcs",
                "ec2:DetachNetworkInterface",
                "ec2:ModifyImageAttribute",
                "ec2:ModifyInstanceAttribute",
                "ec2:RequestSpotInstances",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:RunInstances",
                "ec2:TerminateInstances",
                "ec2:DeleteVolume",
                "ec2:DescribeVolumeStatus",
                "ec2:DescribeVolumes",
                "ec2:DetachVolume",
                "iam:GetRole",
                "iam:GetRolePolicy",
                "iam:ListInstanceProfiles",
                "iam:ListRolePolicies",
                "cloudwatch:PutMetricAlarm",
                "cloudwatch:DescribeAlarms",
                "cloudwatch:DeleteAlarms",
                "application-autoscaling:RegisterScalableTarget",
                "application-autoscaling:DeregisterScalableTarget",
                "application-autoscaling:PutScalingPolicy",
                "application-autoscaling:DeleteScalingPolicy",
                "application-autoscaling:Describe*"
            ]
        }
    ]
}
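
Before adapting a sample policy like the one above, it can help to list the statements that grant wildcard actions or resources, because those are the usual candidates for tightening. The following is a minimal sketch of such a check. The function name find_wildcards and the shortened sample policy are illustrative assumptions and are not part of the original post.

```python
def find_wildcards(policy):
    """Return (sid, kind, value) tuples for wildcard grants in Allow statements."""
    findings = []
    for index, stmt in enumerate(policy.get("Statement", [])):
        if stmt.get("Effect") != "Allow":
            continue
        # Statements without a Sid are identified by their position instead
        sid = stmt.get("Sid") or f"statement[{index}]"
        for kind in ("Action", "Resource"):
            values = stmt.get(kind, [])
            if isinstance(values, str):  # IAM accepts a single string or a list
                values = [values]
            for value in values:
                if value == "*":
                    findings.append((sid, kind, value))
                elif kind == "Action" and value.endswith(":*"):
                    # Service-wide action wildcard such as "s3:*"
                    findings.append((sid, kind, value))
    return findings


# Shortened, hypothetical excerpt modeled on the sample policy
sample_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Sid": "ListActionsForEC2Resources", "Effect": "Allow",
         "Action": ["ec2:DescribeInstances"], "Resource": "*"},
        {"Effect": "Allow", "Action": ["s3:*", "s3-object-lambda:*"],
         "Resource": ["arn:aws:s3:::<bucket>/*"]},
    ],
}

for finding in find_wildcards(sample_policy):
    print(finding)
```

A wildcard is not always wrong (many Describe actions only support "Resource": "*"), so treat the output as a review list rather than a list of errors.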

Appendix B

Sample EMR Studio role policy:

Note: This is a sample service role. Fine-grained access control is enforced by Lake Formation. Modify the permissions according to your enterprise guidance and your security team's requirements.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowEMRReadOnlyActions",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ListInstances",
                "elasticmapreduce:DescribeCluster",
                "elasticmapreduce:ListSteps"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowEC2ENIActionsWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowEC2ENIAttributeAction",
            "Effect": "Allow",
            "Action": [
                "ec2:ModifyNetworkInterfaceAttribute"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:instance/*",
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Sid": "AllowEC2SecurityGroupActionsWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:RevokeSecurityGroupIngress",
                "ec2:DeleteNetworkInterfacePermission"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowDefaultEC2SecurityGroupsCreationWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateSecurityGroup"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:security-group/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowDefaultEC2SecurityGroupsCreationInVPCWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateSecurityGroup"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:vpc/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowAddingEMRTagsDuringDefaultSecurityGroupCreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:security-group/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true",
                    "ec2:CreateAction": "CreateSecurityGroup"
                }
            }
        },
        {
            "Sid": "AllowEC2ENICreationWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowEC2ENICreationInSubnetAndSecurityGroupWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowAddingTagsDuringEC2ENICreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "ec2:CreateAction": "CreateNetworkInterface"
                }
            }
        },
        {
            "Sid": "AllowEC2ReadOnlyActions",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeTags",
                "ec2:DescribeInstances",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowSecretsManagerReadOnlyActionsWithEMRTags",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": "arn:aws:secretsmanager:*:*:secret:*",
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/for-use-with-amazon-emr-managed-policies": "true"
                }
            }
        },
        {
            "Sid": "AllowWorkspaceCollaboration",
            "Effect": "Allow",
            "Action": [
                "iam:GetUser",
                "iam:GetRole",
                "iam:ListUsers",
                "iam:ListRoles",
                "sso:GetManagedApplicationInstance",
                "sso-directory:SearchUsers"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetEncryptionConfiguration",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket>",
                "arn:aws:s3:::<bucket>/*"
            ]
        },
        {
            "Sid": "EMRStudioWorkspaceAccess",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:CreateEditor",
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:ListEditors",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:UpdateEditor",
                "elasticmapreduce:PutWorkspaceAccess",
                "elasticmapreduce:DeleteWorkspaceAccess",
                "elasticmapreduce:ListWorkspaceAccessIdentities",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:OpenEditorInConsole",
                "elasticmapreduce:AttachEditor",
                "elasticmapreduce:DetachEditor",
                "elasticmapreduce:ListInstanceGroups",
                "elasticmapreduce:ListBootstrapActions",
                "servicecatalog:SearchProducts",
                "servicecatalog:DescribeProduct",
                "servicecatalog:DescribeProductView",
                "servicecatalog:DescribeProvisioningParameters",
                "servicecatalog:ProvisionProduct",
                "servicecatalog:UpdateProvisionedProduct",
                "servicecatalog:ListProvisioningArtifacts",
                "servicecatalog:DescribeRecord",
                "servicecatalog:ListLaunchPaths",
                "elasticmapreduce:RunJobFlow",      
                "elasticmapreduce:ListClusters",
                "elasticmapreduce:DescribeCluster",
                "codewhisperer:GenerateRecommendations",
                "athena:StartQueryExecution",
                "athena:StopQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryRuntimeStatistics",
                "athena:GetQueryResults",
                "athena:ListQueryExecutions",
                "athena:BatchGetQueryExecution",
                "athena:GetNamedQuery",
                "athena:ListNamedQueries",
                "athena:BatchGetNamedQuery",
                "athena:UpdateNamedQuery",
                "athena:DeleteNamedQuery",
                "athena:ListDataCatalogs",
                "athena:GetDataCatalog",
                "athena:ListDatabases",
                "athena:GetDatabase",
                "athena:ListTableMetadata",
                "athena:GetTableMetadata",
                "athena:ListWorkGroups",
                "athena:GetWorkGroup",
                "athena:CreateNamedQuery",
                "athena:GetPreparedStatement",
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition",
                "kms:ListAliases",
                "kms:ListKeys",
                "kms:DescribeKey",
                "lakeformation:GetDataAccess",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutBucketPublicAccessBlock",
                "s3:ListAllMyBuckets",
                "elasticmapreduce:ListStudios",
                "elasticmapreduce:DescribeStudio",
                "cloudformation:GetTemplate",
                "cloudformation:CreateStack",
                "cloudformation:CreateStackSet",
                "cloudformation:DeleteStack",
                "cloudformation:GetTemplateSummary",
                "cloudformation:ValidateTemplate",
                "cloudformation:ListStacks",
                "cloudformation:ListStackSets",
                "elasticmapreduce:AddTags",
                "ec2:CreateNetworkInterface",
                "elasticmapreduce:GetClusterSessionCredentials",
                "elasticmapreduce:GetOnClusterAppUIPresignedURL",
                "cloudformation:DescribeStackResources"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "AllowPassingServiceRoleForWorkspaceCreation",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::*:role/<Studio Role>",
                "arn:aws:iam::*:role/<EMR Service Role>",
                "arn:aws:iam::*:role/<EMR Instance Profile Role>"
            ],
            "Effect": "Allow"
        },
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::*:role/<EMR Instance Profile Role>"
            ]
        }
    ]
}

Run interactive workloads on Amazon EMR Serverless from Amazon EMR Studio

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/run-interactive-workloads-on-amazon-emr-serverless-from-amazon-emr-studio/

Starting from release 6.14, Amazon EMR Studio supports interactive analytics on Amazon EMR Serverless. You can now use EMR Serverless applications as the compute, in addition to Amazon EMR on EC2 clusters and Amazon EMR on EKS virtual clusters, to run JupyterLab notebooks from EMR Studio Workspaces.

EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug analytics applications written in PySpark, Python, and Scala. EMR Serverless is a serverless option for Amazon EMR that makes it straightforward to run open source big data analytics frameworks such as Apache Spark without configuring, managing, and scaling clusters or servers.

In this post, we demonstrate how to do the following:

  • Create an EMR Serverless endpoint for interactive applications
  • Attach the endpoint to an existing EMR Studio environment
  • Create a notebook and run an interactive application
  • Seamlessly diagnose interactive applications from within EMR Studio

Prerequisites

In a typical organization, an AWS account administrator sets up AWS resources such as AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) buckets, and Amazon Virtual Private Cloud (Amazon VPC) resources for internet access and access to other resources in the VPC. The account administrator assigns EMR Studio administrators, who set up EMR Studios and assign users to a specific EMR Studio. After they're assigned, EMR Studio developers can use EMR Studio to develop and monitor workloads.

Make sure you set up resources like your S3 bucket, VPC subnets, and EMR Studio in the same AWS Region.

Complete the following steps to deploy these prerequisites:

  1. Launch the following AWS CloudFormation stack.
    Launch Cloudformation Stack
  2. Enter values for AdminPassword and DevPassword and make a note of the passwords you create.
  3. Choose Next.
  4. Keep the settings as default and choose Next again.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Submit.
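
If you prefer to script the console steps above, the following is a minimal sketch using boto3. It assumes you have a URL for the template (the post does not state one, so template_url is a placeholder you must supply), and the helper names are hypothetical. The CAPABILITY_NAMED_IAM capability corresponds to the acknowledgement about IAM resources with custom names.

```python
def build_create_stack_kwargs(stack_name, template_url, admin_password, dev_password):
    """Assemble keyword arguments for the CloudFormation create_stack call."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,  # placeholder: not provided in this post
        "Parameters": [
            {"ParameterKey": "AdminPassword", "ParameterValue": admin_password},
            {"ParameterKey": "DevPassword", "ParameterValue": dev_password},
        ],
        # Same effect as selecting the IAM acknowledgement in the console
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def launch_stack(kwargs):
    """Create the stack and return its ID. Requires AWS credentials and boto3."""
    import boto3  # imported here so the builder above works without AWS access

    cloudformation = boto3.client("cloudformation")
    return cloudformation.create_stack(**kwargs)["StackId"]


# Example (not executed here):
# kwargs = build_create_stack_kwargs("emrs-interactive", "<template-url>", "...", "...")
# print(launch_stack(kwargs))
```

Keeping the argument builder separate from the API call makes it possible to review the request before anything is created in your account.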

We have also provided instructions to deploy these resources manually with sample IAM policies in the GitHub repo.

Set up EMR Studio and a serverless interactive application

After the AWS account administrator completes the prerequisites, the EMR Studio administrator can log in to the AWS Management Console to create an EMR Studio, Workspace, and EMR Serverless application.

Create an EMR Studio and Workspace

The EMR Studio administrator should log in to the console using the emrs-interactive-app-admin-user user credentials. If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Get started.
  3. Select Create and launch EMR Studio.

This creates a Studio with the default name studio_1 and a Workspace with the default name My_First_Workspace. A new browser tab opens for the studio_1 user interface.

Create and Launch EMR Studio

Create an EMR Serverless application

Complete the following steps to create an EMR Serverless application:

  1. On the EMR Studio console, choose Applications in the navigation pane.
  2. Create a new application.
  3. For Name, enter a name (for example, my-serverless-interactive-application).
  4. For Application setup options, select Use custom settings for interactive workloads.
    Create Serverless Application using custom settings

For interactive applications, as a best practice, we recommend keeping the driver and workers pre-initialized by configuring the pre-initialized capacity at the time of application creation. This effectively creates a warm pool of workers for an application and keeps the resources ready to be consumed, enabling the application to respond in seconds. For further best practices for creating EMR Serverless applications, see Define per-team resource limits for big data workloads using Amazon EMR Serverless.

  1. In the Interactive endpoint section, select Enable Interactive endpoint.
  2. In the Network connections section, choose the VPC, private subnets, and security group you created previously.

If you deployed the CloudFormation stack provided in this post, choose emr-serverless-sg as the security group.

A VPC is needed for the workload to be able to access the internet from within the EMR Serverless application in order to download external Python packages. The VPC also allows you to access resources such as Amazon Relational Database Service (Amazon RDS) and Amazon Redshift that are in the VPC from this application. Attaching a serverless application to a VPC can lead to IP exhaustion in the subnet, so make sure there are sufficient IP addresses in your subnet.
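
To estimate whether a subnet has enough addresses, you can do the arithmetic up front. The sketch below assumes each worker consumes one IP address in the subnet and relies on the fact that AWS reserves five addresses in every subnet. The function names and the example numbers are illustrative.

```python
import ipaddress

# AWS reserves the first four addresses and the last address of every subnet
AWS_RESERVED_PER_SUBNET = 5


def usable_ip_count(cidr):
    """Number of addresses in the subnet that AWS leaves available for use."""
    network = ipaddress.ip_network(cidr)
    return max(network.num_addresses - AWS_RESERVED_PER_SUBNET, 0)


def has_headroom(cidr, addresses_in_use, workers_needed):
    """True if the subnet can still hold workers_needed more workers.

    Assumes one IP address per worker.
    """
    return usable_ip_count(cidr) - addresses_in_use >= workers_needed


print(usable_ip_count("10.0.1.0/24"))       # 256 - 5 = 251
print(has_headroom("10.0.1.0/28", 5, 6))    # 11 usable, 5 in use, 6 needed
```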

  1. Choose Create and start application.

Enable Interactive Endpoints, Choose private subnets and security group

On the applications page, you can verify that the status of your serverless application changes to Started.
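
The same application settings can be expressed as a boto3 request. The following sketch builds the arguments for the EMR Serverless create_application call with pre-initialized capacity, the interactive endpoint enabled, and a VPC network configuration. The worker counts and sizes, the default release label, and the helper names are assumptions chosen for illustration, so adjust them to your workload.

```python
def build_application_request(name, subnet_ids, security_group_ids,
                              release_label="emr-6.14.0"):
    """Assemble keyword arguments for emr-serverless create_application."""
    worker = {"cpu": "4vCPU", "memory": "16GB"}  # illustrative worker size
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
        # Pre-initialized capacity keeps a warm pool of driver and executors
        "initialCapacity": {
            "DRIVER": {"workerCount": 1, "workerConfiguration": dict(worker)},
            "EXECUTOR": {"workerCount": 2, "workerConfiguration": dict(worker)},
        },
        # Enables the interactive endpoint used by EMR Studio
        "interactiveConfiguration": {"studioEnabled": True},
        "networkConfiguration": {
            "subnetIds": list(subnet_ids),
            "securityGroupIds": list(security_group_ids),
        },
    }


def create_and_start(request):
    """Create and start the application. Requires AWS credentials and boto3."""
    import boto3  # imported here so the builder above works without AWS access

    client = boto3.client("emr-serverless")
    application_id = client.create_application(**request)["applicationId"]
    client.start_application(applicationId=application_id)
    return application_id


# Example (not executed here); the subnet and security group IDs are placeholders:
# request = build_application_request(
#     "my-serverless-interactive-application", ["subnet-..."], ["sg-..."])
# print(create_and_start(request))
```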

  1. Select your application and choose How it works.
  2. Choose View and launch workspaces.
  3. Choose Configure studio.

  1. For Service role, provide the EMR Studio service role you created as a prerequisite (emr-studio-service-role).
  2. For Workspace storage, enter the path of the S3 bucket you created as a prerequisite (emrserverless-interactive-blog-<account-id>-<region-name>).
  3. Choose Save changes.

Choose emr-studio-service-role and emrserverless-interactive-blog s3 bucket

  4. Navigate to the Studios console by choosing Studios in the left navigation menu in the EMR Studio section. Note the Studio access URL from the Studios console and provide it to your developers to run their Spark applications.

Run your first Spark application

After the EMR Studio administrator has created the Studio, Workspace, and serverless application, the Studio user can use the Workspace and application to develop and monitor Spark workloads.

Launch the Workspace and attach the serverless application

Complete the following steps:

  1. Using the Studio URL provided by the EMR Studio administrator, log in using the emrs-interactive-app-dev-user user credentials shared by the AWS account admin.

If you deployed the prerequisite resources using the provided CloudFormation template, use the password that you provided as an input parameter.

On the Workspaces page, you can check the status of your Workspace. When the Workspace is launched, you will see the status change to Ready.

  1. Launch the workspace by choosing the workspace name (My_First_Workspace).

This will open a new tab. Make sure your browser allows pop-ups.

  1. In the Workspace, choose Compute (cluster icon) in the navigation pane.
  2. For EMR Serverless application, choose your application (my-serverless-interactive-application).
  3. For Interactive runtime role, choose an interactive runtime role (for this post, we use emr-serverless-runtime-role).
  4. Choose Attach to attach the serverless application as the compute type for all the notebooks in this Workspace.

Choose my-serverless-interactive-application as your app and emr-serverless-runtime-role and attach

Run your Spark application interactively

Complete the following steps:

  1. Choose Notebook samples (three dots icon) in the navigation pane and open the Getting-started-with-emr-serverless notebook.
  2. Choose Save to Workspace.

There are three choices of kernels for our notebook: Python 3, PySpark, and Spark (for Scala).

  1. When prompted, choose PySpark as the kernel.
  2. Choose Select.

Choose PySpark as kernel

Now you can run your Spark application. To do so, use the %%configure Sparkmagic command, which configures the session creation parameters. Interactive applications support Python virtual environments. We use a custom environment in the worker nodes by specifying a path for a different Python runtime for the executor environment using spark.executorEnv.PYSPARK_PYTHON. See the following code:

%%configure -f
{
  "conf": {
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.python": "/usr/bin/python3",
    "spark.executorEnv.PYSPARK_PYTHON": "/usr/bin/python3"
  }
}

Install external packages

Now that you have an independent virtual environment for the workers, EMR Studio notebooks allow you to install external packages from within the serverless application by using the Spark install_pypi_package function through the Spark context. Using this function makes the package available for all the EMR Serverless workers.

First, install matplotlib, a Python package, from PyPI:

sc.install_pypi_package("matplotlib")

If the preceding step doesn’t respond, check your VPC setup and make sure it is configured correctly for internet access.
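
One quick way to test outbound connectivity is to attempt a TCP connection from a notebook cell. The following is a minimal sketch using only the Python standard library. The function name can_reach is hypothetical, and a successful connection only shows that the host is reachable on that port, not that every package download will succeed.

```python
import socket


def can_reach(host, port=443, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False


# In a notebook cell, for example:
# print(can_reach("pypi.org"))
```

If the check returns False, review the route tables, NAT gateway, and security group rules for the subnets attached to the application.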

Now you can use a dataset and visualize your data.

Create visualizations

To create visualizations, we use a public dataset on NYC yellow taxis:

file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet"
# Parquet files embed their own schema, so no header or inferSchema options are needed
taxi_df = spark.read.parquet(file_name)

In the preceding code block, you read the Parquet file from a public bucket in Amazon S3. Because Parquet stores the schema alongside the data, Spark picks up the column names and types without inference. You then use a Spark dataframe to group and count specific columns from taxi_df:

taxi1_df = taxi_df.groupBy("VendorID", "passenger_count").count()
taxi1_df.show()

Use %%display magic to view the result in table format:

%%display
taxi1_df

Table shows vendor_id, passenger_count and count columns

You can also quickly visualize your data with five types of charts. You can choose the display type and the chart will change accordingly. In the following screenshot, we use a bar chart to visualize our data.

bar chart showing passenger_count against each vendor_id

Interact with EMR Serverless using Spark SQL

You can interact with tables in the AWS Glue Data Catalog using Spark SQL on EMR Serverless. In the sample notebook, we show how you can transform data using a Spark dataframe.

First, create a new temporary view called taxis. This allows you to use Spark SQL to select data from this view. Then create a taxi dataframe for further processing:

taxi_df.createOrReplaceTempView("taxis")
sqlDF = spark.sql(
    "SELECT DOLocationID, sum(total_amount) as sum_total_amount \
     FROM taxis where DOLocationID < 25 Group by DOLocationID ORDER BY DOLocationID"
)
sqlDF.show(5)

Table shows DOLocationID and sum_total_amount columns
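
To sanity-check what the Spark SQL query above computes without a Spark session, you can run the same statement against a few made-up rows in SQLite, which accepts this query unchanged. This is only a local illustration: the sample rows are invented, the helper name run_locally is hypothetical, and SQLite is a stand-in for Spark SQL.

```python
import sqlite3

QUERY = (
    "SELECT DOLocationID, sum(total_amount) AS sum_total_amount "
    "FROM taxis WHERE DOLocationID < 25 "
    "GROUP BY DOLocationID ORDER BY DOLocationID"
)


def run_locally(rows):
    """Load (DOLocationID, total_amount) rows into SQLite and run the query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE taxis (DOLocationID INTEGER, total_amount REAL)")
        conn.executemany("INSERT INTO taxis VALUES (?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


# Invented sample data: location 30 is filtered out by the WHERE clause
sample_rows = [(1, 10.0), (1, 5.5), (7, 20.0), (30, 99.0)]
print(run_locally(sample_rows))  # [(1, 15.5), (7, 20.0)]
```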

In each cell in your EMR Studio notebook, you can expand Spark Job Progress to view the various stages of the job submitted to EMR Serverless while running this specific cell. You can see the time taken to complete each stage. In the following example, stage 14 of the job has 12 completed tasks. In addition, if there is any failure, you can see the logs, making troubleshooting a seamless experience. We discuss this more in the next section.

Job[14]: showString at NativeMethodAccessorImpl.java:0 and Job[15]: showString at NativeMethodAccessorImpl.java:0

Use the following code to visualize the processed dataframe using the matplotlib package. You use the matplotlib library to plot the drop-off location and the total amount as a bar chart.

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
plt.clf()
df = sqlDF.toPandas()
plt.bar(df.DOLocationID, df.sum_total_amount)
%matplot plt

Diagnose interactive applications

You can get the session information for your Livy endpoint using the %%info Sparkmagic. This gives you links to access the Spark UI as well as the driver log right in your notebook.

The following screenshot is a driver log snippet for our application, which we opened via the link in our notebook.

driver log screenshot

Similarly, you can choose the link below Spark UI to open the UI. The following screenshot shows the Executors tab, which provides access to the driver and executor logs.

The following screenshot shows stage 14, which corresponds to the Spark SQL step we saw earlier, where we calculated the location-wise sum of total taxi collections. The stage was broken down into 12 tasks. Through the Spark UI, the interactive application provides fine-grained task-level status, I/O, and shuffle details, as well as links to the corresponding logs for each task in this stage, right from your notebook, enabling a seamless troubleshooting experience.

Clean up

If you no longer want to keep the resources created in this post, complete the following cleanup steps:

  1. Delete the EMR Serverless application.
  2. Delete the EMR Studio and the associated workspaces and notebooks.
  3. To delete the rest of the resources, navigate to the AWS CloudFormation console, select the stack, and choose Delete.

All of the resources will be deleted except the S3 bucket, which has its deletion policy set to retain.
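
The cleanup steps can also be scripted. The following sketch takes boto3 clients for emr-serverless, emr, and cloudformation as parameters and calls their stop_application, get_application, delete_application, delete_studio, and delete_stack operations. It assumes the application is currently started, that any Workspaces have already been removed, and that you know the application ID, Studio ID, and stack name. The function names and polling values are illustrative.

```python
import time


def delete_serverless_application(emr_serverless, application_id,
                                  poll_seconds=5, max_polls=60):
    """Stop an EMR Serverless application, wait until it stops, then delete it."""
    emr_serverless.stop_application(applicationId=application_id)
    for _ in range(max_polls):
        response = emr_serverless.get_application(applicationId=application_id)
        # An application must be stopped (or only created) before deletion
        if response["application"]["state"] in ("STOPPED", "CREATED"):
            break
        time.sleep(poll_seconds)
    else:
        raise TimeoutError(f"Application {application_id} did not stop in time")
    emr_serverless.delete_application(applicationId=application_id)


def clean_up(emr_serverless, emr, cloudformation,
             application_id, studio_id, stack_name, poll_seconds=5):
    """Run the cleanup steps in the same order as the list above."""
    delete_serverless_application(emr_serverless, application_id,
                                  poll_seconds=poll_seconds)
    emr.delete_studio(StudioId=studio_id)
    cloudformation.delete_stack(StackName=stack_name)


# Example (not executed here); all identifiers are placeholders:
# import boto3
# clean_up(boto3.client("emr-serverless"), boto3.client("emr"),
#          boto3.client("cloudformation"), "<application-id>", "<studio-id>", "<stack-name>")
```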

Conclusion

The post showed how to run interactive PySpark workloads in EMR Studio using EMR Serverless as the compute. You can also build and monitor Spark applications in an interactive JupyterLab Workspace.

In an upcoming post, we’ll discuss additional capabilities of EMR Serverless Interactive applications, such as:

  • Working with resources such as Amazon RDS and Amazon Redshift in your VPC (for example, for JDBC/ODBC connectivity)
  • Running transactional workloads using serverless endpoints

If this is your first time exploring EMR Studio, we recommend checking out the Amazon EMR workshops and referring to Create an EMR Studio.


About the Authors

Sekar Srinivasan is a Principal Specialist Solutions Architect at AWS focused on Data Analytics and AI. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernize their architecture, and generate insights from their data. In his spare time, he likes to work on non-profit projects focused on underprivileged children’s education.

Disha Umarwani is a Sr. Data Architect with Amazon Professional Services within Global Health Care and LifeSciences. She has worked with customers to design, architect and implement Data Strategy at scale. She specializes in architecting Data Mesh architectures for Enterprise platforms.

Automate large-scale data validation using Amazon EMR and Apache Griffin

Post Syndicated from Dipal Mahajan original https://aws.amazon.com/blogs/big-data/automate-large-scale-data-validation-using-amazon-emr-and-apache-griffin/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from source to target. This data validation is a critical step, and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration with a configuration-based tool built on Amazon EMR and the Apache Griffin open source library. Griffin is an open source data quality solution for big data, which supports both batch and streaming modes.

In today’s data-driven landscape, where organizations deal with petabytes of data, the need for automated data validation frameworks has become increasingly critical. Manual validation processes are not only time-consuming but also prone to errors, especially when dealing with vast volumes of data. Automated data validation frameworks offer a streamlined solution by efficiently comparing large datasets, identifying discrepancies, and ensuring data accuracy at scale. With such frameworks, organizations can save valuable time and resources while maintaining confidence in the integrity of their data, thereby enabling informed decision-making and enhancing overall operational efficiency.

The following are standout features for this framework:

  • Utilizes a configuration-driven framework
  • Offers plug-and-play functionality for seamless integration
  • Conducts count comparison to identify any disparities
  • Implements robust data validation procedures
  • Ensures data quality through systematic checks
  • Provides access to a file containing mismatched records for in-depth analysis
  • Generates comprehensive reports for insights and tracking purposes

Solution overview

This solution uses the following services:

  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the source and target.
  • Amazon EMR to run the PySpark script. We use a Python wrapper on top of Griffin to validate data between Hadoop tables created over HDFS or Amazon S3.
  • AWS Glue to catalog the technical table, which stores the results of the Griffin job.
  • Amazon Athena to query the output table to verify the results.

We use tables that store the count for each source and target table and also create files that show the difference of records between source and target.

The following diagram illustrates the solution architecture.

Architecture_Diagram

In the depicted architecture and our typical data lake use case, our data either resides in Amazon S3 or is migrated from on premises to Amazon S3 using replication tools such as AWS DataSync or AWS Database Migration Service (AWS DMS). Although this solution is designed to seamlessly interact with both Hive Metastore and the AWS Glue Data Catalog, we use the Data Catalog as our example in this post.

This framework operates within Amazon EMR, automatically running scheduled tasks at the defined frequency (daily in this example). It generates and publishes reports in Amazon S3, which are then accessible via Athena. A notable feature of this framework is its capability to detect both count mismatches and data discrepancies, and to generate a file in Amazon S3 containing the full records that didn’t match, facilitating further analysis.

In this example, we use three tables in an on-premises database to validate between source and target: balance_sheet, covid, and survery_financial_report.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Deploy the solution

To make it straightforward for you to get started, we have created a CloudFormation template that automatically configures and deploys the solution for you. Complete the following steps:

  1. Create an S3 bucket in your AWS account called bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region} (provide your AWS account ID and AWS Region).
  2. Unzip the following file to your local system.
  3. In the unzipped files, change <bucket name> to the name of the bucket you created in your account (bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}) in the following files:
    1. bootstrap-bdb-3070-datavalidation.sh
    2. Validation_Metrics_Athena_tables.hql
    3. datavalidation/totalcount/totalcount_input.txt
    4. datavalidation/accuracy/accuracy_input.txt
  4. Upload all the folders and files in your local folder to your S3 bucket:
    aws s3 cp . s3://<bucket_name>/ --recursive

  5. Run the following CloudFormation template in your account.
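
Steps 1 and 3 above can be scripted if you prefer not to edit the four files by hand. The following is a minimal sketch, not part of the original solution: the function names are hypothetical, and the exact placeholder string inside the files is an assumption (it is passed as a parameter so you can adjust it to whatever the files actually contain).

```python
import tempfile
from pathlib import Path
from typing import List

# Files named in step 3, relative to the unzipped folder.
FILES_TO_PATCH = [
    "bootstrap-bdb-3070-datavalidation.sh",
    "Validation_Metrics_Athena_tables.hql",
    "datavalidation/totalcount/totalcount_input.txt",
    "datavalidation/accuracy/accuracy_input.txt",
]


def bucket_name(account_id: str, region: str) -> str:
    """Build the bucket name used in this post from an account ID and Region."""
    return f"bdb-3070-griffin-datavalidation-blog-{account_id}-{region}"


def patch_placeholder(root: Path, bucket: str,
                      placeholder: str = "<bucket name>") -> List[str]:
    """Replace the placeholder in each listed file; return the files changed.

    The default placeholder text is an assumption; check the files first.
    """
    patched = []
    for rel in FILES_TO_PATCH:
        path = root / rel
        if not path.is_file():
            continue  # skip files that are missing from the local folder
        text = path.read_text(encoding="utf-8")
        if placeholder in text:
            path.write_text(text.replace(placeholder, bucket), encoding="utf-8")
            patched.append(rel)
    return patched


if __name__ == "__main__":
    # Demonstration on a throwaway folder with one stand-in file.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "datavalidation" / "accuracy").mkdir(parents=True)
        sample = root / "datavalidation" / "accuracy" / "accuracy_input.txt"
        sample.write_text("s3://<bucket name>/data/", encoding="utf-8")
        name = bucket_name("123456789012", "us-east-1")  # example values only
        print(patch_placeholder(root, name))
        print(sample.read_text(encoding="utf-8"))
```

Run it against the unzipped folder before the upload in step 4 so the patched files are the ones copied to Amazon S3.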

The CloudFormation template creates a database called griffin_datavalidation_blog and an AWS Glue crawler called griffin_data_validation_blog on top of the data folder in the .zip file.

  1. Choose Next.
    Cloudformation_template_1
  2. Choose Next again.
  3. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query "Stacks[0].Outputs"

  1. Run the AWS Glue crawler and verify that six tables have been created in the Data Catalog.
  2. Run the following CloudFormation template in your account.

This template creates an EMR cluster with a bootstrap script to copy Griffin-related JARs and artifacts. It also runs three EMR steps:

  • Create two Athena tables and two Athena views to see the validation metrics produced by the Griffin framework
  • Run count validation for all three tables to compare the source and target tables
  • Run record-level and column-level validations for all three tables to compare the source and target tables

  1. For SubnetID, enter your subnet ID.
  2. Choose Next.
    Cloudformation_template_2
  3. Choose Next again.
  4. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.

You can view the stack outputs on the console or by using the following AWS CLI command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query "Stacks[0].Outputs"
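
If you want to use the stack outputs in a script, the JSON printed by the command above can be turned into a simple lookup table. The following sketch assumes the command's default JSON output format, in which each output is an object with OutputKey and OutputValue fields. The helper name and the sample key and value are hypothetical and are not taken from the actual templates.

```python
import json


def outputs_to_dict(cli_json: str) -> dict:
    """Convert the JSON list printed by the describe-stacks query into a dict.

    A stack without outputs makes the query print null, which maps to {}.
    """
    outputs = json.loads(cli_json) or []
    return {item["OutputKey"]: item["OutputValue"] for item in outputs}


if __name__ == "__main__":
    # Hypothetical sample; real keys depend on the CloudFormation template.
    sample = '[{"OutputKey": "ExampleKey", "OutputValue": "example-value"}]'
    print(outputs_to_dict(sample))
```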

It takes approximately 5 minutes for the deployment to complete. When the stack is complete, you should see the EMRCluster resource launched and available in your account.

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • Bootstrap action – It installs the Griffin JAR file and directories for this framework. It also downloads sample data files to use in the next step.
  • Athena_Table_Creation – It creates tables in Athena to read the result reports.
  • Count_Validation – It runs the job to compare the data count between source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via an Athena table.
  • Accuracy – It runs the job to compare the data rows between the source and target data from the Data Catalog table and stores the results in an S3 bucket, which will be read via the Athena table.
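
To make the difference between the Count_Validation and Accuracy steps concrete, the following plain-Python sketch shows the kind of result each check produces on a tiny in-memory example. It is a conceptual illustration only, not Griffin's implementation: Griffin runs these comparisons as Spark jobs driven by configuration, and the function names, result field names, and sample rows here are hypothetical.

```python
from collections import Counter


def count_validation(source_rows, target_rows):
    """Compare only the number of rows in source and target."""
    source_count, target_count = len(source_rows), len(target_rows)
    status = "MATCH" if source_count == target_count else "MISMATCH"
    return {"source_count": source_count,
            "target_count": target_count,
            "status": status}


def accuracy_validation(source_rows, target_rows):
    """Find source rows that have no identical counterpart in the target.

    Rows are compared as whole tuples, and each target row can satisfy at
    most one source row (multiset matching). This is a simplification.
    """
    remaining = Counter(target_rows)
    missed = []
    for row in source_rows:
        if remaining[row] > 0:
            remaining[row] -= 1  # consume one matching target row
        else:
            missed.append(row)   # no identical row left in the target
    total = len(source_rows)
    return {"total": total,
            "matched": total - len(missed),
            "miss": len(missed),
            "miss_records": missed}


if __name__ == "__main__":
    source = [(1, "a"), (2, "b"), (3, "c")]
    target = [(1, "a"), (2, "x"), (3, "c")]  # row 2 differs in one column
    print(count_validation(source, target))
    print(accuracy_validation(source, target))
```

In this example the counts match even though one record differs, which is why the framework runs both checks: a count comparison alone would not surface the changed row.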

Athena_table

When the EMR steps are complete, your table comparison is done and ready to view in Athena automatically. No manual intervention is needed for validation.

Validate data with Python Griffin

When your EMR cluster is ready and all the jobs are complete, the count validation and data validation are done. The results are stored in Amazon S3, and the Athena tables are already created on top of them. You can query the Athena tables to view the results, as shown in the following screenshot.

The following screenshot shows the count results for all tables.

Summary_table

The following screenshot shows the data accuracy results for all tables.

Detailed_view

The following screenshot shows the files created for each table with mismatched records. Individual folders are generated for each table directly from the job.

mismatched_records

Every table folder contains a directory for each day the job is run.

S3_path_mismatched

Within each date directory, a file named __missRecords contains the records that do not match.

S3_path_mismatched_2

The following screenshot shows the contents of the __missRecords file.

__missRecords

Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Delete the AWS Glue database griffin_datavalidation_blog, including its tables (for example, by running DROP DATABASE griffin_datavalidation_blog CASCADE).
  2. Delete the prefixes and objects you created from the bucket bdb-3070-griffin-datavalidation-blog-${AWS::AccountId}-${AWS::Region}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Conclusion

This post showed how you can use Python Griffin to accelerate the post-migration data validation process. Python Griffin helps you calculate count and row- and column-level validation, identifying mismatched records without writing any code.

For more information about data quality use cases, refer to Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog and AWS Glue Data Quality.


About the Authors

Dipal Mahajan serves as a Lead Consultant at Amazon Web Services, providing expert guidance to global clients in developing highly secure, scalable, reliable, and cost-efficient cloud applications. With a wealth of experience in software development, architecture, and analytics across diverse sectors such as finance, telecom, retail, and healthcare, he brings invaluable insights to his role. Beyond the professional sphere, Dipal enjoys exploring new destinations, having already visited 14 out of 30 countries on his wish list.

Akhil is a Lead Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.

Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.