
Run Apache Spark Structured Streaming jobs at scale on Amazon EMR Serverless

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/run-apache-spark-structured-streaming-jobs-at-scale-on-amazon-emr-serverless/

As data is generated at an unprecedented rate, streaming solutions have become essential for businesses seeking to harness near real-time insights. Streaming data—from social media feeds, IoT devices, e-commerce transactions, and more—requires robust platforms that can process and analyze data as it arrives, enabling immediate decision-making and actions.

This is where Apache Spark Structured Streaming comes into play. It offers a high-level API that simplifies the complexities of streaming data, allowing developers to write streaming jobs as if they were batch jobs, but with the power to process data in near real time. Spark Structured Streaming integrates seamlessly with various data sources, such as Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Kinesis Data Streams, providing a unified solution that supports complex operations like windowed computations, event-time aggregation, and stateful processing. By using Spark’s fast, in-memory processing capabilities, businesses can run streaming workloads efficiently, scaling up or down as needed, to derive timely insights that drive strategic and critical decisions.

The setup of a computing infrastructure to support such streaming workloads poses its challenges. Here, Amazon EMR Serverless emerges as a pivotal solution for running streaming workloads, enabling the use of the latest open source frameworks like Spark without the need for configuration, optimization, security, or cluster management.

Starting with Amazon EMR 7.1, we introduced a new job mode on EMR Serverless called Streaming, which you select with the --mode parameter. You can submit a streaming job from the EMR Studio console or the StartJobRun API:

aws emr-serverless start-job-run \
 --application-id APPLICATION_ID \
 --execution-role-arn JOB_EXECUTION_ROLE \
 --mode 'STREAMING' \
 --job-driver '{
     "sparkSubmit": {
         "entryPoint": "s3://streaming script",
         "entryPointArguments": ["s3://DOC-EXAMPLE-BUCKET-OUTPUT/output"],
         "sparkSubmitParameters": "--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.executor.instances=3"
     }
 }'
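
When the job driver is generated programmatically, it helps to build sparkSubmitParameters as a single space-separated string so the payload serializes to valid JSON. The following is a minimal Python sketch of that idea. The function names and the script path are hypothetical and chosen only for illustration; the Spark settings mirror the preceding CLI example.

```python
import json


def build_spark_submit_parameters(conf):
    """Join Spark properties into one space-separated string of --conf flags."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def build_job_driver(entry_point, arguments, conf):
    """Assemble the sparkSubmit structure passed as the job driver."""
    return {
        "sparkSubmit": {
            "entryPoint": entry_point,
            "entryPointArguments": list(arguments),
            "sparkSubmitParameters": build_spark_submit_parameters(conf),
        }
    }


# The script path below is a hypothetical placeholder.
job_driver = build_job_driver(
    "s3://DOC-EXAMPLE-BUCKET/scripts/streaming_job.py",
    ["s3://DOC-EXAMPLE-BUCKET-OUTPUT/output"],
    {
        "spark.executor.cores": 4,
        "spark.executor.memory": "16g",
        "spark.driver.cores": 4,
        "spark.driver.memory": "16g",
        "spark.executor.instances": 3,
    },
)

# json.dumps emits the payload on a single line, ready for --job-driver.
print(json.dumps(job_driver))
```

Keeping the properties in a dictionary makes it easier to review or override individual settings before the string is assembled.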

In this post, we highlight some of the key enhancements introduced for streaming jobs.

Performance

The Amazon EMR runtime for Apache Spark delivers a high-performance runtime environment while maintaining 100% API compatibility with open source Spark. Additionally, we have introduced the following enhancements to provide improved support for streaming jobs.

Amazon Kinesis connector with Enhanced Fan-Out Support

Traditional Spark streaming applications reading from Kinesis Data Streams often face throughput limitations due to shared shard-level read capacity, where multiple consumers compete for the default 2 MBps per shard throughput. This bottleneck becomes particularly challenging in scenarios requiring real-time processing across multiple consuming applications.

To address this challenge, we released the open source Amazon Kinesis Data Streams Connector for Spark Structured Streaming that supports enhanced fan-out for dedicated read throughput. Compatible with both provisioned and on-demand Kinesis Data Streams, enhanced fan-out provides each consumer with dedicated throughput of 2 MBps per shard. This enables streaming jobs to process data concurrently without the constraints of shared throughput, significantly reducing latency and facilitating near real-time processing of large data streams. By eliminating competition between consumers and enhancing parallelism, enhanced fan-out provides faster, more efficient data processing, which boosts the overall performance of streaming jobs on EMR Serverless. Starting with Amazon EMR 7.1, the connector comes pre-packaged on EMR Serverless, so you don’t need to build or download any packages.

The following diagram illustrates the architecture using shared throughput.

The following diagram illustrates the architecture using enhanced fan-out and dedicated throughput.

Refer to Build Spark Structured Streaming applications with the open source connector for Amazon Kinesis Data Streams for additional details on this connector.

Cost optimization

EMR Serverless charges are based on the total vCPU, memory, and storage resources utilized during the time workers are active, from when they are ready to execute tasks until they stop. To optimize costs, it is crucial to scale streaming jobs effectively. We have introduced the following enhancements to improve scaling at both the task level and across multiple tasks.

Fine-Grained Scaling

In practical scenarios, data volumes can be unpredictable and exhibit sudden spikes, necessitating a platform capable of dynamically adjusting to workload changes. EMR Serverless eliminates the risks of over- or under-provisioning resources for your streaming workloads. EMR Serverless scaling uses Spark dynamic allocation to scale the number of executors according to demand. The scalability of a streaming job also depends on its data source, so make sure Kinesis shards or Kafka partitions are scaled accordingly. Each Kinesis shard and Kafka partition corresponds to a single Spark executor core. To achieve optimal throughput, use a one-to-one ratio of Spark executor cores to Kinesis shards or Kafka partitions.
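
As a rough illustration of the one-to-one guidance, the following Python sketch estimates how many executors are needed so that the total number of executor cores covers every shard or partition. The function name is hypothetical, and the sketch assumes each task uses one core (the open source Spark default for spark.task.cpus).

```python
def executors_for_partitions(partitions, executor_cores):
    """Return the executor count whose combined cores cover all shards or partitions."""
    if partitions < 0:
        raise ValueError("partitions must not be negative")
    if executor_cores < 1:
        raise ValueError("executor_cores must be at least 1")
    # Integer ceiling division: round up so no shard is left without a core.
    return (partitions + executor_cores - 1) // executor_cores


# Example: 12 Kinesis shards with 4 cores per executor.
print(executors_for_partitions(12, 4))
```

With 12 shards and 4 cores per executor, three executors provide one core per shard; 10 shards would also need three executors, leaving two cores spare.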

Streaming operates through a sequence of micro-batch processes. In cases of short-running tasks, overly aggressive scaling can lead to resource wastage due to the overhead of allocating executors. To mitigate this, consider modifying spark.dynamicAllocation.executorAllocationRatio. The scaling down process is shuffle aware, so executors that still hold shuffle data are not removed. Although this shuffle data is usually subject to garbage collection, if it’s not being cleared fast enough, the spark.dynamicAllocation.shuffleTracking.timeout setting can be adjusted to determine when executors should be timed out and removed.
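
To see why lowering the allocation ratio reduces how many executors are requested, the following Python sketch models the target calculation used by dynamic allocation in open source Spark: running and pending tasks, multiplied by the ratio, divided by the task slots per executor, rounded up. The function name is hypothetical, and the sketch assumes spark.task.cpus is 1 unless stated; any additional scaling logic in EMR Serverless is not modeled here.

```python
import math


def target_executors(pending_and_running_tasks, executor_cores,
                     allocation_ratio=1.0, task_cpus=1):
    """Approximate the executor target requested by Spark dynamic allocation."""
    # Number of tasks one executor can run at the same time.
    tasks_per_executor = executor_cores // task_cpus
    return math.ceil(pending_and_running_tasks * allocation_ratio / tasks_per_executor)


# 100 queued tasks on 4-core executors, with full and reduced parallelism.
print(target_executors(100, 4))
print(target_executors(100, 4, allocation_ratio=0.7))
```

A ratio of 1.0 requests enough executors to run every queued task at once, whereas a lower ratio accepts some queuing in exchange for fewer executors being allocated for short-lived tasks.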

Let’s examine fine-grained scaling with an example of a spiky workload where data is periodically ingested, followed by idle intervals. The following graph illustrates an EMR Serverless streaming job processing data from an on-demand Kinesis data stream. Initially, the job handles 100 records per second. As tasks queue, dynamic allocation adds capacity, which is quickly released due to short task durations (adjustable using executorAllocationRatio). When we increase input data to 10,000 records per second, Kinesis adds shards, triggering EMR Serverless to provision more executors. Scaling down happens as executors complete processing and are released after the idle timeout (spark.dynamicAllocation.executorIdleTimeout, default 60 seconds), leaving only the Spark driver running during the idle window. (Full scale-down is source dependent. For example, a provisioned Kinesis data stream with a fixed number of shards may have limitations in fully scaling down even when shards are idle.) This pattern repeats as bursts of 10,000 records per second alternate with idle periods, allowing EMR Serverless to scale resources dynamically. This job uses the following configuration:

--conf spark.dynamicAllocation.shuffleTracking.timeout=300s
--conf spark.dynamicAllocation.executorAllocationRatio=0.7

Resiliency

EMR Serverless ensures resiliency in streaming jobs by leveraging automatic recovery and fault-tolerant architectures.

Built-in Availability Zone resiliency

Streaming applications drive critical business operations like fraud detection, real-time analytics, and monitoring systems, making any downtime particularly costly. Infrastructure failures at the Availability Zone level can cause significant disruptions to distributed streaming applications, potentially leading to extended downtime and data processing delays.

Amazon EMR Serverless now addresses this challenge with built-in Availability Zone failover capabilities: jobs are initially provisioned in a randomly selected Availability Zone, and, in the event of an Availability Zone failure, the service automatically retries the job in a healthy Availability Zone, minimizing interruptions to processing. Although this feature greatly enhances application reliability, achieving full resiliency requires input data sources that also support Availability Zone failover. Additionally, if you’re using a custom virtual private cloud (VPC) configuration, it is recommended to configure EMR Serverless to operate across multiple Availability Zones to optimize fault tolerance.

The following diagram illustrates a sample architecture.

Auto retry

Streaming applications are susceptible to various runtime failures caused by transient issues such as network connectivity problems, memory pressure, or resource constraints. Without proper retry mechanisms, these temporary failures can lead to permanently stopping jobs, requiring manual intervention to restart the jobs. This not only increases operational overhead but also risks data loss and processing gaps, especially in continuous data processing scenarios where maintaining data consistency is crucial.

EMR Serverless streamlines this process by automatically retrying failed jobs. Streaming jobs use checkpointing to periodically save the computation state to Amazon Simple Storage Service (Amazon S3), allowing failed jobs to restart from the last checkpoint, minimizing data loss and reprocessing time. Although there is no cap on the total number of retries, a thrash prevention mechanism allows you to configure the number of retry attempts per hour, ranging from 1–10, with the default being set to five attempts per hour.

See the following example code:

aws emr-serverless start-job-run \
 --application-id <APPLICATION_ID> \
 --execution-role-arn <JOB_EXECUTION_ROLE> \
 --mode 'STREAMING' \
 --retry-policy '{
    "maxFailedAttemptsPerHour": 5
 }' \
 --job-driver '{
    "sparkSubmit": {
         "entryPoint": "/usr/lib/spark/examples/jars/spark-examples-does-not-exist.jar",
         "entryPointArguments": ["1"],
         "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi"
    }
 }'
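
The same request can be assembled in code before it is submitted. The following Python sketch builds the request as a dictionary and checks that the retry setting stays within the 1–10 range described earlier. The function name and the sample identifiers are hypothetical; the keys are assumed to match the StartJobRun request fields, so verify them against the API reference before passing the dictionary to an SDK client.

```python
def build_streaming_job_request(application_id, execution_role_arn, job_driver,
                                max_failed_attempts_per_hour=5):
    """Build a StartJobRun-style request for a streaming job with a retry policy."""
    if not 1 <= max_failed_attempts_per_hour <= 10:
        raise ValueError("max_failed_attempts_per_hour must be between 1 and 10")
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "mode": "STREAMING",
        "retryPolicy": {"maxFailedAttemptsPerHour": max_failed_attempts_per_hour},
        "jobDriver": job_driver,
    }


# Hypothetical identifiers used only for illustration.
request = build_streaming_job_request(
    "00example-app-id",
    "arn:aws:iam::111122223333:role/JobExecutionRole",
    {"sparkSubmit": {"entryPoint": "s3://DOC-EXAMPLE-BUCKET/scripts/streaming_job.py"}},
)
print(request["retryPolicy"])
```

Validating the value locally surfaces a misconfigured retry policy before the job is submitted.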

Observability

EMR Serverless provides robust log management and enhanced monitoring, enabling users to efficiently troubleshoot issues and optimize the performance of streaming jobs.

Event log rotation and compression

Spark streaming applications continuously process data and generate substantial amounts of event log data. The accumulation of these logs can consume significant disk space, potentially leading to degraded performance or even system failures due to disk space exhaustion.

Log rotation mitigates these risks by periodically archiving old logs and creating new ones, thereby keeping active log files at a manageable size. Event log rotation is enabled by default for both batch and streaming jobs and can’t be disabled. Rotation doesn’t affect which logs are uploaded to the S3 bucket, but the uploaded logs are compressed using the zstd (Zstandard) format. You can find rotated event logs under the following S3 folder:

<S3-logUri>/applications/<application-id>/jobs/<job-id>/sparklogs/

The following table summarizes key configurations that govern event log rotation.

Configuration | Value | Comment
spark.eventLog.rotation.enabled | TRUE |
spark.eventLog.rotation.interval | 300 seconds | Specifies the time interval for log rotation
spark.eventLog.rotation.maxFilesToRetain | 2 | Specifies how many rotated log files to keep during cleanup
spark.eventLog.rotation.minFileSize | 1 MB | Specifies the minimum file size required to rotate the log file

Application log rotation and compression

One of the most common errors in Spark streaming applications is the “no space left on disk” error, primarily caused by the rapid accumulation of application logs during continuous data processing. Driver and executor logs from these applications can grow quickly and consume the available disk space.

To address this, EMR Serverless has introduced rotation and compression for driver and executor stderr and stdout logs. Log files are refreshed every 15 seconds and can range from 0–128 MB. You can find the latest log files at the following Amazon S3 locations:

<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_DRIVER/stderr.gz
<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_DRIVER/stdout.gz
<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_EXECUTOR/stderr.gz
<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_EXECUTOR/stdout.gz

Rotated application logs are pushed to an archive under the following Amazon S3 locations:

<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_DRIVER/archived/
<S3-logUri>/applications/<application-id>/jobs/<job-id>/SPARK_EXECUTOR/<executor-id>/archived/
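
If you inspect these locations from scripts, a small helper can assemble the paths from their parts. The following Python sketch follows the path layout listed above; the function names and sample identifiers are hypothetical.

```python
def job_log_prefix(log_uri, application_id, job_id):
    """Return the common S3 prefix for a job's logs."""
    return f"{log_uri.rstrip('/')}/applications/{application_id}/jobs/{job_id}"


def latest_driver_log(log_uri, application_id, job_id, stream="stderr"):
    """Return the location of the latest driver stderr or stdout log."""
    if stream not in ("stderr", "stdout"):
        raise ValueError("stream must be 'stderr' or 'stdout'")
    return f"{job_log_prefix(log_uri, application_id, job_id)}/SPARK_DRIVER/{stream}.gz"


def archived_executor_logs(log_uri, application_id, job_id, executor_id):
    """Return the archive prefix for one executor's rotated logs."""
    prefix = job_log_prefix(log_uri, application_id, job_id)
    return f"{prefix}/SPARK_EXECUTOR/{executor_id}/archived/"


# Hypothetical bucket and identifiers used only for illustration.
print(latest_driver_log("s3://DOC-EXAMPLE-BUCKET/logs/", "app-123", "job-456"))
print(archived_executor_logs("s3://DOC-EXAMPLE-BUCKET/logs", "app-123", "job-456", "1"))
```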

Enhanced monitoring

Spark provides comprehensive performance metrics for drivers and executors, including JVM heap memory, garbage collection, and shuffle data, which are valuable for troubleshooting performance and analyzing workloads. Starting with Amazon EMR 7.1, EMR Serverless integrates with Amazon Managed Service for Prometheus, enabling you to monitor, analyze, and optimize your jobs using detailed engine metrics, such as Spark event timelines, stages, tasks, and executors. This integration is available when submitting jobs or creating applications. For setup details, refer to Monitor Spark metrics with Amazon Managed Service for Prometheus. To enable metrics for Structured Streaming queries, set the Spark property spark.sql.streaming.metricsEnabled to true, for example with --conf spark.sql.streaming.metricsEnabled=true.

You can also monitor and debug jobs using the Spark UI. The web UI presents a visual interface with detailed information about your running and completed jobs. You can dive into job-specific metrics and information about event timelines, stages, tasks, and executors for each job.

Service integrations

Organizations often struggle with integrating multiple streaming data sources into their data processing pipelines. Managing different connectors, dealing with varying protocols, and providing compatibility across diverse streaming platforms can be complex and time-consuming.

EMR Serverless supports Kinesis Data Streams, Amazon MSK, and self-managed Apache Kafka clusters as input data sources to read and process data in near real time.

Whereas the Kinesis Data Streams connector is natively available on Amazon EMR, the Kafka connector is an open source connector from the Spark community and is available in a Maven repository.

The following diagram illustrates a sample architecture for each connector.

Refer to Supported streaming connectors to learn more about using these connectors.

Additionally, you can refer to the aws-samples GitHub repo to set up a streaming job reading data from a Kinesis data stream. It uses the Amazon Kinesis Data Generator to generate test data.

Conclusion

Running Spark Structured Streaming on EMR Serverless offers a robust and scalable solution for real-time data processing. By taking advantage of the seamless integration with AWS services like Kinesis Data Streams, you can efficiently handle streaming data with ease. The platform’s advanced monitoring tools and automated resiliency features provide high availability and reliability, minimizing downtime and data loss. Furthermore, the performance optimizations and cost-effective serverless model make it an ideal choice for organizations looking to harness the power of near real-time analytics without the complexities of managing infrastructure.

Try out using Spark Structured Streaming on EMR Serverless for your own use case, and share your questions in the comments.


About the Authors

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Kshitija Dound is an Associate Specialist Solutions Architect at AWS based in New York City, focusing on data and AI. She collaborates with customers to transform their ideas into cloud solutions, using AWS big data and AI services. In her spare time, Kshitija enjoys exploring museums, indulging in art, and embracing NYC’s outdoor scene.

Paul Min is a Solutions Architect at AWS, where he works with customers to advance their mission and accelerate their cloud adoption. He is passionate about helping customers reimagine what’s possible with AWS. Outside of work, Paul enjoys spending time with his wife and golfing.

Fine-grained access control in Amazon EMR Serverless with AWS Lake Formation

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/fine-grained-access-control-in-amazon-emr-serverless-with-aws-lake-formation/

In today’s data-driven world, enterprises are increasingly reliant on vast amounts of data to drive decision-making and innovation. With this reliance comes the critical need for robust data security and access control mechanisms. Fine-grained access control restricts access to specific data subsets, protecting sensitive information and maintaining regulatory compliance. It allows organizations to set detailed permissions at various levels, including database, table, column, and row. This precise control mitigates risks of unauthorized access, data leaks, and misuse. In the unfortunate event of a security incident, fine-grained access control helps limit the scope of the breach, minimizing potential damage.

AWS is announcing the general availability of fine-grained access control based on AWS Lake Formation for Amazon EMR Serverless on Amazon EMR 7.2. Enterprises can now significantly enhance their data governance and security frameworks. This new integration supports the implementation of modern data lake architectures, such as data mesh, by providing a seamless way to manage and analyze data. You can use EMR Serverless to enforce data access controls using Lake Formation when reading data from Amazon Simple Storage Service (Amazon S3), enabling robust data processing workflows and real-time analytics without the overhead of cluster management.

In this post, we discuss how to implement fine-grained access control in EMR Serverless using Lake Formation. With this integration, organizations can achieve better scalability, flexibility, and cost-efficiency in their data operations, ultimately driving more value from their data assets.

Key use cases for fine-grained access control in analytics

The following are key use cases for fine-grained access control in analytics:

  • Customer 360 – You can enable different departments to securely access specific customer data relevant to their functions. For example, the sales team can be granted access only to data such as customer purchase history, preferences, and transaction patterns. Meanwhile, the marketing team is limited to viewing campaign interactions, customer demographics, and engagement metrics.
  • Financial reporting – You can enable financial analysts to access the necessary data for reporting and analysis while restricting sensitive financial details to authorized executives.
  • Healthcare analytics – You can enable healthcare researchers and data scientists to analyze de-identified patient data for medical advancements and research, while making sure Protected Health Information (PHI) remains confidential and accessible only to authorized healthcare professionals and personnel.
  • Supply chain optimization – You can grant logistics teams visibility into inventory and shipment data while limiting access to pricing or supplier information to relevant stakeholders.

Solution overview

In this post, we explore how to implement fine-grained access control on Iceberg tables within an EMR Serverless application, using the capabilities of Lake Formation. If you’re interested in learning how to implement fine-grained access control on open table formats in Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) instances using Lake Formation, refer to Enforce fine-grained access control on Open Table Formats via Amazon EMR integrated with AWS Lake Formation.

With the data access control features available in Lake Formation, you can enforce granular permissions and govern access to specific columns, rows, or cells within your Iceberg tables. This approach makes sure sensitive data remains secure and accessible only to authorized users or applications, aligning with your organization’s data governance policies and regulatory compliance requirements.

A cross-account modern data platform on AWS involves setting up a centralized data lake in a primary AWS account, while allowing controlled access to this data from secondary AWS accounts. This setup helps organizations maintain a single source of truth for their data, provides consistent data governance, and uses the robust security features of AWS across multiple business units or project teams.

To demonstrate how you can use Lake Formation to implement cross-account fine-grained access control within an EMR Serverless environment, we use the TPC-DS dataset to create tables in the AWS Glue Data Catalog in the AWS producer account. We then provision different user personas in the AWS consumer account to reflect various roles and access levels, forming a secure and governed data lake.

The following diagram illustrates the solution architecture.

The producer account contains the following persona:

  • Data engineer – Tasks include data preparation, bulk updates, and incremental updates. The data engineer has the following access:
    • Table-level access – Full read/write access to all TPC-DS tables.

The consumer account contains the following personas:

  • Finance analyst – We run a sample query that performs a sales data analysis to guide marketing, inventory, and promotion strategies based on demographic and geographic performance. The finance analyst has the following access:
    • Table-level access – Full access to tables store_sales, catalog_sales, web_sales, item, and promotion for comprehensive financial analysis.
    • Column-level access – Limited access to cost-related columns in the sales tables to avoid exposure to sensitive pricing strategies. Limited access to sensitive columns like credit_rating in the customer_demographics table.
    • Row-level access – Access only to sales data from the current fiscal year or specific promotional periods.
  • Product analyst – We run a sample query that does a customer behavior analysis to tailor marketing, promotions, and loyalty programs based on purchase patterns and regional insights. The product analyst has the following access:
    • Table-level access – Full access to the item, store_sales, and customer tables to evaluate product and market trends.
    • Column-level access – Restricted access to personal identifiers in the customer table, such as customer_address, email_address, and date of birth.

Prerequisites

You should have the following prerequisites:

Set up infrastructure in the producer account

We provide a CloudFormation template to deploy the data lake stack with the following resources:

  • Two S3 buckets: one for scripts and query results, and one for the data lake storage
  • An Amazon Athena workgroup
  • An EMR Serverless application
  • An AWS Glue database and tables on external public S3 buckets of TPC-DS data
  • An AWS Glue database for the data lake
  • An IAM role and policies

Set up Lake Formation for the data engineer in the producer account

Set up Lake Formation cross-account data sharing version settings:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. Under Data Catalog settings, pick Version 4 under Cross-account version settings.

To learn more about the differences between data sharing versions, refer to Updating cross-account data sharing version settings. Make sure Default permissions for newly created databases and tables is unchecked.

Register the Amazon S3 location as the data lake location

When you register an Amazon S3 location with Lake Formation, you specify an IAM role with read/write permissions on that location. After registering, when EMR Serverless requests access to this Amazon S3 location, Lake Formation will supply temporary credentials of the provided role to access the data. We already created the role LakeFormationServiceRole using the CloudFormation template. To register the Amazon S3 location as the data lake location, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. In the navigation pane, choose Data lake locations under Administration.
  3. Choose Register location.
  4. For Amazon S3 path, enter s3://<DatalakeBucketName>. (Copy the bucket name from the CloudFormation stack’s Outputs tab.)
  5. For IAM role, enter LakeFormationServiceRoleDatalake.
  6. For Permission mode, select Lake Formation.
  7. Choose Register location.

Generate TPC-DS tables in the producer account

In this section, we generate TPC-DS tables in Iceberg format in the producer account.

Grant database permissions to the data engineer

First, let’s grant database permissions to the data engineer IAM role Amazon-EMR-ExecutionRole_DE that we will use with EMR Serverless. Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. Choose Databases and Create database.
  3. Enter iceberg_db for Name and s3://<DatalakeBucketName> for Location.
  4. Choose Create database.
  5. In the navigation pane, choose Data lake permissions and choose Grant.
  6. In the Principals section, select IAM users and roles and choose Amazon-EMR-ExecutionRole_DE.
  7. In the LF-Tags or catalog resources section, select Named Data Catalog resources and choose tpc-source and iceberg_db for Databases.
  8. Select Super for both Database permissions and Grantable permissions and choose Grant.
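
The console steps above can also be expressed as API payloads. The following Python sketch builds request dictionaries shaped like the Lake Formation GrantPermissions API, where the console’s Super permission corresponds to ALL. The function name and the account ID 111122223333 are hypothetical, and the payloads are only constructed here, not sent.

```python
def database_grant(principal_arn, database_name, permissions, grantable=()):
    """Build a GrantPermissions-style payload for a Data Catalog database."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {"Database": {"Name": database_name}},
        "Permissions": list(permissions),
        "PermissionsWithGrantOption": list(grantable),
    }


# Hypothetical account ID; the role name comes from the steps above.
role_arn = "arn:aws:iam::111122223333:role/Amazon-EMR-ExecutionRole_DE"

# One payload per database, granting Super (ALL) with grantable permissions.
grants = [
    database_grant(role_arn, name, ["ALL"], ["ALL"])
    for name in ("tpc-source", "iceberg_db")
]
for grant in grants:
    print(grant["Resource"]["Database"]["Name"], grant["Permissions"])
```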

Create an EMR Serverless application

Now, let’s log in to EMR Serverless using Amazon EMR Studio and complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless.
  2. Under Manage applications, choose my-emr-studio. You will be directed to the Create application page on EMR Studio. Let’s create a Lake Formation enabled EMR Serverless application.
  3. Under Application settings, provide the following information:
    1. For Name, enter emr-fgac-application.
    2. For Type, choose Spark.
    3. For Release version, choose emr-7.2.0.
    4. For Architecture, choose x86_64.
  4. Under Application setup options, select Use custom settings.
  5. Under Interactive endpoint, select Enable endpoint for EMR studio.
  6. Under Additional configurations, for Metastore configuration, select Use AWS Glue Data Catalog as metastore, then select Use Lake Formation for fine-grained access control.
  7. Under Network connections, choose emrs-vpc for the VPC, enter any two private subnets, and enter emr-serverless-sg for Security groups.
  8. Choose Create and start application.
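
For readers who prefer to script this step, the following Python sketch assembles a request shaped like the EMR Serverless CreateApplication API with the same settings. Treat it as an assumption-laden illustration: the function name, subnet IDs, and security group ID are hypothetical placeholders, and the spark.emr-serverless.lakeformation.enabled property is assumed to be the setting behind the Lake Formation console option, so confirm it against the EMR Serverless documentation before use.

```python
def build_create_application_request(name, subnet_ids, security_group_ids):
    """Build a CreateApplication-style request for a Lake Formation enabled Spark app."""
    return {
        "name": name,
        "type": "SPARK",
        "releaseLabel": "emr-7.2.0",
        "architecture": "X86_64",
        # Interactive endpoint for EMR Studio.
        "interactiveConfiguration": {"studioEnabled": True},
        "networkConfiguration": {
            "subnetIds": list(subnet_ids),
            "securityGroupIds": list(security_group_ids),
        },
        # Assumed property for enabling Lake Formation fine-grained access control.
        "runtimeConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {"spark.emr-serverless.lakeformation.enabled": "true"},
            }
        ],
    }


# Hypothetical network identifiers used only for illustration.
request = build_create_application_request(
    "emr-fgac-application",
    ["subnet-0example1", "subnet-0example2"],
    ["sg-0example"],
)
print(request["name"], request["releaseLabel"])
```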

Create a Workspace

Complete the following steps to create an EMR Workspace:

  1. On the Amazon EMR console, choose Workspaces in the navigation pane and choose Create Workspace.
  2. Enter the Workspace name emr-fgac-workspace.
  3. Leave all other settings as default and choose Create Workspace.
  4. Choose Launch Workspace. Your browser might request to allow pop-up permissions for the first time launching the Workspace.
  5. After the Workspace is launched, in the navigation pane, choose Compute.
  6. For Compute type, select EMR Serverless application and enter emr-fgac-application for the application and Amazon-EMR-ExecutionRole_DE as the runtime role.
  7. Make sure the kernel attached to the Workspace is PySpark.
  8. Navigate to the File browser section and choose Upload files.
  9. Upload the file Iceberg-ingest-final_v2.ipynb.
  10. Update the data lake bucket name, AWS account ID, and AWS Region accordingly.
  11. Choose the double arrow icon to restart the kernel and rerun the notebook.


To verify that the data is generated, you can go to the AWS Glue console. Under Data Catalog, Databases, you should see TPC-DS tables ending with _iceberg for the database iceberg_db.

Share the database and TPC-DS tables to the consumer account

We now grant permissions to the consumer account, including grantable permissions. This allows the Lake Formation data lake administrator in the consumer account to control access to the data within the account.

Grant database permissions to the consumer account

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the producer account.
  2. In the navigation pane, choose Databases.
  3. Select the database iceberg_db, and on the Actions menu, under Permissions, choose Grant.
  4. In the Principals section, select External accounts and enter the consumer account.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and choose iceberg_db for Databases.
  6. In the Database permissions section, select Describe for both Database permissions and Grantable permissions.

This allows the data lake administrator in the consumer account to describe the database and grant describe permissions to other principals in the consumer account.

Grant table permissions to the consumer account

Repeat the preceding steps to grant table permissions to the consumer account.

Choose All tables under Tables and select Select and Describe for both Table permissions and Grantable permissions.

Set up Lake Formation in the consumer account

For the remaining section of the post, we focus on the consumer account. Deploy the following CloudFormation stack to set up resources:

The template will create the Amazon EMR runtime role for both analyst user personas.

Log in to the AWS consumer account and accept the AWS RAM invitation first:

  1. Open the AWS RAM console with the IAM identity that has AWS RAM access.
  2. In the navigation pane, choose Resource shares under Shared with me.
  3. You should see two pending resource shares from the producer account.
  4. Accept both invitations.

You should be able to see the iceberg_db database on the Lake Formation console.

Create a resource link for the shared database

To access the database and table resources that were shared by the producer AWS account, you need to create a resource link in the consumer AWS account. A resource link is a Data Catalog object that is a link to a local or shared database or table. After you create a resource link to a database or table, you can use the resource link name wherever you would use the database or table name. In this step, you grant permission on the resource links to the job runtime roles for EMR Serverless. The runtime roles will then access the data in shared databases and underlying tables through the resource link.

To create a resource link, complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the iceberg_db database, verify that the owner account ID is the producer account, and on the Actions menu, choose Create resource links.
  4. For Resource link name, enter the name of the resource link (iceberg_db_shared).
  5. For Shared database’s region, choose the Region of the iceberg_db database.
  6. For Shared database, choose the iceberg_db database.
  7. For Shared database’s owner ID, enter the account ID of the producer account.
  8. Choose Create.
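
For readers who prefer scripting, the console steps above map onto a single AWS Glue CreateDatabase request whose DatabaseInput carries a TargetDatabase. The following is a minimal sketch that only builds the request and sends nothing to AWS. The account ID and Region are placeholder values, and the helper name resource_link_request is hypothetical.

```python
# Sketch only: builds the arguments for a Glue CreateDatabase call.
# Nothing is sent to AWS; the account ID and Region are placeholders.

def resource_link_request(link_name, producer_account_id, shared_db, region):
    """Return keyword arguments for a create_database call that creates
    a resource link pointing at a database owned by another account."""
    return {
        "DatabaseInput": {
            "Name": link_name,
            "TargetDatabase": {
                "CatalogId": producer_account_id,  # owner of the shared database
                "DatabaseName": shared_db,
                "Region": region,
            },
        }
    }


request = resource_link_request(
    "iceberg_db_shared", "111122223333", "iceberg_db", "us-east-1"
)
# With credentials for the data lake administrator configured, the request
# could be passed on as: boto3.client("glue").create_database(**request)
print(request)
```

Keeping the request construction separate from the API call makes it easy to review the target account and database before anything is created.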

Grant permissions on the resource link to the EMR job runtime roles

Grant permissions on the resource link to Amazon-EMR-ExecutionRole_Finance and Amazon-EMR-ExecutionRole_Product using the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (iceberg_db_shared) and on the Actions menu, choose Grant.
  4. In the Principals section, select IAM users and roles, and choose Amazon-EMR-ExecutionRole_Finance and Amazon-EMR-ExecutionRole_Product.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and for Databases, choose iceberg_db_shared.
  6. In the Resource link permissions section, select Describe for Resource link permissions.

This allows the EMR Serverless job runtime roles to describe the resource link. We don’t make any selections for grantable permissions because runtime roles shouldn’t be able to grant permissions to other principals.

  7. Choose Grant.
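
The same Describe grant can be expressed as a Lake Formation GrantPermissions request. The following sketch builds one request per runtime role without calling AWS. The consumer account ID in the ARNs is a placeholder, and the helper name is hypothetical. Grantable permissions are left out on purpose, matching the console steps.

```python
# Sketch only: builds GrantPermissions arguments for the resource link.
# The account ID is a placeholder; no AWS call is made.

CONSUMER_ACCOUNT_ID = "444455556666"  # placeholder consumer account


def describe_grant_request(role_name, link_name):
    """Return keyword arguments granting DESCRIBE on a resource link."""
    role_arn = f"arn:aws:iam::{CONSUMER_ACCOUNT_ID}:role/{role_name}"
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "Database": {"CatalogId": CONSUMER_ACCOUNT_ID, "Name": link_name}
        },
        "Permissions": ["DESCRIBE"],
        # No PermissionsWithGrantOption: runtime roles should not re-grant.
    }


requests = [
    describe_grant_request(role, "iceberg_db_shared")
    for role in ("Amazon-EMR-ExecutionRole_Finance", "Amazon-EMR-ExecutionRole_Product")
]
# Each entry could be sent with:
#   boto3.client("lakeformation").grant_permissions(**entry)
for entry in requests:
    print(entry["Principal"]["DataLakePrincipalIdentifier"], entry["Permissions"])
```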

Grant table permissions for the finance analyst

Complete the following steps:

  1. Open the Lake Formation console with the Lake Formation data lake administrator in the consumer account.
  2. In the navigation pane, choose Databases.
  3. Select the resource link (iceberg_db_shared) and on the Actions menu, choose Grant on target.
  4. In the Principals section, select IAM users and roles, then choose Amazon-EMR-ExecutionRole_Finance.
  5. In the LF-Tags or catalog resources section, select Named Data Catalog resources and specify the following:
    1. For Databases, choose iceberg_db.
    2. For Tables, choose store_sales_iceberg.
  6. In the Table permissions section, for Table permissions, select Select.
  7. In the Data permissions section, select Column-based access.
  8. Select Exclude columns and choose all cost-related columns (ss_wholesale_cost and ss_ext_wholesale_cost).
  9. Choose Grant.
  10. Similarly, grant access to table customer_demographics_iceberg and exclude the column cd_credit_rating.
  11. Following the same steps, grant All data access for tables store_iceberg and item_iceberg.
  12. For the table date_dim_iceberg, we provide selective row-level access.
  13. Similar to the preceding table permissions, select date_dim_iceberg under Tables and in the Data filters section, choose Create new.
  14. For Data filter name, enter FA_Filter_year.
  15. Select Access to all columns under Column-level access.
  16. Select Filter rows and for Row filter expression, enter d_year=2002 to only provide access to the 2002 year.
  17. Choose Save changes.
  18. Choose Create filter.
  19. Make sure FA_Filter_year is selected under Data filters and grant select permissions on the filter.
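
The column exclusion and the row filter configured above can also be described as Lake Formation API requests. The sketch below builds a GrantPermissions request with excluded columns and a CreateDataCellsFilter request for the d_year=2002 filter. It makes no AWS calls. The role ARN and catalog ID are placeholders, the helper names are hypothetical, and which catalog ID applies to a shared table in your setup is an assumption you should verify.

```python
# Sketch only: request builders for column-level and row-level access.
# catalog_id is assumed to be the account that owns the table (placeholder).


def column_exclusion_grant(role_arn, catalog_id, database, table, excluded):
    """SELECT on every column of a table except the excluded ones."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,
                "DatabaseName": database,
                "Name": table,
                "ColumnWildcard": {"ExcludedColumnNames": list(excluded)},
            }
        },
        "Permissions": ["SELECT"],
    }


def row_filter_request(catalog_id, database, table, filter_name, expression):
    """Data cells filter exposing all columns but only matching rows."""
    return {
        "TableData": {
            "TableCatalogId": catalog_id,
            "DatabaseName": database,
            "TableName": table,
            "Name": filter_name,
            "RowFilter": {"FilterExpression": expression},
            "ColumnWildcard": {},  # all columns
        }
    }


finance_role = "arn:aws:iam::444455556666:role/Amazon-EMR-ExecutionRole_Finance"
grant = column_exclusion_grant(
    finance_role, "111122223333", "iceberg_db", "store_sales_iceberg",
    ["ss_wholesale_cost", "ss_ext_wholesale_cost"],
)
row_filter = row_filter_request(
    "111122223333", "iceberg_db", "date_dim_iceberg", "FA_Filter_year", "d_year=2002"
)
# Possible calls on boto3.client("lakeformation"):
#   grant_permissions(**grant) and create_data_cells_filter(**row_filter)
print(grant["Resource"]["TableWithColumns"]["ColumnWildcard"])
print(row_filter["TableData"]["RowFilter"])
```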

Grant table permissions for the product analyst

You can provide permissions for the next set of tables required for the product analyst role using the Lake Formation console. Alternatively, you can use the AWS Command Line Interface (AWS CLI) to grant permissions. We provide grant on target permissions for the resource link iceberg_db_shared to IAM role Amazon-EMR-ExecutionRole_Product.

  1. Similar to the steps followed in previous sections, for tables store_sales_iceberg, date_dim_iceberg, store_iceberg, and house_hold_demographics_iceberg, provide select permissions for All data access. Make sure the role selected is Amazon-EMR-ExecutionRole_Product.

For table customer_iceberg, we limit access to personally identifiable information (PII) columns.

  1. Under Data permissions, select Column-based access and Exclude columns.
  2. Choose columns c_birth_day, c_birth_month, c_birth_year, c_current_addr_sk, c_customer_id, c_email_address, and c_birth_country.

Verify access using interactive notebooks from EMR Studio

Complete the following steps to test role access:

  1. Log in to the AWS consumer account and open the Amazon EMR console.
  2. Choose EMR Serverless in the navigation pane and choose an existing EMR Studio.
  3. If you don’t have EMR Studio configured, choose Get Started and select Create and launch EMR Studio.
  4. Create a Lake Formation enabled EMR Serverless application as described in previous sections.
  5. Create an EMR Studio Workspace as described in previous sections.
  6. Use emr-studio-service-role for Service role and datalake-resources-<account_id>-<region> for Workspace storage, then launch your Workspace.

Now, let’s verify access for the finance analyst.

  1. Make sure the compute type inside your Workspace is pointing to the EMR Serverless application created in the prior step and Amazon-EMR-ExecutionRole_Finance as the interactive runtime role.
  2. Go to File browser in the navigation pane, choose Upload files, and add Notebook_FA.ipynb to your Workspace.
  3. Run all the cells to verify fine-grained access.

Now let’s test access for the product analyst.

  1. In the same Workspace, detach and attach the same EMR Serverless application with Amazon-EMR-ExecutionRole_Product as the interactive runtime role.
  2. Upload Notebook_PA.ipynb under the File browser section.
  3. Run all the cells to verify fine-grained access for the product analyst.

In a real-world scenario, both analysts will likely have their own Workspace with restricted rights to assume only the authorized interactive runtime role.

Considerations and limitations

EMR Serverless with Lake Formation uses Spark resource profiles to create two profiles and two Spark drivers for access control. Read this white paper to learn about the feature details. The user profile runs the supplied code, and the system profile enforces Lake Formation policies. Therefore, it’s recommended that you have a minimum of two Spark drivers when pre-initialized capacity is used with Lake Formation enabled jobs. No change in executor count is required. Refer to Using EMR Serverless with AWS Lake Formation for fine-grained access control to learn more about the technical implementation of the Lake Formation integration with EMR Serverless.
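
As an illustration of the two-driver recommendation, the following sketch builds an initialCapacity map of the shape accepted by the EMR Serverless CreateApplication API. The worker sizes and executor count are arbitrary placeholder values, and the helper name is hypothetical. The check simply encodes the recommendation above.

```python
# Sketch only: builds an initialCapacity map for an EMR Serverless Spark
# application. Worker sizes and counts are placeholder values.


def initial_capacity(driver_count, executor_count):
    """Return an initialCapacity map with at least two drivers."""
    if driver_count < 2:
        raise ValueError(
            "At least two pre-initialized drivers are recommended for "
            "Lake Formation enabled jobs (user profile and system profile)"
        )
    worker = {"cpu": "4vCPU", "memory": "16GB"}
    return {
        "DRIVER": {"workerCount": driver_count, "workerConfiguration": dict(worker)},
        "EXECUTOR": {"workerCount": executor_count, "workerConfiguration": dict(worker)},
    }


capacity = initial_capacity(driver_count=2, executor_count=4)
# Could be supplied as the initialCapacity argument of
# boto3.client("emr-serverless").create_application(...)
print(capacity["DRIVER"]["workerCount"])
```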

You can expect a performance overhead after enabling Lake Formation. The level of access (table, column, or row) and the amount of data filtered will have significant impact on query performance.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary (consumer) account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary (producer) account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed how to integrate Lake Formation with EMR Serverless to manage access to Iceberg tables. This solution showcases a modern way to enforce fine-grained access control in a multi-account open data lake setup. The approach simplifies data management in the main account while carefully controlling how users access data in other secondary accounts.

Try out the solution for your own use case, and let us know your feedback and questions in the comments section.


About the Authors

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web Services. He specializes in building big data applications and helps customers modernize their applications on the cloud. He thinks data is the new oil and spends most of his time deriving insights from data.

Monitor Apache HBase on Amazon EMR using Amazon Managed Service for Prometheus and Amazon Managed Grafana

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/monitor-apache-hbase-on-amazon-emr-using-amazon-managed-service-for-prometheus-and-amazon-managed-grafana/

Amazon EMR provides a managed Apache Hadoop framework that makes it straightforward, fast, and cost-effective to run Apache HBase. Apache HBase is a massively scalable, distributed big data store in the Apache Hadoop ecosystem. It is an open-source, non-relational, versioned database that runs on top of the Apache Hadoop Distributed File System (HDFS). It’s built for random, strictly consistent, real-time access for tables with billions of rows and millions of columns. Monitoring HBase clusters is critical in order to identify stability and performance bottlenecks and proactively preempt them. In this post, we discuss how you can use Amazon Managed Service for Prometheus and Amazon Managed Grafana to monitor, alert, and visualize HBase metrics.

HBase has built-in support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia or via JMX. You can either use AWS Distro for OpenTelemetry or Prometheus JMX exporters to collect metrics exposed by HBase. In this post, we show how to use Prometheus exporters. These exporters behave like small webservers that convert internal application metrics to Prometheus format and serve it at /metrics path. A Prometheus server running on an Amazon Elastic Compute Cloud (Amazon EC2) instance collects these metrics and remote writes to an Amazon Managed Service for Prometheus workspace. We then use Amazon Managed Grafana to create dashboards and view these metrics using an Amazon Managed Service for Prometheus workspace as its data source.

This solution can be extended to other big data platforms such as Apache Spark and Apache Presto that also use JMX to expose their metrics.

Solution overview

The following diagram illustrates our solution architecture.


This post uses an AWS CloudFormation template to perform the following actions:

  1. Install an open-source Prometheus server on an EC2 instance.
  2. Create appropriate AWS Identity and Access Management (IAM) roles and security group for the EC2 instance running the Prometheus server.
  3. Create an EMR cluster with an HBase on Amazon S3 configuration.
  4. Install JMX exporters on all EMR nodes.
  5. Create additional security groups for the EMR master and worker nodes to connect with the Prometheus server running on the EC2 instance.
  6. Create a workspace in Amazon Managed Service for Prometheus.

Prerequisites

To implement this solution, make sure you have the following prerequisites:

aws emr create-default-roles

Deploy the CloudFormation template

Deploy the CloudFormation template in the us-east-1 Region:

Launch Stack

It will take 15–20 minutes for the template to complete. The template requires the following fields:

  • Stack Name – Enter a name for the stack
  • VPC – Choose an existing VPC
  • Subnet – Choose an existing subnet
  • EMRClusterName – Use EMRHBase
  • HBaseRootDir – Provide a new HBase root directory (for example, s3://hbase-root-dir/).
  • MasterInstanceType – Use m5.xlarge
  • CoreInstanceType – Use m5.xlarge
  • CoreInstanceCount – Enter 2
  • SSHIPRange – Use <your ip address>/32 (you can go to https://checkip.amazonaws.com/ to check your IP address)
  • EMRKeyName – Choose a key pair for the EMR cluster
  • EMRRleaseLabel – Use emr-6.9.0
  • InstanceType – Use the EC2 instance type for installing the Prometheus server


Enable remote writes on the Prometheus server

The Prometheus server is running on an EC2 instance. You can find the instance hostname in the CloudFormation stack’s Outputs tab for key PrometheusServerPublicDNSName.

  1. SSH into the EC2 instance using the key pair:
    ssh -i <sshKey.pem> ec2-user@<Public IPv4 DNS of EC2 instance running Prometheus server>

  2. Copy the value for Endpoint – remote write URL from the Amazon Managed Service for Prometheus workspace console.

  3. Edit remote_write url in /etc/prometheus/conf/prometheus.yml:
    sudo vi /etc/prometheus/conf/prometheus.yml

It should look like the following code:

  4. Now we need to restart the Prometheus server to pick up the changes:
    sudo systemctl restart prometheus
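
For orientation, a remote_write entry that targets an Amazon Managed Service for Prometheus workspace commonly has the shape rendered by the sketch below. This is an assumption about a typical configuration using Prometheus's built-in sigv4 setting, not a copy of the file installed by the template, which may differ. The URL stays a placeholder to be filled with the value copied from the workspace console.

```python
# Sketch only: renders a typical remote_write stanza as text.
# The stanza layout is an assumption; compare it with the file on your server.


def render_remote_write(url, region):
    """Return a YAML remote_write stanza with SigV4 signing enabled."""
    return (
        "remote_write:\n"
        f"  - url: {url}\n"
        "    sigv4:\n"
        f"      region: {region}\n"
    )


stanza = render_remote_write(
    "<remote write URL from the workspace console>", "us-east-1"
)
print(stanza)
```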

Enable Amazon Managed Grafana to read from an Amazon Managed Service for Prometheus workspace

We need to add the Amazon Managed Service for Prometheus workspace as a data source in Amazon Managed Grafana. You can skip directly to step 3 if you already have an existing Amazon Managed Grafana workspace and want to use it for HBase metrics.

  1. First, let’s create a workspace on Amazon Managed Grafana. You can follow the appendix to create a workspace using the Amazon Managed Grafana console or run the following API from your terminal (provide your role ARN):
aws grafana create-workspace \
--account-access-type CURRENT_ACCOUNT \
--authentication-providers AWS_SSO \
--permission-type CUSTOMER_MANAGED \
--workspace-data-sources PROMETHEUS \
--workspace-name emr-metrics \
--workspace-role-arn <role-ARN> \
--workspace-notification-destinations SNS
  2. On the Amazon Managed Grafana console, choose Configure users and select a user you want to allow to log in to Grafana dashboards.

Make sure your IAM Identity Center user type is admin. We need this to create dashboards. You can assign the viewer role to all the other users.

  3. Log in to the Amazon Managed Grafana workspace URL using your admin credentials.
  4. Choose AWS Data Sources in the navigation pane.
  5. For Service, choose Amazon Managed Service for Prometheus.
  6. For Regions, choose US East (N. Virginia).

Create an HBase dashboard

Grafana Labs has an open-source dashboard that you can use. For example, you can follow the guidance from the following HBase dashboard. Start creating your dashboard and choose the import option. Provide the URL of the dashboard or enter 12722 and choose Load. Make sure your Prometheus workspace is selected on the next page. You should see HBase metrics showing up on the dashboard.

Key HBase metrics to monitor

HBase has a wide range of metrics for HMaster and RegionServer. The following are a few important metrics to keep in mind.

HMaster (metric name – metric description):

  • hadoop_HBase_numregionservers – Number of live region servers
  • hadoop_HBase_numdeadregionservers – Number of dead region servers
  • hadoop_HBase_ritcount – Number of regions in transition
  • hadoop_HBase_ritcountoverthreshold – Number of regions that have been in transition longer than a threshold time (default: 60 seconds)
  • hadoop_HBase_ritduration_99th_percentile – Maximum time taken by 99% of the regions to remain in transition state

RegionServer (metric name – metric description):

  • hadoop_HBase_regioncount – Number of regions hosted by the region server
  • hadoop_HBase_storefilecount – Number of store files currently managed by the region server
  • hadoop_HBase_storefilesize – Aggregate size of the store files
  • hadoop_HBase_hlogfilecount – Number of write-ahead logs not yet archived
  • hadoop_HBase_hlogfilesize – Size of all write-ahead log files
  • hadoop_HBase_totalrequestcount – Total number of requests received
  • hadoop_HBase_readrequestcount – Number of read requests received
  • hadoop_HBase_writerequestcount – Number of write requests received
  • hadoop_HBase_numopenconnections – Number of open connections at the RPC layer
  • hadoop_HBase_numactivehandler – Number of RPC handlers actively servicing requests

Memstore:

  • hadoop_HBase_memstoresize – Total memstore memory size of the region server
  • hadoop_HBase_flushqueuelength – Current depth of the memstore flush queue (if increasing, we are falling behind with clearing memstores out to Amazon S3)
  • hadoop_HBase_flushtime_99th_percentile – 99th percentile latency for flush operation
  • hadoop_HBase_updatesblockedtime – Number of milliseconds updates have been blocked so the memstore can be flushed

Block Cache:

  • hadoop_HBase_blockcachesize – Block cache size
  • hadoop_HBase_blockcachefreesize – Block cache free size
  • hadoop_HBase_blockcachehitcount – Number of block cache hits
  • hadoop_HBase_blockcachemisscount – Number of block cache misses
  • hadoop_HBase_blockcacheexpresshitpercent – Percentage of the time that requests with the cache turned on hit the cache
  • hadoop_HBase_blockcachecounthitpercent – Percentage of block cache hits
  • hadoop_HBase_blockcacheevictioncount – Number of block cache evictions in the region server
  • hadoop_HBase_l2cachehitratio – Local disk-based bucket cache hit ratio
  • hadoop_HBase_l2cachemissratio – Bucket cache miss ratio

Compaction:

  • hadoop_HBase_majorcompactiontime_99th_percentile – Time in milliseconds taken for major compaction
  • hadoop_HBase_compactiontime_99th_percentile – Time in milliseconds taken for minor compaction
  • hadoop_HBase_compactionqueuelength – Current depth of the compaction request queue (if increasing, we are falling behind with storefile compaction)
  • flush queue length – Number of flush operations waiting to be processed in the region server (a higher number indicates flush operations are slow)

IPC Queues:

  • hadoop_HBase_queuesize – Total data size of all RPC calls in the RPC queues in the region server
  • hadoop_HBase_numcallsingeneralqueue – Number of RPC calls in the general processing queue in the region server
  • hadoop_HBase_processcalltime_99th_percentile – 99th percentile latency for RPC calls to be processed in the region server
  • hadoop_HBase_queuecalltime_99th_percentile – 99th percentile latency for RPC calls to stay in the RPC queue in the region server

JVM and GC:

  • hadoop_HBase_memheapusedm – Heap used
  • hadoop_HBase_memheapmaxm – Total heap
  • hadoop_HBase_pausetimewithgc_99th_percentile – Pause time in milliseconds
  • hadoop_HBase_gccount – Garbage collection count
  • hadoop_HBase_gctimemillis – Time spent in garbage collection, in milliseconds

Latencies:

  • HBase.regionserver.<op>_<measure> – Operation latencies, where <op> is Append, Delete, Mutate, Get, Replay, or Increment, and <measure> is min, max, mean, median, 75th_percentile, 95th_percentile, or 99th_percentile
  • HBase.regionserver.slow<op>Count – Number of operations we thought were slow, where <op> is one of the preceding list

Bulk Load:

  • Bulkload_99th_percentile – hadoop_HBase_bulkload_99th_percentile

I/O:

  • FsWriteTime_99th_percentile – hadoop_HBase_fswritetime_99th_percentile
  • FsReadTime_99th_percentile – hadoop_HBase_fsreadtime_99th_percentile

Exceptions:

  • exceptions.RegionTooBusyException
  • exceptions.callQueueTooBig
  • exceptions.NotServingRegionException
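
Several of these metrics are cumulative counters, so they are most useful when compared across two scrapes. As a small worked example, the following sketch derives a block cache hit percentage for an interval from two samples of the hit and miss counters. The sample values are made up for illustration, and the function name is hypothetical.

```python
# Sketch only: derives an interval hit percentage from two samples of the
# block cache hit and miss counters. Sample values are invented.


def hit_percent(hits_start, hits_end, misses_start, misses_end):
    """Return the cache hit percentage between two scrapes, or None when
    there were no cache lookups in the interval."""
    hits = hits_end - hits_start
    misses = misses_end - misses_start
    total = hits + misses
    if total <= 0:
        return None
    return 100.0 * hits / total


# 900 hits and 100 misses happened between the two scrapes.
interval_hit_percent = hit_percent(1000, 1900, 100, 200)
print(interval_hit_percent)  # 90.0
```

A falling interval hit percentage alongside a rising eviction count can be a hint that the block cache is undersized for the working set.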

Considerations and limitations

Note the following when using this solution:

  • You can set up alerts on Amazon Managed Service for Prometheus and visualize them in Amazon Managed Grafana.
  • This architecture can be easily extended to include other open-source frameworks such as Apache Spark, Apache Presto, and Apache Hive.
  • Refer to the pricing details for Amazon Managed Service for Prometheus and Amazon Managed Grafana.
  • These scripts are for guidance purposes only and aren’t ready for production deployments. Make sure to perform thorough testing.

Clean up

To avoid ongoing charges, delete the CloudFormation stack and workspaces created in Amazon Managed Grafana and Amazon Managed Service for Prometheus.

Conclusion

In this post, you learned how to monitor EMR HBase clusters and set up dashboards to visualize key metrics. This solution can serve as a unified monitoring platform for multiple EMR clusters and other applications. For more information on EMR HBase, see the Release Guide and the HBase Migration whitepaper.


Appendix

Complete the following steps to create a workspace on Amazon Managed Grafana:

  1. Log in to the Amazon Managed Grafana console and choose Create workspace.
  2. For Authentication access, select AWS IAM Identity Center.

If you don’t have IAM Identity Center enabled, refer to Enable IAM Identity Center.

  3. Optionally, to view Prometheus alerts in your Grafana workspace, select Turn Grafana alerting on.
  4. On the next page, select Amazon Managed Service for Prometheus as the data source.
  5. After the workspace is created, assign users to access Amazon Managed Grafana.
  6. For a first-time setup, assign admin privileges to the user.

You can add other users with only viewer access.

Make sure you are able to log in to the Grafana workspace URL using your IAM Identity Center user credentials.


About the Author

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Introducing Amazon S3 shuffle in AWS Glue

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/introducing-amazon-s3-shuffle-in-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, which is an open-source, distributed processing system for your data integration tasks and big data workloads. Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple partitions so that you can execute different transformations in parallel.

Shuffling is an important step in a Spark job whenever data is rearranged between partitions. groupByKey(), reduceByKey(), join(), and distinct() are some examples of wide transformations that can cause a shuffle. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is often constrained by the available local disk capacity, or by data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there is not enough disk space left on the executor and there is no recovery.

This post introduces a new Spark shuffle manager available in AWS Glue that disaggregates Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle and spill files. Using Amazon S3 for Spark shuffle storage lets you run data-intensive workloads much more reliably.

Understanding the shuffle operation in AWS Glue

Spark creates physical plans for running your workflow, called Directed Acyclic Graphs (DAGs). The DAG represents a series of transformations on your dataset, each resulting in a new immutable RDD. All of the transformations in Spark are lazy, in that they are not computed until an action is called to generate results. There are two types of transformations:

  • Narrow transformation – Such as map, filter, union, and mapPartitions, where each input partition contributes to only one output partition.
  • Wide transformation – Such as join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions.

In Spark, a shuffle occurs whenever data is rearranged between partitions. This is required because the wide transformation needs information from other partitions in order to complete its processing. Spark gathers the required data from each partition and combines it into a new partition. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. With AWS Glue, workers write shuffle data on local disk volumes attached to the AWS Glue workers.
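
To make the map-side write and reduce-side fetch concrete, here is a toy simulation in plain Python. It is not Spark code: zlib.crc32 stands in for Spark's partitioner, in-memory lists stand in for shuffle files, and all names and records are invented for illustration.

```python
# Toy illustration of a shuffle; not Spark code.
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Pick the reduce partition for a key (stand-in for a hash partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(map_outputs, num_partitions):
    """Route every (key, value) record from each map task to the reduce
    partition that owns its key."""
    reduce_inputs = defaultdict(list)
    for records in map_outputs:  # one list per map task
        for key, value in records:
            reduce_inputs[partition_for(key, num_partitions)].append((key, value))
    return reduce_inputs


def reduce_by_key(records):
    """Sum the values per key within one reduce partition."""
    totals = defaultdict(int)
    for key, value in records:
        totals[key] += value
    return dict(totals)


# Two map tasks, each holding records for overlapping keys.
map_outputs = [
    [("store_a", 10), ("store_b", 5)],
    [("store_a", 7), ("store_c", 1)],
]
result = {}
for partition_records in shuffle(map_outputs, num_partitions=3).values():
    result.update(reduce_by_key(partition_records))
print(result)
```

Because every record for a given key lands in the same reduce partition, each partition can aggregate independently. The cost is that all map output must be materialized and moved first, which is what fills local disks in large jobs.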

In addition to shuffle writes, Spark uses local disk to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time the worker spills it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after the worker has spilled it.

Challenges

Spark uses local disk for storing intermediate shuffle and shuffle spills. This introduces the following key challenges:

  • Hitting local storage limits – If you have a Spark job that computes transformations over a large amount of data, and results in either too much spill or shuffle or both, then you might get a failed job with a java.io.IOException: No space left on device exception if the underlying storage has filled up.
  • Co-location of storage with executors – If an executor is lost, then shuffle files are lost as well. This leads to several task and stage retries, as Spark tries to recompute stages in order to recover lost shuffle data. Spark natively provides an external shuffle service that lets it store shuffle data independent of the lifetime of executors. But the shuffle service itself becomes a point of failure and must always be up in order to serve shuffle data. Additionally, shuffles are still stored on local disk, which might run out of space for a large job.

To illustrate one of the preceding scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join, group by, and union all. Let’s run the following query with 10 G.1X AWS Glue workers. For the G.1X worker type, each worker maps to 1 DPU (data processing unit) and 1 executor. The 10 G.1X workers account for a total of 640 GB of disk space. See the following SQL query:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the AWS Glue job run details from the Apache Spark web UI:

The job runs for about 1 hour and 25 minutes, then we start observing task failures. Spark ends up stopping the stage and canceling the job when the task retries also fail.

The following screenshots show the aggregated metrics for the failed stage, as well as how much data is spilled to disk by individual executors:

As seen in the Shuffle Write metric from the above Spark UI screenshot, all 10 workers shuffle over 50 GB of data. Further writes aren’t allowed, and tasks start failing with a “No space left on device” error.

The remaining storage is occupied by data that is spilled to disk, as seen in the Shuffle Spill (Disk) metric from the above Spark UI screenshot. This failed job is a classic example of a data-intensive transformation where Spark is both shuffling and spilling to disk when executor memory is filled.
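
The disk budget behind this failure can be checked with simple arithmetic. The sketch below uses only the figures quoted in this post (640 GB across 10 workers and a shuffle write of over 50 GB per worker). Treating 50 GB as the exact shuffle write is a simplifying assumption, so the remaining space is an upper bound.

```python
# Back-of-the-envelope disk budget using the figures quoted in the post.
TOTAL_DISK_GB = 640
WORKERS = 10
SHUFFLE_WRITE_PER_WORKER_GB = 50  # "over 50 GB" in the post, so a lower bound

disk_per_worker_gb = TOTAL_DISK_GB / WORKERS
max_spill_room_gb = disk_per_worker_gb - SHUFFLE_WRITE_PER_WORKER_GB

print(f"Disk per worker: {disk_per_worker_gb} GB")
print(f"Room left for spill per worker: at most {max_spill_room_gb} GB")
```

With at most about 14 GB per worker left for spill files, any stage that both shuffles and spills heavily exhausts local disk quickly.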

Solution overview

We have various methods for overcoming the disk space error:

  • Scale out – Increase the number of workers. This incurs an increase in cost. However, scaling out might not always work, especially if your data is heavily skewed on a few keys. Fixing skewness will require considerable modifications to your Spark application logic.
  • Increase shuffle partitions – Increasing the shuffle partitions can sometimes help overcome space errors. However, this might not always work, and therefore is unreliable.
  • Disaggregate compute and storage – This approach has the advantage of not only scaling storage for large shuffles, but also adding reliability in the event of node failures because shuffle data is independently stored. The following are a few implementations of this disaggregated approach:
    • Dedicated intermediate storage cluster – In this approach, you use an additional fleet of shuffle services to serve intermediate shuffle. It has several advantages, such as merging shuffle files and sequential I/O, but it introduces an overhead of fleet maintenance from both an operations and a cost standpoint. For examples of this approach, see Cosco: An Efficient Facebook-Scale Shuffle Service and Zeus: Uber’s Highly Scalable and Distributed Shuffle as a Service.
    • Serverless storage – AWS Glue implements a different approach in which you utilize Amazon S3, a cost-effective managed and serverless storage, to store intermediate shuffle data. This design does not depend upon a dedicated daemon, such as shuffle service, to preserve shuffle files. This lets you elastically scale your Spark job without the overhead of running, operating, and maintaining additional storage or compute nodes.

With AWS Glue 2.0, you can now use Amazon S3 to store Spark shuffle and spill data. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data intensive workloads reliably.

The following diagram illustrates how Spark map tasks write the shuffle and spill files to the given Amazon S3 shuffle bucket. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle bucket.

Use Amazon S3 to store shuffle and spill data

The following job parameters enable and tune Spark to use S3 buckets for storing shuffle and spill data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

Key | Value | Explanation
--write-shuffle-files-to-s3 | TRUE | This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--write-shuffle-spills-to-s3 | TRUE | This is an optional flag that lets you offload spill files to S3 buckets, which provides additional resiliency to your Spark job. This is only required for large workloads that spill a lot of data to disk. This flag is disabled by default.
--conf | spark.shuffle.glue.s3ShuffleBucket=S3://<shuffle-bucket> | This is also optional, and it specifies the S3 bucket where we write the shuffle files. By default, we use --TempDir/shuffle-data.
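The job parameters above can also be passed programmatically when you start a job run. The following is a minimal sketch, assuming you use the AWS SDK for Python (boto3) and its Glue start_job_run call. The helper names build_shuffle_arguments and start_job_with_s3_shuffle, the job name, and the bucket name are hypothetical and only for illustration.

```python
from typing import Dict, Optional


def build_shuffle_arguments(
    shuffle_bucket: Optional[str] = None,
    spill_to_s3: bool = False,
) -> Dict[str, str]:
    """Build the AWS Glue job arguments that enable Amazon S3 shuffle.

    Hypothetical helper: only the parameter keys and values come from the
    table above; the function itself is an illustration.
    """
    # Main flag: write and read shuffle data through Amazon S3.
    arguments = {"--write-shuffle-files-to-s3": "TRUE"}
    if spill_to_s3:
        # Optional flag: also offload spill files to Amazon S3.
        arguments["--write-shuffle-spills-to-s3"] = "TRUE"
    if shuffle_bucket is not None:
        # Optional: override the default shuffle location.
        arguments["--conf"] = (
            f"spark.shuffle.glue.s3ShuffleBucket={shuffle_bucket}"
        )
    return arguments


def start_job_with_s3_shuffle(job_name: str, arguments: Dict[str, str]) -> str:
    """Start a Glue job run with the given arguments and return its run ID."""
    # Imported lazily so the argument builder works without boto3 installed.
    import boto3

    glue = boto3.client("glue")
    response = glue.start_job_run(JobName=job_name, Arguments=arguments)
    return response["JobRunId"]


if __name__ == "__main__":
    # "s3://my-shuffle-bucket" is a placeholder bucket name.
    print(build_shuffle_arguments("s3://my-shuffle-bucket", spill_to_s3=True))
```

Keeping the argument builder separate from the API call makes it easy to inspect the parameters before submitting a run, for example start_job_with_s3_shuffle("my-glue-job", build_shuffle_arguments("s3://my-shuffle-bucket")).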

You can also use the AWS Glue Studio console to enable Amazon S3 based shuffle or spill. You can choose the preceding properties from pre-populated options in the Job parameters section.

Results

Let’s run the same q80.sql query with Amazon S3 shuffle enabled. We can view the shuffle files stored in the S3 bucket in the following format:

shuffle_<jobid>_<mapperid>_<reducerid>.data/index

Two kinds of files are created:

  • Data – Stores the shuffle output of the current task
  • Index – Stores the classification information of the data in the data file by storing partition offsets
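To make the role of the two files concrete, the following sketch shows how an index of partition offsets lets a reducer read only its own slice of a mapper's data file. It assumes a layout of one 8-byte big-endian offset per partition boundary, similar to the index files of open source Spark's sort-based shuffle; the exact byte layout that AWS Glue writes to Amazon S3 is not described in this post, so treat the format and the function names as assumptions.

```python
import struct
from typing import List, Tuple


def write_shuffle_output(partitions: List[bytes]) -> Tuple[bytes, bytes]:
    """Return (data, index) for one map task's output.

    data  - all partition payloads concatenated in partition order
    index - cumulative byte offsets, one per partition boundary
    """
    data = b"".join(partitions)
    offsets = [0]
    for payload in partitions:
        offsets.append(offsets[-1] + len(payload))
    # N partitions produce N + 1 offsets, each an 8-byte big-endian integer
    # (an assumed layout for this sketch).
    index = struct.pack(f">{len(offsets)}q", *offsets)
    return data, index


def read_partition(data: bytes, index: bytes, reducer_id: int) -> bytes:
    """Read the slice of the data file that belongs to one reducer."""
    # Two consecutive offsets bound the reducer's partition.
    start, end = struct.unpack_from(">2q", index, reducer_id * 8)
    return data[start:end]


if __name__ == "__main__":
    data, index = write_shuffle_output([b"alpha", b"", b"gamma!"])
    for reducer_id in range(3):
        print(reducer_id, read_partition(data, index, reducer_id))
```

Because the index holds only offsets, a reducer can fetch a small index file first and then request just the byte range it needs from the much larger data file.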

The following screenshots show example shuffle directories and shuffle files:

The following screenshot shows the aggregated metrics from the Spark UI:

The following are a few key highlights:

  • q80.sql, which had earlier failed after 1 hour and 25 minutes with only 13 of 18 stages complete, finished successfully in about 2 hours and 53 minutes, completing all 18 stages.
  • We were able to shuffle 479.7 GB of data without worrying about storage limits.
  • Additional workers aren’t required to scale storage, which provides substantial cost savings.

Considerations and best practices

Keep in mind the following best practices when considering this solution:

  • This feature is recommended when you want to ensure the reliable execution of data-intensive workloads that create a large amount of shuffle or spill data. In our experiments with TPC-DS queries, writing and reading shuffle files from Amazon S3 was marginally slower than using local disk. S3 shuffle performance depends on the number and size of shuffle files. For example, reads from S3 can be slower than reads from local storage if your Spark application has a large number of small shuffle files or partitions.
  • You can use this feature if your job frequently suffers from No space left on device issues.
  • You can use this feature if your job frequently suffers from fetch failure issues (org.apache.spark.shuffle.MetadataFetchFailedException).
  • You can use this feature if your data is skewed.
  • We recommend setting the S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.glue.s3ShuffleBucket) in order to clean up old shuffle data.
  • At the time of writing, this feature is available on AWS Glue 2.0 with Spark 2.4.3.
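The lifecycle policy recommendation above can be sketched as follows, assuming boto3 and its S3 put_bucket_lifecycle_configuration call. The helper names, the rule ID, the shuffle-data/ prefix, and the 7-day retention are hypothetical values chosen for illustration; pick a prefix and retention period that match where your job writes shuffle data and how long your longest job runs.

```python
from typing import Any, Dict


def build_shuffle_expiration_rule(prefix: str, days: int) -> Dict[str, Any]:
    """Build a lifecycle configuration that expires old shuffle objects.

    Hypothetical helper: the rule ID and structure are an illustration,
    not a configuration prescribed by AWS Glue.
    """
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-old-shuffle-data",
                # Only objects under this key prefix are affected.
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                # Delete objects this many days after they were created.
                "Expiration": {"Days": days},
            }
        ]
    }


def apply_lifecycle(bucket: str, configuration: Dict[str, Any]) -> None:
    """Apply the lifecycle configuration to the shuffle bucket."""
    # Imported lazily so the rule builder works without boto3 installed.
    import boto3

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=configuration
    )


if __name__ == "__main__":
    # Placeholder prefix and retention period.
    print(build_shuffle_expiration_rule("shuffle-data/", days=7))
```

Note that put_bucket_lifecycle_configuration replaces any lifecycle configuration already set on the bucket, so merge this rule with existing rules if the shuffle bucket is shared with other data.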

Conclusion

This post discussed how you can scale storage independently in AWS Glue without adding workers. With this feature, you can expect jobs that process terabytes of data to run much more reliably. Happy shuffling!


About the Authors

Anubhav Awasthi is a Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and anything and everything to do with data.

Mohit Saxena is a Software Engineering Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.