Tag Archives: Apache Spark

Monitor Spark streaming applications on Amazon EMR

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/monitor-spark-streaming-applications-on-amazon-emr/

For applications to be enterprise-ready, you need to consider many aspects of the application before moving to a production environment and have operational visibility of your application. You can get that visibility through metrics that measure your application’s health and performance and feed application dashboards and alarms.

In streaming applications, you need to benchmark different stages and the tasks in each stage. Spark provides interfaces that let you plug in your own probes for real-time monitoring and observation of your applications. SparkListeners are a flexible and powerful tool for both streaming and batch applications. You can combine them with Amazon CloudWatch metrics, dashboards, and alarms to gain visibility, generate notifications when issues arise, or automatically scale clusters and services.

This post demonstrates how to implement a simple SparkListener, monitor and observe Spark streaming applications, and set up some alerts. The post also shows how to use alerts to set up automatic scaling on Amazon EMR clusters, based on your CloudWatch custom metrics.

Monitoring Spark streaming applications

For production use cases, you need to plan ahead to determine the amount of resources your Spark application requires. Real-time applications often have SLAs that they need to meet, such as how long each batch execution can safely run or how long each micro-batch can be delayed. Quite often, in the lifecycle of an application, sudden increases of data in the input stream require more application resources to process and catch up with the influx.

For these use cases, you may be interested in common metrics such as the count of records in each micro-batch, the delay on running scheduled micro-batches, and how long each batch takes to run. For example, in Amazon Kinesis Data Streams, you can monitor the IteratorAge metric. With Apache Kafka as a streaming source, you might monitor consumer lag, such as the delta between the latest offset and the consumer offset. For Kafka, there are various open-source tools for this purpose.
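As a rough illustration of the consumer lag mentioned above, the following Python sketch computes the delta between the latest offset and the consumer offset for each partition. The function name and the sample offsets are hypothetical; in practice the offsets would come from your Kafka tooling.

```python
def consumer_lag(latest_offsets, consumer_offsets):
    """Return per-partition lag: latest offset minus the consumer offset."""
    lag = {}
    for partition, latest in latest_offsets.items():
        # Assumption: a partition with no committed offset starts at 0.
        committed = consumer_offsets.get(partition, 0)
        lag[partition] = max(latest - committed, 0)
    return lag


# Hypothetical offsets for a three-partition topic.
latest = {0: 1500, 1: 980, 2: 2100}
committed = {0: 1450, 1: 980, 2: 1800}

lag_by_partition = consumer_lag(latest, committed)
total_lag = sum(lag_by_partition.values())
print(lag_by_partition, total_lag)
```

A steadily growing total lag suggests the application is not keeping up with the input stream.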

You can react in real time or raise alerts based on environment changes by provisioning more resources or reducing unused resources for cost optimization.

Different methods to monitor Spark streaming applications are already available. A very efficient, out-of-the-box feature of Spark is the Spark metrics system. Additionally, Spark can report metrics to various sinks including HTTP, JMX, and CSV files.

You can also monitor and record application metrics from within the application by emitting logs. This requires running count().print(), printing metrics in maps and reading the data that may cause delays, adding to the application stages, or performing unwanted shuffles that may be useful for testing but often prove to be expensive as a long-term solution.

This post discusses another method: using the Spark Streaming listener interface. The following screenshot shows some available metrics on the Spark UI’s Streaming tab.

Apache Spark listeners

Spark internally relies on SparkListeners for event-based communication between its internal components. The Spark scheduler also emits events for SparkListeners whenever the state of a task changes. SparkListeners listen to the events that come from Spark’s DAGScheduler, which is the heart of the Spark execution engine. You can use custom Spark listeners to intercept SparkScheduler events so you know when a task or stage starts and finishes.

The Spark Developer API provides eight methods in the StreamingListener trait that are called on different SparkEvents, mainly at the start, stop, failure, completion, or submission of receivers, batches, and output operations. You can run application logic at each event by implementing these methods. For more information, see StreamingListener.scala on GitHub.

To register your custom Spark listener, set spark.extraListeners when launching the application, or register it programmatically by calling addSparkListener when setting up SparkContext in your application.

SparkStreaming micro-batches

By default, SparkStreaming has a micro-batch execution model. Spark starts a job in intervals on a continuous stream. Each micro-batch contains stages, and stages have tasks. Stages are based on the DAG and the operation that the application code defines, and the number of tasks in each stage is based on the number of DStream partitions.

At the start of a streaming application, the receivers are assigned to executors, in a round-robin fashion, as long-running tasks.

Receivers create blocks of data based on blockInterval. The received blocks are distributed by the BlockManager of the executors, and the network input tracker running on the driver is informed about the block locations for further processing.

On the driver, an RDD is created for the blocks in each batchInterval. Each block translates to a partition of the RDD and a task is scheduled to process each partition.
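The relationship between blockInterval, batchInterval, and the number of tasks can be sketched with simple arithmetic. The following Python snippet is only an estimate under the assumption that every receiver produces one block per blockInterval; the function name is hypothetical, and 200 ms is used because it is Spark's documented default for spark.streaming.blockInterval.

```python
def blocks_per_batch(batch_interval_ms, block_interval_ms=200, num_receivers=1):
    """Estimate the blocks (RDD partitions, and therefore tasks) in one micro-batch."""
    if batch_interval_ms <= 0 or block_interval_ms <= 0:
        raise ValueError("intervals must be positive")
    # Each receiver cuts one block per block interval.
    return (batch_interval_ms // block_interval_ms) * num_receivers


# A 1-second batch with the default 200 ms block interval and 3 receivers.
tasks = blocks_per_batch(1000, 200, num_receivers=3)
print(tasks)
```

Lowering blockInterval or adding receivers increases the number of partitions, and therefore the parallelism of each micro-batch.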

The following diagram illustrates this architecture.

Creating a custom SparkListener and sending metrics to CloudWatch

You can rely on CloudWatch custom metrics to react or raise alarms based on the custom Spark metrics you collect from a custom Spark listener.

You can implement your custom streaming listeners by directly implementing the StreamingListener trait if you write in Scala, or by using its equivalent Java interface or the PySpark Python wrapper pyspark.streaming.listener.

For this post, you only override onBatchCompleted and onReceiverError because you’re only collecting metrics about micro-batches.

From onBatchCompleted, you submit the following metrics:

  • Heartbeat – A numeric 1 (one) whenever a batch completes so you can sum or average time periods to see how many micro-batches ran
  • Records – The number of records per batch
  • Scheduling delay – The delay from when the batch was scheduled to run until when it actually ran
  • Processing delay – How long the batch execution took
  • Total delay – The sum of the processing delay and scheduling delay

From onReceiverError, you submit a numeric 1 (one) whenever a receiver fails. See the following code:

  /**
    * This method executes when a Spark Streaming batch completes.
    *
    * @param batchCompleted Class having information on the completed batch
    */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    log.info("CloudWatch Streaming Listener, onBatchCompleted:" + appName)

    // Write performance metrics to CloudWatch Metrics
    writeBatchStatsToCloudWatch(batchCompleted)
  }

  /**
    * This method executes when a Spark Streaming receiver reports an error.
    *
    * @param receiverError Class having information on the receiver errors
    */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    log.warn("CloudWatch Streaming Listener, onReceiverError:" + appName)

    // Write receiver error metrics to CloudWatch Metrics
    writeRecieverStatsToCloudWatch(receiverError)
  }

For the full source code of this example for Scala implementation and a sample Spark Kinesis streaming application, see the AWSLabs GitHub repository.
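To make the five batch metrics concrete, here is a minimal Python sketch that assembles them into a list shaped like the MetricData argument of the CloudWatch PutMetricData API. It is not the code from the repository: the function name, the metric names, and the appName dimension are assumptions made for illustration.

```python
def build_batch_metrics(app_name, num_records, scheduling_delay_ms, processing_delay_ms):
    """Assemble the per-batch metrics as CloudWatch-style metric datums."""
    # Hypothetical dimension so metrics from different applications stay separate.
    dimensions = [{"Name": "appName", "Value": app_name}]

    def metric(name, value, unit):
        return {
            "MetricName": name,
            "Dimensions": dimensions,
            "Value": float(value),
            "Unit": unit,
        }

    return [
        metric("heartBeat", 1, "Count"),  # one per completed batch
        metric("numRecords", num_records, "Count"),
        metric("schedulingDelay", scheduling_delay_ms, "Milliseconds"),
        metric("processingDelay", processing_delay_ms, "Milliseconds"),
        # Total delay is the sum of the scheduling and processing delays.
        metric("totalDelay", scheduling_delay_ms + processing_delay_ms, "Milliseconds"),
    ]


metrics = build_batch_metrics("wordcount", 1200, 40, 760)
print([m["MetricName"] for m in metrics])
```

With boto3, a list like this could be passed as the MetricData argument of the CloudWatch client's put_metric_data call, together with a Namespace of your choice.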

To register your custom listener, make an instance of the custom listener object and pass the object to the streaming context, in the driver code, using the addStreamingListener method. See the following code:

val conf = new SparkConf().setAppName(appName)
val batchInterval = Milliseconds(1000)
val ssc = new StreamingContext(conf, batchInterval)
val cwListener = new CloudWatchSparkListener(appName)

ssc.addStreamingListener(cwListener)

When you run the application, you can find your metrics in CloudWatch in the same account as the one the EMR cluster is running in. See the following screenshot.

Using the sample code

This post provides an AWS CloudFormation template, which demonstrates the code. Download the emrtemplate.json file from the GitHub repo. The template launches an EMR cluster in a public subnet and a Kinesis data stream with three shards, along with the required default AWS Identity and Access Management (IAM) roles. The sample Spark Kinesis streaming application is a simple word count that an Amazon EMR step script compiles and packages with the sample custom StreamingListener.

Using application alarms in CloudWatch

The alerts you need to set up mainly depend on the SLA of your application. As a general rule, you don’t want your batches to take longer than the micro-batch intervals because it causes the scheduled batches to queue and you start falling behind the input stream. Also, if the rate of your receivers reading from the stream is more than what you can process in the batches due to a surge, the read records can spill to disk and cause more delays to shuffle across to other executors. You can set up a CloudWatch alarm to notify you when a processing delay is approaching your application’s batchInterval. For instructions on setting up an alarm, see Using Amazon CloudWatch Alarms.
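As one possible way to express the rule above, the following Python sketch builds the parameters for an alarm that fires when the average processing delay exceeds a fraction of the batch interval. The parameter names follow the CloudWatch PutMetricAlarm API, but the namespace, metric name, dimension, and the 80% fraction are assumptions for illustration, not values from the sample template.

```python
def processing_delay_alarm(app_name, batch_interval_ms, fraction=0.8):
    """Build alarm parameters that fire before batches overrun the batch interval."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    return {
        "AlarmName": f"{app_name}-processing-delay",
        "Namespace": "SparkStreaming",        # hypothetical namespace
        "MetricName": "processingDelay",      # hypothetical metric name
        "Dimensions": [{"Name": "appName", "Value": app_name}],
        "Statistic": "Average",
        "Period": 60,                         # seconds per evaluation period
        "EvaluationPeriods": 3,
        "Threshold": batch_interval_ms * fraction,
        "ComparisonOperator": "GreaterThanThreshold",
    }


alarm = processing_delay_alarm("wordcount", batch_interval_ms=1000)
print(alarm["Threshold"])
```

With boto3, a dictionary like this could be unpacked into the CloudWatch client's put_metric_alarm call, usually with an AlarmActions entry that points at a notification target.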

The CloudFormation template for this post has two sample alarms to monitor. The first is based on the anomaly detection band of the processingDelays metric. The second is based on a threshold on a math expression that calculates the ratio of schedulingDelay to totalDelay, or (schedulingDelay / totalDelay) * 100.
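The math expression in the second alarm is simple enough to check by hand. The following Python sketch evaluates it for one batch; the function name and the guard against a zero total delay are assumptions added for illustration.

```python
def scheduling_delay_ratio(scheduling_delay_ms, total_delay_ms):
    """Return schedulingDelay as a percentage of totalDelay."""
    if total_delay_ms <= 0:
        # Assumption: report 0% when no delay was recorded at all.
        return 0.0
    return 100.0 * scheduling_delay_ms / total_delay_ms


# A batch that waited 200 ms in the queue and then ran for 800 ms.
ratio = scheduling_delay_ratio(200, 200 + 800)
print(ratio)
```

A high percentage means that batches spend most of their time waiting to be scheduled rather than running, which usually points to a lack of resources.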

Scaling streaming applications

In terms of scaling, as the amount of data grows, you have more DStream partitions, based on the blockIntervals of the streaming application. In addition to the batches that should catch up with the received records and finish within batch intervals, the receivers should also keep up with the influx of records. The source streams should provide enough bandwidth for the receivers to read fast enough from the stream, and there should be enough receivers reading at the right rate to consume the records from the source.

If your DStreams are backed by receivers and WALs, you need to consider the number of receivers in advance. After you launch the application, the number of receivers can’t change without restarting the application.

When a SparkStreaming application starts, by default, the driver schedules the receivers in a round-robin fashion on the available executors unless a preferred location is defined for receivers. When all executors are allocated with receivers, the rest of the required receivers are scheduled on the executors to balance the number of receivers on each executor, and the receivers stay up in the executors as long-running tasks. For more information about scheduling receivers on executors, see ReceiverSchedulingPolicy.scala on GitHub and SPARK-8882 on the Spark issues website.

Sometimes you may want to slow down receivers because you want less data in micro-batches and don’t want to surpass your micro-batch intervals. To slow down receivers, in case you have streaming sources that can hold on to the records when the batches can’t run fast enough to keep up with the surge of records, you can enable the BackPressure feature to adapt to the input rate from receivers. To do so, set spark.streaming.backpressure.enabled to true.

Another factor you can consider is dynamic allocation for streaming applications. By default, spark.dynamicAllocation.enabled is set to true on Amazon EMR, which is mutually exclusive with spark.streaming.dynamicAllocation.enabled. If you want the driver to request more executors for your DStream tasks, you need to set spark.dynamicAllocation.enabled to false and spark.streaming.dynamicAllocation.enabled to true. Spark periodically looks into the average batch duration and compares it with the batch interval. If the ratio is above the scale-up ratio, it requests more executors. If the ratio is below the scale-down ratio, it releases idle executors, preferably those that aren’t running any receivers. For more information, see ExecutorAllocationManager.scala on GitHub and the Spark Streaming Programming Guide.
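The scale-up and scale-down check described above can be sketched as follows. This is a simplified Python illustration, not the Spark implementation: the function name and return values are hypothetical, and the 0.9 and 0.3 ratios are assumed defaults that you should verify against the Spark version you run.

```python
def scaling_decision(avg_batch_time_ms, batch_interval_ms,
                     scale_up_ratio=0.9, scale_down_ratio=0.3):
    """Decide whether to add or release executors from the average batch time."""
    ratio = avg_batch_time_ms / batch_interval_ms
    if ratio >= scale_up_ratio:
        return "request_executors"   # batches take almost the whole interval
    if ratio <= scale_down_ratio:
        return "release_executor"    # batches finish well within the interval
    return "no_change"


print(scaling_decision(950, 1000))
print(scaling_decision(200, 1000))
print(scaling_decision(500, 1000))
```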

The ExecutorAllocationManager already looks into the average batch execution time and requests more executors based on the scale-up and scale-down ratios. Because of this, you can set up automatic scaling in Amazon EMR, preferably on task instance groups, to add and remove nodes based on the ContainerPendingRatio, and assign a PreferredLocation for receivers on core nodes. The example code for this post provides a custom KinesisInputDStream, which allows assigning the preferred location for every receiver you request. It’s basically a function that returns the hostname on which to preferably place the receiver. The GitHub repo also has a sample application that uses the customKinesisInputDStream and customKinesisReciever, which allow requesting a preferredLocation for receivers.

At scale-down, Amazon EMR nominates the nodes with the fewest containers running for decommissioning in the task instance group.

For more information about setting up automatic scaling, see Using Automatic Scaling with a Custom Policy for Instance Groups. The example code contains a threshold on schedulingDelay. As a general rule, you should base the threshold on the batchIntervals and processingDelay. A growth in schedulingDelay usually means a lack of resources to schedule a task.

The following table summarizes the configuration attributes to tune when you launch your Spark streaming job.

Configuration Attribute                               Default
spark.streaming.backpressure.enabled                  False
spark.streaming.backpressure.pid.proportional         1.0
spark.streaming.backpressure.pid.integral             0.2
spark.streaming.backpressure.pid.derived              0.0
spark.streaming.backpressure.pid.minRate              100
spark.dynamicAllocation.enabled                       True
spark.streaming.dynamicAllocation.enabled             False
spark.streaming.dynamicAllocation.scalingInterval     60 Seconds
spark.streaming.dynamicAllocation.minExecutors        max(1,numReceivers)
spark.streaming.dynamicAllocation.maxExecutors        Integer.MAX_VALUE
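As a small sketch of how these attributes might be passed at launch, the following Python helper turns a few of them into --conf arguments for spark-submit and keeps the two dynamic allocation flags mutually exclusive. The helper name and its parameters are hypothetical; only the configuration keys come from the table.

```python
def streaming_conf_args(backpressure=True, streaming_dynamic_allocation=True):
    """Build spark-submit --conf arguments for a Spark streaming job."""
    conf = {
        "spark.streaming.backpressure.enabled": backpressure,
        # The two dynamic allocation modes are mutually exclusive,
        # so enabling one always disables the other here.
        "spark.dynamicAllocation.enabled": not streaming_dynamic_allocation,
        "spark.streaming.dynamicAllocation.enabled": streaming_dynamic_allocation,
    }
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={str(value).lower()}"]
    return args


args = streaming_conf_args()
print(" ".join(args))
```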

Monitoring structured streaming with a listener

Structured streaming still processes records in micro-batches and triggers queries when there is data from receivers. You can monitor these queries using another listener interface, StreamingQueryListener. This post provides a sample listener for structured streaming on Kafka, with a sample application to run. For more information, see CloudWatchQueryListener.scala on GitHub. The following image is a snapshot of a few CloudWatch custom metrics that the custom StreamingQueryListener collects.
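To give a feel for what such a listener can report, the following Python sketch pulls a few values out of a query progress event, assuming the event has already been parsed from its JSON form into a dictionary. The field names match the JSON that Spark produces for a streaming query progress event, but the function name, the output keys, and the sample values are hypothetical, and this is not the code from CloudWatchQueryListener.scala.

```python
def progress_to_metrics(progress):
    """Extract a few numeric metrics from a parsed query progress event."""
    duration = progress.get("durationMs", {})
    return {
        "numInputRows": progress.get("numInputRows", 0),
        "inputRowsPerSecond": progress.get("inputRowsPerSecond", 0.0),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond", 0.0),
        # Time spent running the whole trigger, in milliseconds.
        "triggerExecutionMs": duration.get("triggerExecution", 0),
    }


# Hypothetical progress event with only the fields used above.
sample_progress = {
    "batchId": 42,
    "numInputRows": 5000,
    "inputRowsPerSecond": 2500.0,
    "processedRowsPerSecond": 4000.0,
    "durationMs": {"addBatch": 900, "triggerExecution": 1250},
}

query_metrics = progress_to_metrics(sample_progress)
print(query_metrics)
```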

Scaling down your EMR cluster

When you launch a Spark streaming application, Spark evenly schedules the receivers on all available executors at the start of the application. When an EMR cluster is set to scale down, Amazon EMR nominates the nodes running fewer tasks in the instance group that has an automatic scaling rule. Although Spark receivers are long-running tasks, Amazon EMR waits for yarn.resourcemanager.decommissioning.timeout, or until the NodeManagers are decommissioned, to gracefully terminate and shrink the nodes. You’re always at risk of losing a running executor with a receiver. You should always plan for enough Spark block replication and checkpointing for the DStreams, and ideally define a PreferredLocation so you don’t risk losing receivers.

Metrics pricing

In general, Amazon EMR metrics don’t incur CloudWatch costs. However, custom metrics incur charges based on CloudWatch metrics pricing. For more information, see Amazon CloudWatch pricing. Additionally, Spark Kinesis Streaming relies on the Kinesis Client Library, and it publishes custom CloudWatch metrics that also incur charges based on CloudWatch metrics pricing. For more information, see Monitoring the Kinesis Client Library with Amazon CloudWatch.

Conclusion

Monitoring and tuning Spark streaming and real-time applications is challenging, and you must react to environment changes in real time. You also need to monitor your source streams and job outputs to get a full picture. Spark is a very flexible and rich framework that provides multiple options for monitoring jobs. This post looked into an efficient way to monitor the performance of Spark streaming micro-batches using SparkListeners and integrate the extracted metrics with CloudWatch metrics.

 


About the Author

Amir Shenavandeh is a Hadoop systems engineer with AWS. He helps customers with architectural guidance and technical support using open-source applications, develops and advances the applications of the Hadoop ecosystem and works with the open source community. 

 

 

How Drop used the Amazon EMR runtime for Apache Spark to halve costs and get results 5.4 times faster

Post Syndicated from Michael Chau original https://aws.amazon.com/blogs/big-data/how-drop-used-the-amazon-emr-runtime-for-apache-spark-to-halve-costs-and-get-results-5-4-times-faster/

This is a guest post by Michael Chau, software engineer with Drop, and Leonardo Gomez, AWS big data specialist solutions architect. In their own words, “Drop is on a mission to level up consumer lives, one reward at a time. Through our personalized commerce platform, we intelligently surface the right brands, at the right time, to make our members’ everyday better than it was before. Powered by machine learning, we match consumers with over 200+ partner brands to satisfy two main goals: to earn points from their purchases and redeem them for instant rewards. Calling Toronto home but operating under a global mindset, Drop is building the next-level experience for our 3 million+ members across North America. Learn more by visiting www.joindrop.com.”

Overview

At Drop, our data lake infrastructure plays a foundational role in enabling better data-informed product and business decisions. A critical feature is its ability to process vast amounts of raw data and produce reconciled datasets that follow our data lake’s standardized file format and partitioning structure. Our business intelligence, experimentation analytics, and machine learning (ML) systems use these transformed datasets directly.

This post details how we designed and implemented our data lake’s batch ETL pipeline to use Amazon EMR, and the numerous ways we iterated on its architecture to reduce Apache Spark runtimes from hours to minutes and save over 50% on operational costs.

Building the pipeline

Drop’s data lake serves as the center and source of truth for the company’s entire data infrastructure upon which our downstream business intelligence, experimentation analytics, and ML systems critically rely. Our data lake’s goal is to ingest vast amounts of raw data from various sources and generate reliable and reconciled datasets that our downstream systems can access via Amazon Simple Storage Service (Amazon S3). To accomplish this, we architected our data lake’s batch ETL pipeline to follow the Lambda architecture processing model and used a combination of Apache Spark and Amazon EMR to transform the raw ingested data that lands into our Amazon S3 lake into reconciled columnar datasets. When designing and implementing this pipeline, we adopted the following core guiding principles and considerations:

  • Keep our tech stack simple
  • Use infrastructure as code
  • Work with transient resources

Keeping our tech stack simple

We aimed to keep our tech stack simple by using existing and proven AWS technologies and only adopting services that would drive substantial impact. Drop is primarily an AWS shop, so continuing to use AWS technologies made sense due to our existing experience, the ability to prototype new features quickly, and the inherent integration benefits of using other services within Amazon’s ecosystem.

Another effort to keep our tech stack simple was to limit the overhead and complexity of newly adopted open-source Apache Hadoop technologies. Our engineering team initially had limited experience working with these technologies, so we made a conscious effort to mitigate additional technical overhead to our stack by using proven fully-managed services. We integrated Amazon EMR as part of our idempotent data pipelines because we could use the service when our pipeline operations required it, which eliminated the need to maintain the service when no longer required. This allowed us to reduce the technical overhead of constantly maintaining production clusters.

Using infrastructure as code

We use Apache Airflow to manage and schedule our data lake pipeline operations. Using Airflow enables us to build our entire workflows and infrastructure as code via Airflow Directed Acyclic Graphs (DAGs). This key decision also simplified our engineering development and deployment processes, while providing version control for all aspects of our data infrastructure.

Working with transient resources

To reduce operational costs, we made a key decision to build our data processing pipelines using transient resources. By designing our pipelines to spin up EMR clusters only upon operational demand and terminate upon job completion, we can use Amazon Elastic Compute Cloud (Amazon EC2) Spot and On-Demand Instances without paying for idle resources. This approach has enabled a dramatic reduction in costs associated with idle clusters.

Batch ETL pipeline overview

The following diagram illustrates our batch ETL pipeline architecture.

The pipeline includes the following steps:

  1. A core requirement for the Lambda architecture data model is to have access to both batch and stream data sources of a dataset. The batch ETL pipeline primarily ingests data in batch and stream formats from our Amazon Relational Database Service (Amazon RDS) Postgres database using AWS Database Migration Service (AWS DMS). The pipeline initiates full-migration AWS DMS tasks for comprehensive batch snapshots using Airflow, and ingests stream data using ongoing replication AWS DMS tasks for 1-minute latency change data capture (CDC) files. Data from both batch and stream formats are landed into our Amazon S3 lake, and cataloged in our AWS Glue Data Catalog using AWS Glue crawlers.
  2. The batch ETL pipeline Apache Airflow DAG runs a series of tasks, which begins with uploading our Lambda architecture Spark application to Amazon S3, spinning up an EMR cluster, and ultimately running the Spark application as an Amazon EMR step. Depending on the characteristics of the datasets, the necessary Amazon EMR resources are calibrated to produce the reconciled dataset. To produce the resultant datasets in Apache Parquet format, we must allocate sufficient CPU and memory to our clusters.
  3. Upon completion of all of the Amazon EMR steps, the cluster is terminated, and the newly produced dataset is crawled using AWS Glue crawlers to update the dataset’s metadata within the Data Catalog. The output dataset is now ready for consumer systems to access via Amazon S3 or query using Amazon Athena or Amazon Redshift Spectrum.

Evolving the EMR pipeline

Our engineering team is constantly iterating on the architecture of our batch ETL pipeline in an effort to reduce its runtime duration and operational costs. The following iterations and notable feature enhancements have generated the largest impact to the downstream systems, as well as the end-users that rely on this pipeline.

Migrating from AWS Glue to Amazon EMR

The first iteration of our batch ETL pipeline used AWS Glue to process our Spark applications rather than Amazon EMR due to our limited in-house Hadoop experience in the initial stages. AWS Glue was an appealing first solution due to its “ETL as a service” features and simplified resource allocation. The AWS Glue solution successfully delivered the desired results; however, as we gained experience with Hadoop technologies, we recognized a significant opportunity to use Amazon EMR to improve pipeline performance and reduce operational costs.

The migration from AWS Glue to Amazon EMR was seamless and only required EMR cluster configurations and minor modifications to our Spark application that used AWS Glue libraries. Thanks to this, we achieved the following operational benefits:

  • Faster cluster bootstrapping and resource provisioning durations. We found that AWS Glue clusters have a cold start time of 10–12 minutes, whereas EMR clusters have a cold start time of 7–8 minutes.
  • An 80% reduction in cost while using equivalent resources. We swapped the standard AWS Glue worker type, at a cost of $0.44 per DPU-Hour, for the resource-equivalent m5.xlarge Amazon EMR instance type, which has a Spot Instance price of approximately $0.085 per instance per hour.
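The 80% figure follows from the two unit prices quoted above. The following Python snippet is a quick check of that arithmetic; it compares only the two hourly prices and, as an assumption, ignores any other charges that may apply to either service.

```python
def percent_reduction(old_price, new_price):
    """Return the relative price reduction as a percentage."""
    return (old_price - new_price) / old_price * 100


# $0.44 per DPU-Hour for AWS Glue versus about $0.085 per hour
# for an m5.xlarge Spot Instance, as quoted in the text.
reduction = percent_reduction(0.44, 0.085)
print(round(reduction, 1))  # 80.7
```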

File committers

Our original partitioning strategy attempted to use Spark’s dynamic write partitioning feature to reduce the number of written files per run. See the following code:

sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

This strategy didn’t translate well in our pipeline’s performance; we quickly experienced the limitations and considerations of working with cloud object stores. By pivoting our Spark application’s file-writing strategy to completely overwrite an existing directory and using the Amazon EMR EMRFS S3-optimized committer, we could realize critical performance gains. In scenarios where datasets were nearly a terabyte, deployment of this optimized file committer reduced runtime from hours to less than half an hour! It’s worth noting that Amazon EMR 5.30.0 includes an optimization that should help with dynamic partitionOverwriteMode.

Upgrading Amazon EMR versions to 5.28+

Our datasets often exceed billions of rows, which necessitated the comparison and processing of hundreds of thousands of stream files against large batch files. The ability to execute these Spark operations given the input data sources comes at a high cost to query and process the data.

A huge improvement in our pipeline’s overall performance came from using the Amazon EMR runtime for Apache Spark feature introduced in Amazon EMR version 5.28. We saw immediate performance gains by upgrading from Amazon EMR 5.27 to 5.29, without having to make any additional changes to our existing pipeline. Our Spark application total runtime and subsequent Amazon EMR cost was reduced by over 35% using identical resource configurations. These improvements were benchmarked against two of our datasets and averaged against three production runs.

The following table summarizes the dataset and EMR cluster properties.

Dataset    Table Rows  Total Batch Files Size  Total Stream Files Size  Count Stream Files  EC2 Instance Type  Count EC2 Instances
Dataset A  ~3.5M       ~0.5GB                  ~0.2GB                   ~100k               m5.xlarge          10
Dataset B  ~3,500M     ~500GB                  ~120GB                   ~250k               r5.2xlarge         30

The following diagrams summarize the Amazon EMR upgrade performance benchmarks and metrics. We calculated these cost metrics with Amazon EMR bootstrapping and resource provisioning time included.

Amazon EMR step concurrency

Early iterations of our pipeline architecture involved creating a new batch ETL pipeline per dataset, as well as a dedicated EMR cluster for that dataset. Cloning new pipelines was a quick and simple way to scale our processing capabilities because our infrastructure was written as code and the operations and resources were self-contained. Although this enabled pipeline generation quickly for our most important datasets, there was ample opportunity for operational improvements.

The following screenshot shows Drop’s batch ETL processing DAG. All of the clusters are named after the Drop engineering team’s pets.

The evolution of the pipeline architecture involved grouping datasets based on their Amazon EMR resource requirements and running them concurrently as Spark application Amazon EMR steps in a common EMR cluster using Amazon EMR step concurrency. Re-architecting our batch ETL pipelines in this manner allowed us to do the following:

  • Remove the EMR cluster bootstrapping and provisioning duration associated with individual EMR clusters per dataset
  • Reduce overall Spark runtimes in aggregate
  • Simplify our Amazon EMR resource configurations with fewer EMR clusters

On average, our clusters required 8–10 minutes to bootstrap and source the Spot Instances requested. By migrating multiple Spark applications to a common EMR cluster, we removed this bottleneck, and ultimately reduced overall runtime and Amazon EMR costs. Amazon EMR step concurrency also allowed us to run multiple applications at the same time against a dramatically reduced set of resources. For our smaller datasets (under 15 million rows), we learned that running Spark applications concurrently with reduced resources didn’t have a linear effect on overall runtime, and we could achieve shorter runtimes with fewer resources compared to the previous architecture in aggregate. However, our larger datasets (over 1 billion rows) didn’t exhibit the same performance behaviors or gains as the smaller tables when running Amazon EMR steps concurrently. Therefore, EMR clusters for larger tables required additional resources and fewer steps; however, the overall result is still marginally better in terms of cost and overall runtime in aggregate compared to the previous architecture.
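The grouping idea can be sketched in a few lines of Python. This is a hypothetical illustration rather than Drop's actual pipeline code: the function name and group labels are invented, and the only values taken from the text are the row-count thresholds (under 15 million and over 1 billion rows) and the approximate sizes of Dataset A and Dataset B.

```python
SMALL_MAX_ROWS = 15_000_000        # "smaller datasets" in the text
LARGE_MIN_ROWS = 1_000_000_000     # "larger datasets" in the text


def group_by_size(dataset_rows):
    """Group datasets by row count so each group can share one EMR cluster."""
    groups = {"small": [], "medium": [], "large": []}
    for name, rows in sorted(dataset_rows.items()):
        if rows < SMALL_MAX_ROWS:
            groups["small"].append(name)    # many concurrent steps, few resources
        elif rows > LARGE_MIN_ROWS:
            groups["large"].append(name)    # fewer steps, more resources
        else:
            groups["medium"].append(name)
    return groups


groups = group_by_size({"Dataset A": 3_500_000, "Dataset B": 3_500_000_000})
print(groups)
```

Each group would then map to one EMR cluster whose step concurrency level and instance types match the needs of that group.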

Amazon EMR instance fleets

Working with Amazon EMR and Amazon EC2 Spot Instances has allowed us to realize tremendous cost savings, but it can come at the expense of EMR cluster reliability. We have experienced Spot Instance availability issues due to Spot Instance type supply constraints on the available market and losing EC2 instances due to competitive bidding. Both issues directly contribute to overall pipeline performance degradation in the form of longer EMR cluster resource provisioning and longer Spark runtimes due to lost nodes.

To improve our pipeline reliability and protect against these risks, we began to use Amazon EMR instance fleets. Instance fleets addressed both pain points: they work around the limited supply of a specific EC2 Spot Instance type by sourcing an alternative instance type, and they can automatically switch to On-Demand Instances if provisioning Spot Instances exceeds a specified threshold duration. Prior to using instance fleets, about 15% of our Amazon EMR production runs were affected by limitations related to Spot Instance supply or price bidding. Since implementing instance fleets, we haven’t had a cluster fail or experienced prolonged resource provisioning past programmed thresholds.
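A fleet definition with both safeguards might look like the following Python sketch, which builds one entry for the InstanceFleets list of an EMR cluster request. The key names follow the Amazon EMR API, but the helper name, the fleet name, the alternative instance type, and the 10-minute timeout are assumptions for illustration and not Drop's actual configuration.

```python
def core_fleet(instance_types, spot_capacity, timeout_minutes=10):
    """Build a core instance fleet that falls back to On-Demand Instances."""
    return {
        "Name": "core-fleet",  # hypothetical fleet name
        "InstanceFleetType": "CORE",
        "TargetSpotCapacity": spot_capacity,
        # Several instance types give EMR alternatives when one is in short supply.
        "InstanceTypeConfigs": [
            {"InstanceType": instance_type, "WeightedCapacity": 1}
            for instance_type in instance_types
        ],
        "LaunchSpecifications": {
            "SpotSpecification": {
                # Switch to On-Demand if Spot capacity is not provisioned in time.
                "TimeoutDurationMinutes": timeout_minutes,
                "TimeoutAction": "SWITCH_TO_ON_DEMAND",
            }
        },
    }


fleet = core_fleet(["m5.xlarge", "m4.xlarge"], spot_capacity=10)
print(fleet["LaunchSpecifications"])
```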

Conclusion

Amazon EMR has played a critical role in Drop’s ability to use data to make better-informed product and business decisions. We have had tremendous success in capitalizing on Amazon EMR features to improve our data processing pipeline’s overall performance and cost efficiency, and we will continue to explore new ways to improve our pipeline. One of the easiest ways to learn about new opportunities to improve our systems is to stay current with the latest AWS technologies and Amazon EMR features.

 


About the Authors

Michael Chau is a Software Engineer at Drop. He has experience moving data from A to B and sometimes transforming it along the way.

 

 

 

 

Leonardo Gómez is a Big Data Specialist Solutions Architect at AWS. Based in Toronto, Canada, he works with customers across Canada to design and build big data architectures.

 

 

Simplify data pipelines with AWS Glue automatic code generation and Workflows

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/simplify-data-pipelines-with-aws-glue-automatic-code-generation-and-workflows/

In the previous post of the series, we discussed how AWS Glue job bookmarks help you to incrementally load data from Amazon S3 and relational databases. We also saw how using the AWS Glue optimized Apache Parquet writer can help improve performance and manage schema evolution.

In the third post of the series, we’ll discuss three topics. First, we’ll look at how AWS Glue can automatically generate code to help transform data in common use cases such as selecting specific columns, flattening deeply nested records, efficiently parsing nested fields, and handling column data type evolution.

Second, we’ll outline how to use AWS Glue Workflows to build and orchestrate data pipelines using different Glue components such as Crawlers, Apache Spark and Python Shell ETL jobs.

Third, we’ll see how to leverage SparkSQL in your ETL jobs to perform SQL based transformations on datasets stored in Amazon S3 and relational databases.

Automatic Code Generation & Transformations: ApplyMapping, Relationalize, Unbox, ResolveChoice

AWS Glue can automatically generate code to help perform a variety of useful data transformation tasks. These transformations provide a simple-to-use interface for working with complex and deeply nested datasets. For example, some relational databases or data warehouses do not natively support nested data structures. AWS Glue can automatically generate the code necessary to flatten those nested data structures before loading them into the target database, saving time and enabling non-technical users to work with data.

The following is a list of the popular transformations AWS Glue provides to simplify data processing:

  1. ApplyMapping is a transformation used to perform column projection and convert between data types. In this example, we use it to unnest several fields, such as actor.id, which we map to the top-level actor.id field. We also cast the id column to a long.
    # Each tuple is (source field, source type, target field, target type).
    medicare_output = medicare_src.apply_mapping(
        [('id', 'string', 'id', 'long'),
        ('type', 'string', 'type', 'string'),
        ('actor.id', 'int', 'actor.id', 'int'),
        ('actor.login', 'string', 'actor.login', 'string'),
        ('actor.display_login', 'string', 'actor.display_login', 'string'),
        ('actor.gravatar_id', 'long', 'actor.gravatar_id', 'long'),
        ('actor.url', 'string', 'actor.url', 'string'),
        ('actor.avatar_url', 'string', 'actor.avatar_url', 'string')]
    )

  2. Relationalize converts a nested dataset stored in a DynamicFrame to a relational (rows and columns) format. Nested structures are unnested into top-level columns, and arrays are decomposed into different tables with appropriate primary and foreign keys inserted. The result is a collection of DynamicFrames representing a set of tables that can be directly inserted into a relational database. More detail about relationalize can be found here.
    ## An example relationalizing and writing to Redshift
    dfc = history.relationalize("hist_root", redshift_temp_dir)
    ## Cycle through results and write to Redshift.
    for df_name in dfc.keys():
        df = dfc.select(df_name)
        print("Writing to Redshift table: ", df_name, " ...")
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = df, 
            catalog_connection = "redshift3", 
            connection_options = {"dbtable": df_name, "database": "testdb"}, 
            redshift_tmp_dir = redshift_temp_dir)

  3. Unbox parses a string field of a certain type, such as JSON, into individual fields with their corresponding data types and stores the result in a DynamicFrame. For example, you may have a CSV file with one field that is in JSON format: {"a": 3, "b": "foo", "c": 1.2}. Unbox will reformat the JSON string into three distinct fields: an int, a string, and a double. The Unbox transformation is commonly used to replace costly Python User Defined Functions that reformat data and may result in Apache Spark out-of-memory exceptions. The following example shows how to use Unbox:
    df_result = df_json.unbox('json', "json")

  4. ResolveChoice: AWS Glue Dynamic Frames support data where a column can have fields with different types. These columns are represented with Dynamic Frame’s choice type. For example, the Dynamic Frame schema for the medicare dataset shows up as follows:
    root
     |-- drg definition: string
     |-- provider id: choice
     |    |-- long
     |    |-- string
     |-- provider name: string
     |-- provider street address: string

    This is because the “provider id” column could either be a long or string type. The Apache Spark Dataframe considers the whole dataset and is forced to cast it to the most general type, namely string. Dynamic Frames allow you to cast the type using the ResolveChoice transform. For example, you can cast the column to long type as follows.

    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
    
    medicare_res.printSchema()
     
    root
     |-- drg definition: string
     |-- provider id: long
     |-- provider name: string
     |-- provider street address: string

    This transform also inserts a null where the value was a string that could not be cast. As a result, the records whose string values were cast to null can now be identified. Alternatively, the choice type can also be cast to struct, which keeps values of both types.
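To make the cast:long behavior concrete, the following plain-Python sketch mimics what the text describes for a column that mixes long and string values: values that parse as integers are kept, and strings that cannot be cast become null (None). It does not call the AWS Glue API; the function name cast_to_long and the sample records are hypothetical, and the exact casting rules of ResolveChoice may differ in edge cases.

```python
def cast_to_long(value):
    """Return value as an int, or None when it cannot be cast."""
    if isinstance(value, bool):
        return None  # treat booleans as non-numeric in this sketch
    if isinstance(value, int):
        return value
    try:
        return int(str(value).strip())
    except ValueError:
        return None

# Hypothetical records with a mixed-type "provider id" column.
records = [
    {"provider id": 10001},
    {"provider id": "10005"},
    {"provider id": "n/a"},
]

# Apply the cast to every record, keeping the other fields unchanged.
resolved = [{**r, "provider id": cast_to_long(r["provider id"])} for r in records]

# Records that ended up null are the ones whose strings could not be cast.
unresolved_count = sum(1 for r in resolved if r["provider id"] is None)
```

Counting the null values after the cast is one simple way to find the records that need attention before loading the data downstream.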

Build and orchestrate data pipelines using AWS Glue Workflows

AWS Glue Workflows provide a visual tool to author data pipelines by combining Glue crawlers for schema discovery with Glue Spark and Python jobs to transform the data. Relationships can be defined and parameters passed between task nodes to enable users to build pipelines of varying complexity. Workflows can run on a schedule or be triggered programmatically. You can track the progress of each node independently or of the entire workflow, making it easier to troubleshoot your pipelines.

A typical workflow for ETL workloads is organized as follows:

  1. A Glue Python command is triggered manually, on a schedule, or on an external CloudWatch event. It pre-processes or lists the partitions in Amazon S3 for a table under a base location. For example, a CloudTrail logs partition to process could be: s3://AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/DAY/HOUR/. The Python command can list all the regions and schedule crawlers to create different Glue Data Catalog tables on each region.
  2. Glue Crawlers are triggered next to populate new partitions for every hour in the Glue Data Catalog for data recently ingested into Amazon S3.
  3. Concurrent Glue ETL jobs are triggered to separately filter and process each partition or a group of partitions. For example, CloudTrail events corresponding to the last week can be read by a Glue ETL job by passing in the partition prefix as Glue job parameters and using Glue ETL pushdown predicates to read only the partitions in that prefix. Partitioning and orchestrating concurrent Glue ETL jobs allows you to scale and reliably execute individual Apache Spark applications by processing only a subset of partitions in the Glue Data Catalog table. The transformed data can then be concurrently written back by all individual Glue ETL jobs to a common target table in an Amazon S3 data lake, Amazon Redshift, or other databases.

Finally, a Glue Python command can be triggered to capture the completion status of the different Glue entities, including Glue Crawlers and parallel Glue ETL jobs, and to post-process or retry any failed components.
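As an illustration of the first step in this workflow, the following sketch builds the hourly CloudTrail-style partition prefixes that a Python command might hand to crawlers or ETL jobs as parameters. It is plain Python and makes no AWS calls; the function name hourly_prefixes, the region value, and the sample timestamp are assumptions for the example, while the prefix layout follows the path shown above.

```python
from datetime import datetime, timedelta

def hourly_prefixes(base, account_id, region, end, hours):
    """Build one CloudTrail-style S3 prefix per hour, oldest first."""
    prefixes = []
    for offset in range(hours - 1, -1, -1):
        t = end - timedelta(hours=offset)
        prefixes.append(
            f"{base}/{account_id}/CloudTrail/{region}/"
            f"{t:%Y}/{t:%m}/{t:%d}/{t:%H}/"
        )
    return prefixes

# Hypothetical inputs: the three hours ending at 2019-01-01 01:00.
prefixes = hourly_prefixes(
    "s3://AWSLogs", "ACCOUNTID", "us-east-1", datetime(2019, 1, 1, 1), 3
)
for prefix in prefixes:
    print(prefix)
```

Each prefix could then be passed to a downstream job as a job parameter so that the job processes only that slice of the table.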

Executing SQL using SparkSQL in AWS Glue

AWS Glue Data Catalog as Hive Compatible Metastore

The AWS Glue Data Catalog is a managed metadata repository compatible with the Apache Hive Metastore API. You can follow the detailed instructions here to configure your AWS Glue ETL jobs and development endpoints to use the Glue Data Catalog. You also need to add the Hive SerDes to the class path of AWS Glue Jobs to serialize/deserialize data for the corresponding formats. You can then natively run Apache Spark SQL queries against your tables stored in the Data Catalog.

The following example assumes that you have crawled the US legislators dataset available at s3://awsglue-datasets/examples/us-legislators. We’ll use the Spark shell running on an AWS Glue development endpoint to execute SparkSQL queries directly on the legislators’ tables cataloged in the AWS Glue Data Catalog.

>>> spark.sql("use legislators")
DataFrame[]
>>> spark.sql("show tables").show()
+-----------+------------------+-----------+
|   database|         tableName|isTemporary|
+-----------+------------------+-----------+
|legislators|        areas_json|      false|
|legislators|    countries_json|      false|
|legislators|       events_json|      false|
|legislators|  memberships_json|      false|
|legislators|organizations_json|      false|
|legislators|      persons_json|      false|
+-----------+------------------+-----------+

>>> spark.sql("select distinct organization_id from memberships_json").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

A similar approach is to use the AWS Glue DynamicFrame API to read the data from S3. The DynamicFrame is then converted to a Spark DataFrame using the toDF method. Next, a temporary view can be registered for the DataFrame, which can be queried using SparkSQL. The key difference between the two approaches is the use of Hive SerDes for the first approach, and native Glue/Spark readers for the second approach. The native Glue/Spark readers provide performance and flexibility benefits such as computation of the schema at runtime, schema evolution, and job bookmarks support for Glue Dynamic Frames.

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
>>> memberships.toDF().createOrReplaceTempView("memberships")
>>> spark.sql("select distinct organization_id from memberships").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

Workflows and S3 Consistency

If you have a workflow of external processes ingesting data into S3, or upstream AWS Glue jobs generating input for a table used by downstream jobs in a workflow, you can encounter the following Apache Spark errors.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 16.0 failed 4 times, most recent failure: Lost task 10.3 in stage 16.0 (TID 761, ip-<>.ec2.internal, executor 1): 
java.io.FileNotFoundException: No such file or directory 's3://<bucket>/fileprefix-c000.snappy.parquet'
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 
'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

These errors happen when the upstream jobs overwrite the same S3 objects that the downstream jobs are concurrently listing or reading. They can also happen because of the eventual consistency of S3, where overwritten or deleted objects are updated at a later time while the downstream jobs are reading them. A common manifestation of this error occurs when you create a SparkSQL view and execute SQL queries in the downstream job. To avoid these errors, the best practice is to set up a workflow with upstream and downstream jobs scheduled at different times, and to read from and write to different S3 partitions based on time.

You can also enable the S3-optimized output committer for your Glue jobs by passing in a special job parameter: “--enable-s3-parquet-optimized-committer” set to true. This committer improves application performance by avoiding list and rename operations in Amazon S3 during job and task commit phases. It also avoids issues that can occur with Amazon S3’s eventual consistency during job and task commit phases, and helps to minimize task failures.
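As a small sketch of how that job parameter might be assembled, the snippet below builds a dictionary of job arguments that could be supplied as a job's default arguments (for example, the DefaultArguments field when a job is defined through the AWS Glue API). The helper name committer_job_arguments is hypothetical, and no AWS call is made here.

```python
def committer_job_arguments(extra=None):
    """Return Glue job arguments with the S3-optimized committer enabled."""
    args = {"--enable-s3-parquet-optimized-committer": "true"}
    args.update(extra or {})  # merge any other job arguments the caller supplies
    return args

# Example: combine the committer flag with another job argument.
default_arguments = committer_job_arguments({"--job-language": "python"})
print(default_arguments)
```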

Conclusion

In this post, we discussed how to leverage the automatic code generation process in AWS Glue ETL to simplify common data manipulation tasks such as data type conversion and flattening complex structures. We also explored using AWS Glue Workflows to build and orchestrate data pipelines of varying complexity. Lastly, we looked at how you can leverage the power of SQL, with the use of AWS Glue ETL and Glue Data Catalog, to query and transform your data.

In the final post, we will explore specific capabilities in AWS Glue and best practices to help you better manage the performance, scalability and operation of AWS Glue Apache Spark jobs.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies, and reading about the latest technology.

 

 

Best practices to scale Apache Spark jobs and partition data with AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. This series of posts discusses best practices to help developers of Apache Spark applications and Glue ETL jobs, big data architects, data engineers, and business analysts scale their data processing jobs running on AWS Glue automatically.

The first post of this series discusses two key AWS Glue capabilities to manage the scaling of data processing jobs. The first allows you to horizontally scale out Apache Spark applications for large splittable datasets. The second allows you to vertically scale up memory-intensive Apache Spark applications with the help of new AWS Glue worker types. The post also shows how to use AWS Glue to scale Apache Spark applications with a large number of small files commonly ingested from streaming applications using Amazon Kinesis Data Firehose. Finally, the post shows how AWS Glue jobs can use the partitioning structure of large datasets in Amazon S3 to provide faster execution times for Apache Spark applications.

Understanding AWS Glue worker types

AWS Glue comes with three worker types to help customers select the configuration that meets their job latency and cost requirements. These workers, also known as Data Processing Units (DPUs), come in Standard, G.1X, and G.2X configurations.

The standard worker configuration allocates 5 GB for Spark driver and executor memory, 512 MB for spark.yarn.executor.memoryOverhead, and 50 GB of attached EBS storage. The G.1X worker allocates 10 GB for driver and executor memory, 2 GB memoryOverhead, and 64 GB of attached EBS storage. The G.2X worker allocates 20 GB for driver and executor memory, 4 GB memoryOverhead, and 128 GB of attached EBS storage.

The compute parallelism (Apache Spark tasks per DPU) available for horizontal scaling is the same regardless of the worker type. For example, both standard and G.1X workers map to 1 DPU, each of which can run eight concurrent tasks. A G.2X worker maps to 2 DPUs, which can run 16 concurrent tasks. As a result, compute-intensive AWS Glue jobs that possess a high degree of data parallelism can benefit from horizontal scaling (more standard or G.1X workers). AWS Glue jobs that need high memory or ample disk space to store intermediate shuffle output can benefit from vertical scaling (more G.1X or G.2X workers).
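The arithmetic in this section can be sketched in a few lines of Python. The table below encodes only the figures quoted above (DPUs per worker, executor memory, and eight concurrent tasks per DPU); the names WORKER_TYPES and capacity are hypothetical, and the sketch is a back-of-the-envelope estimate, not a sizing tool.

```python
# name: (DPUs per worker, driver/executor memory in GB), as quoted in the text
WORKER_TYPES = {
    "Standard": (1, 5),
    "G.1X": (1, 10),
    "G.2X": (2, 20),
}
TASKS_PER_DPU = 8  # concurrent Spark tasks per DPU, per the text

def capacity(worker_type, workers):
    """Estimate total DPUs and concurrent tasks for a number of workers."""
    dpus_per_worker, memory_gb = WORKER_TYPES[worker_type]
    total_dpus = workers * dpus_per_worker
    return {
        "dpus": total_dpus,
        "concurrent_tasks": total_dpus * TASKS_PER_DPU,
        "executor_memory_gb": memory_gb,
    }

standard = capacity("Standard", 10)
g2x = capacity("G.2X", 10)
print(standard, g2x)
```

With ten workers, the standard type yields 80 concurrent tasks and G.2X yields 160, but the parallelism per DPU is identical; what changes with the larger types is the memory available to each executor.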

Horizontal scaling for splittable datasets

AWS Glue automatically supports file splitting when reading common native formats (such as CSV and JSON) and modern file formats (such as Parquet and ORC) from S3 using AWS Glue DynamicFrames. For more information about DynamicFrames, see Work with partitioned data in AWS Glue.

A file split is a portion of a file that a Spark task can read and process independently on an AWS Glue worker. By default, file splitting is enabled for line-delimited native formats, which allows Apache Spark jobs running on AWS Glue to parallelize computation across multiple executors. AWS Glue jobs that process large splittable datasets with medium (hundreds of megabytes) or large (several gigabytes) file sizes can benefit from horizontal scaling and run faster by adding more AWS Glue workers.

File splitting also benefits block-based compression formats such as bzip2. You can read each compression block on a file split boundary and process them independently. Unsplittable compression formats such as gzip do not benefit from file splitting. To horizontally scale jobs that read unsplittable files or compression formats, prepare the input datasets with multiple medium-sized files.

 

Each file split (the blue square in the figure) is read from S3, deserialized into an AWS Glue DynamicFrame partition, and then processed by an Apache Spark task (the gear icon in the figure). Deserialized partition sizes can be significantly larger than the on-disk 64 MB file split size, especially for highly compressed splittable file formats such as Parquet or large files using unsplittable compression formats such as gzip. Typically, a deserialized partition is not cached in memory, and only constructed when needed due to Apache Spark’s lazy evaluation of transformations, thus not causing any memory pressure on AWS Glue workers. For more information on lazy evaluation, see the RDD Programming Guide on the Apache Spark website.

However, explicitly caching a partition in memory or spilling it out to local disk in an AWS Glue ETL script or Apache Spark application can result in out-of-memory (OOM) or out-of-disk exceptions. AWS Glue can support such use cases by using larger AWS Glue worker types with vertically scaled-up DPU instances for AWS Glue ETL jobs.
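To make the file-split arithmetic from this section concrete, the following sketch estimates how many splits, and therefore how many independent Spark tasks, a single file yields. It assumes the 64 MB on-disk split size mentioned above and treats unsplittable formats such as gzip as a single split; the function name split_count is hypothetical, and the real split planning in AWS Glue and Spark may differ in detail.

```python
import math

SPLIT_SIZE_BYTES = 64 * 1024 * 1024  # 64 MB on-disk split size cited in the text

def split_count(file_size_bytes, splittable=True):
    """Estimate how many file splits (and so Spark tasks) one file yields."""
    if file_size_bytes <= 0:
        return 0
    if not splittable:
        return 1  # e.g. gzip: the whole file goes to a single task
    return math.ceil(file_size_bytes / SPLIT_SIZE_BYTES)

one_gb = 1024 ** 3
csv_tasks = split_count(one_gb)                     # splittable, line-delimited file
gzip_tasks = split_count(one_gb, splittable=False)  # unsplittable compression
print(csv_tasks, gzip_tasks)
```

Under these assumptions a 1 GB CSV file can be spread over 16 tasks, while a 1 GB gzip file is handled by one task, which is why preparing multiple medium-sized files helps for unsplittable formats.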

Vertical scaling for Apache Spark jobs using larger worker types

A variety of AWS Glue ETL jobs, Apache Spark applications, and new machine learning (ML) Glue transformations supported with AWS Lake Formation have high memory and disk requirements. Running these workloads may put significant memory pressure on the execution engine. This memory pressure can result in job failures because of OOM or out-of-disk space exceptions. You may see exceptions from YARN about memory and disk space.

Exceeding YARN memory overhead

Apache YARN is responsible for allocating cluster resources needed to run your Spark application. An application includes a Spark driver and multiple executor JVMs. In addition to the memory allocation required to run a job for each executor, YARN also allocates extra overhead memory to accommodate JVM overhead, interned strings, and other metadata that the JVM needs. The configuration parameter spark.yarn.executor.memoryOverhead defaults to 10% of the total executor memory. Memory-intensive operations such as joining large tables or processing datasets with a skew in the distribution of specific column values may exceed the memory threshold, and result in the following error message:

18/06/13 16:54:29 ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
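The 5.5 GB limit in this message follows from the worker configuration described earlier: YARN kills a container when its physical memory exceeds the executor memory plus the memory overhead. The sketch below reproduces that arithmetic for the three worker types using only the figures quoted in this post; the function name is hypothetical.

```python
def yarn_container_limit_gb(executor_memory_gb, memory_overhead_gb):
    """Container limit enforced by YARN: executor memory plus overhead."""
    return executor_memory_gb + memory_overhead_gb

# Figures from the worker-type descriptions above (512 MB = 0.5 GB).
standard_limit = yarn_container_limit_gb(5, 0.5)
g1x_limit = yarn_container_limit_gb(10, 2)
g2x_limit = yarn_container_limit_gb(20, 4)
print(standard_limit, g1x_limit, g2x_limit)
```

The standard worker's limit works out to 5.5 GB, matching the log line above, so a job that hits this error on standard workers has more headroom on the larger worker types.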

Disk space

Apache Spark uses local disk on Glue workers to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. During the sort or shuffle stages of a job, Spark writes intermediate data to local disk before it can exchange that data between the different workers. Jobs may fail due to the following exception when no disk space remains:

java.io.IOException: No space left on device
UnsafeExternalSorter: Thread 20 spilling sort data of 141.0 MB to disk (90 times so far)

AWS Glue job metrics

Most commonly, this is a result of a significant skew in the dataset that the job is processing. You can also identify the skew by monitoring the execution timeline of different Apache Spark executors using AWS Glue job metrics. For more information, see Debugging Demanding Stages and Straggler Tasks.

The following AWS Glue job metrics graph shows the execution timeline and memory profile of different executors in an AWS Glue ETL job. One of the executors (the red line) is straggling due to processing of a large partition, and actively consumes memory for the majority of the job’s duration.

With AWS Glue’s Vertical Scaling feature, memory-intensive Apache Spark jobs can use AWS Glue workers with higher memory and larger disk space to help overcome these two common failures. Using AWS Glue job metrics, you can also debug OOM and determine the ideal worker type for your job by inspecting the memory usage of the driver and executors for a running job. For more information, see Debugging OOM Exceptions and Job Abnormalities.

In general, jobs that run memory-intensive operations can benefit from the G.1X worker type, and those that use AWS Glue’s ML transforms or similar ML workloads can benefit from the G.2X worker type.

Apache Spark UI for AWS Glue jobs

You can also use AWS Glue’s support for Spark UI to inspect and scale your AWS Glue ETL job by visualizing the Directed Acyclic Graph (DAG) of Spark’s execution, monitoring demanding stages and large shuffles, and inspecting Spark SQL query plans. For more information, see Monitoring Jobs Using the Apache Spark Web UI.

The following Spark SQL query plan on the Spark UI shows the DAG for an ETL job that reads two tables from S3, performs an outer-join that results in a Spark shuffle, and writes the result to S3 in Parquet format.

As seen from the plan, the Spark shuffle and subsequent sort operation for the join transformation takes the majority of the job execution time. With AWS Glue vertical scaling, each AWS Glue worker co-locates more Spark tasks, thereby saving on the number of data exchanges over the network.

Scaling to handle large numbers of small files

An AWS Glue ETL job might read thousands or millions of files from S3. This is typical for Kinesis Data Firehose or streaming applications writing data into S3. The Apache Spark driver may run out of memory when attempting to read a large number of files. When this happens, you see the following error message:

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 12039"...

Apache Spark v2.2 can manage approximately 650,000 files on the standard AWS Glue worker type. To handle more files, AWS Glue provides the option to read input files in larger groups per Spark task for each AWS Glue worker. For more information, see Reading Input Files in Larger Groups.

You can reduce the excessive parallelism from the launch of one Apache Spark task to process each file by using AWS Glue file grouping. This method reduces the chances of an OOM exception on the Spark driver. To configure file grouping, you need to set groupFiles and groupSize parameters. The following code example uses AWS Glue DynamicFrame API in an ETL script with these parameters:

dyf = glueContext.create_dynamic_frame_from_options("s3",
    {'paths': ["s3://input-s3-path/"],
    'recurse':True,
    'groupFiles': 'inPartition',
    'groupSize': '1048576'}, 
    format="json")

You can set groupFiles to group files within a Hive-style S3 partition (inPartition) or across S3 partitions (acrossPartition). In most scenarios, grouping within a partition is sufficient to reduce the number of concurrent Spark tasks and the memory footprint of the Spark driver. In benchmarks, AWS Glue ETL jobs configured with the inPartition grouping option were approximately seven times faster than native Apache Spark v2.2 when processing 320,000 small JSON files distributed across 160 different S3 partitions. A large fraction of the time in Apache Spark is spent building an in-memory index while listing S3 files and scheduling a large number of short-running tasks to process each file. With AWS Glue grouping enabled, the benchmark AWS Glue ETL job could process more than 1 million files using the standard AWS Glue worker type.

groupSize is an optional field that allows you to configure the amount of data each Spark task reads and processes as a single AWS Glue DynamicFrame partition. Users can set groupSize if they know the distribution of file sizes before running the job. The groupSize parameter allows you to control the number of AWS Glue DynamicFrame partitions, which also translates into the number of output files. However, using a considerably small or large groupSize can result in significant task parallelism or under-utilization of the cluster, respectively.
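The effect of groupSize on task count can be illustrated with a simple greedy packing routine. This is a sketch of the general idea only, not AWS Glue's actual grouping algorithm; the function name group_files and the file sizes are assumptions chosen for the example.

```python
def group_files(file_sizes, group_size):
    """Greedily pack files, in order, into groups of up to group_size bytes.

    A file larger than group_size ends up in a group of its own.
    """
    groups, current, current_bytes = [], [], 0
    for size in file_sizes:
        # Start a new group when adding this file would exceed the target size.
        if current and current_bytes + size > group_size:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

small_files = [100 * 1024] * 1000           # 1,000 hypothetical files of 100 KB each
groups = group_files(small_files, 1048576)  # 1 MB groups, as in the example above
print(len(small_files), "files ->", len(groups), "groups")
```

In this toy case, 1,000 one-file tasks collapse into 100 grouped tasks. A much smaller groupSize would leave the task count close to the file count, and a much larger one would produce too few tasks to keep the cluster busy.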

By default, AWS Glue automatically enables grouping without any manual configuration when the number of input files or task parallelism exceeds a threshold of 50,000. The default value of the groupFiles parameter is inPartition, so that each Spark task only reads files within the same S3 partition. AWS Glue computes the groupSize parameter automatically and configures it to reduce the excessive parallelism, and makes use of the cluster compute resources with sufficient Spark tasks running in parallel.

Partitioning data and pushdown predicates

Partitioning has emerged as an important technique for organizing datasets so that a variety of big data systems can query them efficiently. A hierarchical directory structure organizes the data, based on the distinct values of one or more columns. For example, you can partition your application logs in S3 by date, broken down by year, month, and day. Files corresponding to a single day’s worth of data receive a prefix such as the following:

s3://my_bucket/logs/year=2018/month=01/day=23/

Predicate pushdowns for partition columns

AWS Glue supports pushing down predicates, which define filter criteria for partition columns populated for a table in the AWS Glue Data Catalog. Instead of reading all the data and filtering results at execution time, you can supply a SQL predicate in the form of a WHERE clause on the partition column. For example, assume the table is partitioned by the year column and run SELECT * FROM table WHERE year = 2019. year represents the partition column and 2019 represents the filter criteria.

AWS Glue lists and reads only the files from S3 partitions that satisfy the predicate and are necessary for processing.

To accomplish this, specify a predicate using the Spark SQL expression language as an additional parameter to the AWS Glue DynamicFrame getCatalogSource method. This predicate can be any SQL expression or user-defined function that evaluates to a Boolean, as long as it uses only the partition columns for filtering.

This example demonstrates this functionality with a dataset of GitHub events partitioned by year, month, and day. The following code example reads only those S3 partitions related to events that occurred on weekends:

%spark

val partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

Here you can use the SparkSQL string concat function to construct a date string. The to_date function converts it to a date object, and the date_format function with the 'E' pattern converts the date to a three-character day of the week (for example, Mon or Tue). For more information about these functions, Spark SQL expressions, and user-defined functions in general, see the Spark SQL, DataFrames and Datasets Guide and list of functions on the Apache Spark website.

There is a significant performance boost for AWS Glue ETL jobs when pruning AWS Glue Data Catalog partitions. It reduces the time needed for the Spark query engine for listing files in S3 and reading and processing data at runtime. You can achieve further improvement as you exclude additional partitions by using predicates with higher selectivity.
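To show what partition pruning does with the weekend predicate, the following plain-Python sketch applies the same test to a list of partition values and keeps only the matching ones. It simulates the selection step only and does not use AWS Glue or Spark; the function name is_weekend and the sample week of partitions are hypothetical.

```python
from datetime import date

def is_weekend(partition):
    """Mirror the predicate above: keep partitions whose date is Sat or Sun."""
    d = date(int(partition["year"]), int(partition["month"]), int(partition["day"]))
    return d.weekday() >= 5  # Monday is 0, so 5 and 6 are Saturday and Sunday

# Seven hypothetical daily partitions: 2017-01-01 through 2017-01-07.
partitions = [
    {"year": "2017", "month": "01", "day": f"{day:02d}"} for day in range(1, 8)
]

# Only these partitions would need to be listed and read from S3.
selected = [p["day"] for p in partitions if is_weekend(p)]
print(selected)
```

Because the predicate uses only partition columns, it can be evaluated against the catalog's partition values before any file is listed, so five of the seven daily partitions in this example are never touched.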

Partitioning data before and during writes to S3

By default, data is not partitioned when writing out the results from an AWS Glue DynamicFrame—all output files are written at the top level under the specified output path. AWS Glue enables partitioning of DynamicFrame results by passing the partitionKeys option when creating a sink. For example, the following code example writes out the dataset in Parquet format to S3 partitioned by the type column:

%spark

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))),
    format = "parquet").writeDynamicFrame(projectedEvents)

In this example, $outpath is a placeholder for the base output path in S3. The partitionKeys parameter corresponds to the names of the columns used to partition the output in S3. When you execute the write operation, it removes the type column from the individual records and encodes it in the directory structure. To demonstrate this, you can list the output path using the following aws s3 ls command from the AWS CLI:

PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/

For more information, see aws s3 ls in the AWS CLI Command Reference.
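The way a partition key moves from the record into the directory structure can be sketched as follows. This is an illustration of Hive-style key layout in plain Python, not the AWS Glue writer itself; the function name partitioned_key, the sample record, and the file name part-00000.parquet are hypothetical.

```python
def partitioned_key(record, partition_keys, filename):
    """Return (relative S3 key, record without the partition columns)."""
    # Encode each partition column as a key=value/ path segment.
    prefix = "".join(f"{key}={record[key]}/" for key in partition_keys)
    # The partition columns are dropped from the stored record.
    remaining = {k: v for k, v in record.items() if k not in partition_keys}
    return prefix + filename, remaining

key, body = partitioned_key(
    {"type": "PushEvent", "id": "42", "public": True},
    ["type"],
    "part-00000.parquet",
)
print(key, body)
```

The value of the type column is recoverable from the path alone, which is what lets query engines skip entire prefixes when a query filters on that column.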

In general, you should select columns for partitionKeys that are of lower cardinality and are most commonly used to filter or group query results. For example, when analyzing AWS CloudTrail logs, it is common to look for events that happened between a range of dates. Therefore, partitioning the CloudTrail data by year, month, and day would improve query performance and reduce the amount of data that you need to scan to return the answer.

The benefit of output partitioning is two-fold. First, it improves execution time for end-user queries. Second, having an appropriate partitioning scheme helps avoid costly Spark shuffle operations in downstream AWS Glue ETL jobs when combining multiple jobs into a data pipeline. For more information, see Working with partitioned data in AWS Glue.

S3 or Hive-style partitions are different from Spark RDD or DynamicFrame partitions. Spark partitioning is related to how Spark or AWS Glue breaks up a large dataset into smaller and more manageable chunks to read and apply transformations in parallel. AWS Glue workers manage this type of partitioning in memory. You can control Spark partitions further by using the repartition or coalesce functions on DynamicFrames at any point during a job’s execution and before data is written to S3. You can set the number of partitions using the repartition function either by explicitly specifying the total number of partitions or by selecting the columns to partition the data.

Repartitioning a dataset by using the repartition or coalesce functions often results in AWS Glue workers exchanging (shuffling) data, which can impact job runtime and increase memory pressure. In contrast, writing data to S3 with Hive-style partitioning does not require any data shuffle and only sorts it locally on each of the worker nodes. The number of output files in S3 without Hive-style partitioning roughly corresponds to the number of Spark partitions. In contrast, the number of output files in S3 with Hive-style partitioning can vary based on the distribution of partition keys on each AWS Glue worker.
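The relationship between Spark partitions and output file counts can be approximated with a simplified model: without Hive-style partitioning, each non-empty Spark partition writes one file, and with it, each Spark partition writes one file per distinct partition key value it holds. The sketch below encodes that assumption; the function name and sample data are hypothetical, and real file counts can differ.

```python
def output_file_count(spark_partitions, hive_partitioned):
    """Estimate output files from the key values held by each Spark partition."""
    if not hive_partitioned:
        # Roughly one file per non-empty Spark partition.
        return sum(1 for keys in spark_partitions if keys)
    # One file per distinct partition key value within each Spark partition.
    return sum(len(set(keys)) for keys in spark_partitions)

# Hypothetical "type" values of the records held by three Spark partitions.
spark_partitions = [
    ["PushEvent", "PushEvent", "ForkEvent"],
    ["PushEvent"],
    ["WatchEvent", "ForkEvent"],
]
flat_files = output_file_count(spark_partitions, hive_partitioned=False)
hive_files = output_file_count(spark_partitions, hive_partitioned=True)
print(flat_files, hive_files)
```

In this example the same data produces three files without Hive-style partitioning and five with it, because keys such as PushEvent are spread over several Spark partitions.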

Conclusion

This post showed how to scale your ETL jobs and Apache Spark applications on AWS Glue for both compute and memory-intensive jobs. AWS Glue enables faster job execution times and efficient memory management by using the parallelism of the dataset and different types of AWS Glue workers. It also helps you overcome the challenges of processing many small files by automatically adjusting the parallelism of the workload and cluster. AWS Glue ETL jobs use the AWS Glue Data Catalog and enable seamless partition pruning using predicate pushdowns. It also allows for efficient partitioning of datasets in S3 for faster queries by downstream Apache Spark applications and other analytics engines such as Amazon Athena and Amazon Redshift. We hope you try out these best practices for your Apache Spark applications on AWS Glue.

The second post in this series will show how to use AWS Glue features to batch process large historical datasets and incrementally process deltas in S3 data lakes. It also demonstrates how to use a custom AWS Glue Parquet writer for faster job execution.

 


About the Author

Mohit Saxena is a technical lead at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies, and reading about the latest technology.