All posts by Sekar Srinivasan

How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With a sudden, large increase in the number of application endpoints, they had to rapidly evolve their log analytics architecture and worked with the AWS Data Lab team to quickly prototype and deploy an architecture for their compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

Data Ingestion Workflow

To prototype this workflow, the team needed to quickly populate millions of Kafka messages in the dev/test environment. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.
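The post does not show the script itself. As a rough sketch of the idea only, the following Python assigns synthetic messages to partitions in round-robin order, which keeps partition sizes even. The message shape, field names, partition count, and message count are all illustrative assumptions, and a real script would send each message through a Kafka producer rather than counting it.

```python
import random
from collections import Counter

NUM_PARTITIONS = 8      # illustrative; not the production partition count
NUM_MESSAGES = 10_000   # illustrative; the real test set had over 1.2 billion


def make_message(seq, rng):
    """Build one synthetic message holding 7-25 log records (hypothetical shape)."""
    n_records = rng.randint(7, 25)
    return {
        "message_id": seq,
        "records": [
            {"user_id": f"user-{rng.randint(1, 1000)}", "line": f"log line {i}"}
            for i in range(n_records)
        ],
    }


def assign_partition(seq, num_partitions):
    """Round-robin assignment: consecutive messages go to consecutive partitions."""
    return seq % num_partitions


rng = random.Random(42)
counts = Counter()
for seq in range(NUM_MESSAGES):
    msg = make_message(seq, rng)
    # A real script would hand msg to a Kafka producer here, targeting this partition.
    counts[assign_partition(msg["message_id"], NUM_PARTITIONS)] += 1

print(min(counts.values()), max(counts.values()))
```

Because the message count divides evenly by the partition count here, every partition ends up with the same number of messages.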

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.
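To put these rates in perspective, the following short calculation uses only figures quoted in this post (30 million messages per minute, 5-minute batches, 7–25 records per message). It treats the per-minute figure as a flat rate, which is a simplification, because the post describes it as a lower bound.

```python
MESSAGES_PER_MINUTE = 30_000_000   # "over 30 million messages every minute"
BATCH_INTERVAL_MINUTES = 5
RECORDS_PER_MESSAGE = (7, 25)      # each Kafka message holds 7-25 log records

messages_per_batch = MESSAGES_PER_MINUTE * BATCH_INTERVAL_MINUTES
min_records = messages_per_batch * RECORDS_PER_MESSAGE[0]
max_records = messages_per_batch * RECORDS_PER_MESSAGE[1]
messages_per_second = MESSAGES_PER_MINUTE // 60

print(f"{messages_per_batch:,} messages per 5-minute batch")
print(f"{min_records:,} to {max_records:,} log records per batch")
print(f"{messages_per_second:,} messages per second")
```

Under that assumption, each 5-minute batch carries 150 million messages, or roughly 1 to 3.75 billion individual log records.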

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.
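The flattening and keying steps can be sketched in plain Python as follows. This is an illustration under stated assumptions, not Zoom's job: the field names (message_id, records, event_time), the key format, and the choice of the event date as the partition key are all hypothetical, and the production pipeline did this in Spark Structured Streaming rather than in a Python loop.

```python
def flatten_message(message):
    """Turn one Kafka message (a nested array of log records) into flat rows."""
    rows = []
    for position, record in enumerate(message["records"]):
        row = dict(record)
        # Hypothetical record key: message ID plus position inside the array,
        # so every flattened row can later be addressed individually.
        row["record_key"] = f'{message["message_id"]}-{position}'
        # Hypothetical partition key: the date part of the event timestamp.
        row["partition_path"] = record["event_time"][:10]
        rows.append(row)
    return rows


message = {
    "message_id": "m-001",
    "records": [
        {"user_id": "u1", "event_time": "2023-01-15T10:00:00Z", "line": "login"},
        {"user_id": "u2", "event_time": "2023-01-16T08:30:00Z", "line": "join meeting"},
    ],
}

rows = flatten_message(message)
for row in rows:
    print(row["record_key"], row["partition_path"], row["user_id"])
```

Each nested record becomes its own row with a unique key, which is what makes a later record-level delete possible.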

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

Data Deletion Workflow

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.
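To illustrate why this pruning avoids reading whole partitions, here is a simplified sketch of the idea in plain Python. It is not Hudi's implementation: the BloomFilter and DataFile classes, the file names, and the key format are hypothetical. A file is a candidate for rewriting only if the key falls inside its minimum and maximum key range and its bloom filter reports that the key might be present.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter: may report false positives, never false negatives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class DataFile:
    """Hypothetical stand-in for a data file with key-range and bloom metadata."""

    def __init__(self, name, keys):
        self.name = name
        self.min_key = min(keys)
        self.max_key = max(keys)
        self.bloom = BloomFilter()
        for key in keys:
            self.bloom.add(key)


def files_to_rewrite(files, keys_to_delete):
    """Return only the files that could hold at least one key to delete."""
    selected = []
    for data_file in files:
        for key in keys_to_delete:
            in_range = data_file.min_key <= key <= data_file.max_key
            if in_range and data_file.bloom.might_contain(key):
                selected.append(data_file.name)
                break
    return selected


files = [
    DataFile("file-a", [f"k{i:03d}" for i in range(0, 100)]),
    DataFile("file-b", [f"k{i:03d}" for i in range(100, 200)]),
    DataFile("file-c", [f"k{i:03d}" for i in range(200, 300)]),
]

print(files_to_rewrite(files, ["k150"]))
```

In this example only file-b is selected, so the other two files are never opened. A real engine would then confirm the match against the actual keys in the file before rewriting it.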

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results, the AWS team recommended using Amazon EMR with three core nodes (r5.16xlarge, 64 vCPUs each) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge with 64 vCPUs, r5.12xlarge with 48 vCPUs, and r5.8xlarge with 32 vCPUs). These recommendations helped Zoom reduce their Amazon EMR costs by more than 80% while meeting their performance goal of ingesting 150 million Kafka messages in under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics, which makes it possible to run ingestion workloads and updates concurrently while keeping them separate. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to Amazon EMR 6.4, the current release at the time of the AWS Data Lab collaboration. This allowed a quick transition to the new version with minimal testing. Storing the data in Amazon S3 in Hudi format also let us query it directly using Athena, without having to spin up a cluster to run ad hoc queries, as well as using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.
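As a conceptual illustration of optimistic concurrency control, the following sketch lets two writers proceed without blocking each other and checks for conflicts only at commit time. It is a deliberately simplified model, not Hudi's code: the Timeline class, the ConflictError exception, and the file group names are hypothetical, and a real system performs the commit-time check while holding a lock.

```python
class ConflictError(Exception):
    """Raised when two overlapping writes touched the same file group."""


class Timeline:
    """Hypothetical, simplified commit timeline for one table."""

    def __init__(self):
        self.commits = []  # each entry is the set of file groups a commit touched

    def begin(self):
        """Remember how many commits existed when the writer started."""
        return len(self.commits)

    def commit(self, started_at, touched_file_groups):
        # Compare against every commit that landed after this writer began.
        for other_groups in self.commits[started_at:]:
            overlap = other_groups & touched_file_groups
            if overlap:
                raise ConflictError(f"conflict on {sorted(overlap)}")
        self.commits.append(set(touched_file_groups))


timeline = Timeline()

# The ingestion job and the delete job start at the same time.
ingest_start = timeline.begin()
delete_start = timeline.begin()

# They touch different file groups, so both commits succeed.
timeline.commit(ingest_start, {"fg-new-1"})
timeline.commit(delete_start, {"fg-old-7"})

# A writer that started earlier and touches an already rewritten group fails.
late_start = 0
try:
    timeline.commit(late_start, {"fg-old-7"})
    conflict_detected = False
except ConflictError as err:
    conflict_detected = True
    print(err)
```

In a log workload, ingestion mostly writes new data while deletes rewrite older files, so overlaps of this kind are expected to be rare, which is the case optimistic schemes are suited to.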

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.
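For readers who want to map these choices to settings, the following is a minimal sketch of Hudi write options expressed as a Python dictionary. The option keys are standard Hudi configuration names, but the post does not list the exact keys Zoom set, so treating hoodie.insert.shuffle.parallelism as the parallelism setting is an assumption. The table name and field names are hypothetical placeholders.

```python
hudi_options = {
    # Hypothetical table and field names, for illustration only.
    "hoodie.table.name": "app_logs",
    "hoodie.datasource.write.recordkey.field": "record_key",
    "hoodie.datasource.write.partitionpath.field": "partition_path",
    # Merge on read: write row-based delta files now, compact to Parquet later.
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    # Assumed key for the insert parallelism that was lowered to 1,000.
    "hoodie.insert.shuffle.parallelism": "1000",
}

# With PySpark and the Hudi bundle available, a batch write would typically
# pass these options along the lines of:
#   df.write.format("hudi").options(**hudi_options).mode("append").save(path)
for key, value in sorted(hudi_options.items()):
    print(f"{key}={value}")
```

Lower parallelism means fewer, larger output files per write, which is the trade-off described above.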

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.

Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.

Outcomes

The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% and storage costs by about 90% compared to the prior HDFS-based architecture, all while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.

Conclusion

In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.


About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernizing their architecture, and generating insights from their data. In his spare time he likes to work on non-profit projects focused on underprivileged children’s education.

Chandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based out of San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time he enjoys exploring the outdoors with his family.

Viral Shah is an Analytics Sales Specialist who has worked with AWS for 5 years, helping customers to be successful in their data journey. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

Run Apache Spark workloads 3.5 times faster with Amazon EMR 6.9

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/run-apache-spark-workloads-3-5-times-faster-with-amazon-emr-6-9/

The Amazon EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark. With Amazon EMR release 6.9.0, the EMR runtime for Apache Spark supports equivalent Spark version 3.3.0.

With Amazon EMR 6.9.0, you can now run your Apache Spark 3.x applications faster and at lower cost without requiring any changes to your applications. In our performance benchmark tests, derived from TPC-DS performance tests at 3 TB scale, we found the EMR runtime for Apache Spark 3.3.0 provides a 3.5 times (using total runtime) performance improvement on average over open-source Apache Spark 3.3.0.

In this post, we analyze the results from our benchmark tests running a TPC-DS application on open-source Apache Spark and then on Amazon EMR 6.9, which comes with an optimized Spark runtime that is compatible with open-source Spark. We walk through a detailed cost analysis and finally provide step-by-step instructions to run the benchmark.

Results observed

To evaluate the performance improvements, we used an open-source Spark performance test utility that is derived from the TPC-DS performance test toolkit. We ran the tests on a seven-node (six core nodes and one primary node) c5d.9xlarge EMR cluster with the EMR runtime for Apache Spark, and on a second seven-node self-managed cluster on Amazon Elastic Compute Cloud (Amazon EC2) with the equivalent open-source version of Spark. We ran both tests with data in Amazon Simple Storage Service (Amazon S3).

Dynamic Resource Allocation (DRA) is a great feature to use for varying workloads. However, for a benchmarking exercise where we compare two platforms purely on performance, and test data volumes don’t change (3 TB in our case), we believe it’s best to avoid variability in order to run an apples-to-apples comparison. In our tests in both open-source Spark and Amazon EMR, we disabled DRA while running the benchmarking application.

The following table shows the total job runtime for all queries (in seconds) in the 3 TB query dataset between Amazon EMR version 6.9.0 and open-source Spark version 3.3.0. We observed that our TPC-DS tests had a total job runtime on Amazon EMR on Amazon EC2 that was 3.5 times faster than that using an open-source Spark cluster of the same configuration.

The per-query speedup on Amazon EMR 6.9 with and without the EMR runtime for Apache Spark is illustrated in the following chart. The horizontal axis shows each query in the 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. Notable performance gains are over 10 times faster for TPC-DS queries 24b, 72, 95, and 96.

Cost analysis

The performance improvements of the EMR runtime for Apache Spark directly translate to lower costs. We were able to realize a 67% cost savings running the benchmark application on Amazon EMR in comparison with the cost incurred to run the same application on open-source Spark on Amazon EC2 with the same cluster sizing due to reduced hours of Amazon EMR and Amazon EC2 usage. Amazon EMR pricing is for EMR applications running on EMR clusters with EC2 instances. The Amazon EMR price is added to the underlying compute and storage prices such as EC2 instance price and Amazon Elastic Block Store (Amazon EBS) cost (if attaching EBS volumes). Overall, the estimated benchmark cost in the US East (N. Virginia) Region is $27.01 per run for the open-source Spark on Amazon EC2 and $8.82 per run for Amazon EMR.

| Benchmark | Job Runtime (Hours) | Estimated Cost | Total EC2 Instances | Total vCPU | Total Memory (GiB) | Root Device (Amazon EBS) |
| --- | --- | --- | --- | --- | --- | --- |
| Open-source Spark on Amazon EC2 (1 primary and 6 core nodes) | 2.23 | $27.01 | 7 | 252 | 504 | 20 GiB gp2 |
| Amazon EMR on Amazon EC2 (1 primary and 6 core nodes) | 0.63 | $8.82 | 7 | 252 | 504 | 20 GiB gp2 |

Cost breakdown

The following is the cost breakdown for the open-source Spark on Amazon EC2 job ($27.01):

  • Total Amazon EC2 cost – (7 * $1.728 * 2.23) = (number of instances * c5d.9xlarge hourly rate * job runtime in hours) = $26.97
  • Amazon EBS cost – ($0.1/730 * 20 * 7 * 2.23) = (Amazon EBS per GB-hour rate * root EBS size * number of instances * job runtime in hours) = $0.042

The following is the cost breakdown for the Amazon EMR on Amazon EC2 job ($8.82):

  • Total Amazon EMR cost – (7 * $0.27 * 0.63) = ((number of core nodes + number of primary nodes) * c5d.9xlarge Amazon EMR price * job runtime in hours) = $1.19
  • Total Amazon EC2 cost – (7 * $1.728 * 0.63) = ((number of core nodes + number of primary nodes) * c5d.9xlarge instance price * job runtime in hours) = $7.62
  • Amazon EBS cost – ($0.1/730 * 20 GiB * 7 * 0.63) = (Amazon EBS per GB-hour rate * EBS size * number of instances * job runtime in hours) = $0.012
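The two breakdowns above can be reproduced with a few lines of Python. This sketch uses only the rates and runtimes quoted in this post; the function and constant names are our own.

```python
EC2_HOURLY_RATE = 1.728      # c5d.9xlarge instance price used in the post
EMR_HOURLY_RATE = 0.27       # c5d.9xlarge Amazon EMR price used in the post
EBS_GB_MONTH_RATE = 0.10     # Amazon EBS price per GB-month
HOURS_PER_MONTH = 730
NUM_INSTANCES = 7            # 1 primary + 6 core nodes
ROOT_VOLUME_GB = 20


def run_cost(runtime_hours, include_emr_fee):
    """Estimated cost of one benchmark run for the cluster described above."""
    ec2 = NUM_INSTANCES * EC2_HOURLY_RATE * runtime_hours
    ebs = (EBS_GB_MONTH_RATE / HOURS_PER_MONTH
           * ROOT_VOLUME_GB * NUM_INSTANCES * runtime_hours)
    emr = NUM_INSTANCES * EMR_HOURLY_RATE * runtime_hours if include_emr_fee else 0.0
    return ec2 + ebs + emr


oss_cost = run_cost(2.23, include_emr_fee=False)
emr_cost = run_cost(0.63, include_emr_fee=True)
savings = 1 - emr_cost / oss_cost
speedup = 2.23 / 0.63

print(f"Open-source Spark on EC2: ${oss_cost:.2f}")
print(f"Amazon EMR on EC2:        ${emr_cost:.2f}")
print(f"Cost savings:             {savings:.0%}")
print(f"Runtime speedup:          {speedup:.1f}x")
```

Summing the unrounded components can differ by about a cent from the totals above, which add components that were rounded first. The savings and speedup still come out at roughly 67% and 3.5 times.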

Set up OSS Spark benchmarking

In the following sections, we provide a brief outline of the steps involved in setting up the benchmarking. For detailed instructions with examples, refer to the GitHub repo.

For our OSS Spark benchmarking, we use the open-source tool Flintrock to launch our Amazon EC2-based Apache Spark cluster. Flintrock provides a quick way to launch an Apache Spark cluster on Amazon EC2 using the command line.

Prerequisites

Complete the following prerequisite steps:

  1. Have Python 3.7.x or above.
  2. Have Pip3 22.2.2 or above.
  3. Add the Python bin directory to your environment path. The Flintrock binary will be installed in this path.
  4. Run aws configure to configure your AWS Command Line Interface (AWS CLI) shell to point to the benchmarking account. Refer to Quick configuration with aws configure for instructions.
  5. Have a key pair with restrictive file permissions to access the OSS Spark primary node.
  6. Create a new S3 bucket in your test account if needed.
  7. Copy the TPC-DS source data as input to your S3 bucket.
  8. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application. Alternatively, you can download a pre-built spark-benchmark-assembly-3.3.0.jar if you want a Spark 3.3.0-based application.

Deploy the Spark cluster and run the benchmark job

Complete the following steps:

  1. Install the Flintrock tool via pip as shown in Steps to setup OSS Spark Benchmarking.
  2. Run the command flintrock configure, which pops up a default configuration file.
  3. Modify the default config.yaml file based on your needs. Alternatively, copy and paste the config.yaml file content into the default configuration file. Then save the file in its original location.
  4. Finally, launch the 7-node Spark cluster on Amazon EC2 via Flintrock.

This should create a Spark cluster with one primary node and six worker nodes. If you see any error messages, double-check the config file values, especially the Spark and Hadoop versions and the attributes of download-source and the AMI.

The OSS Spark cluster doesn’t come with the YARN resource manager. To enable it, we need to configure the cluster.

  1. Download the yarn-site.xml and enable-yarn.sh files from the GitHub repo.
  2. Replace <private ip of primary node> with the IP address of the primary node in your Flintrock cluster. You can retrieve the IP address from the Amazon EC2 console.
  3. Upload the files to all the nodes of the Spark cluster.
  4. Run the enable-yarn script.
  5. Enable Snappy support in Hadoop (the benchmark job reads Snappy compressed data).
  6. Download the benchmark utility application JAR file spark-benchmark-assembly-3.3.0.jar to your local machine.
  7. Copy this file to the cluster.
  8. Log in to the primary node and start YARN.
  9. Submit the benchmark job on the open-source Spark cluster as shown in Submit the benchmark job.

Summarize the results

Download the test result file from the output S3 bucket s3://$YOUR_S3_BUCKET/EC2_TPCDS-TEST-3T-RESULT/timestamp=xxxx/summary.csv/xxx.csv. (Replace $YOUR_S3_BUCKET with your S3 bucket name.) You can use the Amazon S3 console and navigate to the output S3 location or use the AWS CLI.

The Spark benchmark application creates a timestamp folder and writes a summary file inside a summary.csv prefix. Your timestamp and file name will be different from the one shown in the preceding example.

The output CSV files have four columns without header names. They are:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

The following screenshot shows a sample output. We have manually added column names. The way we calculate the geomean and the total job runtime is based on arithmetic means. We first take the mean of the med, min, and max values using the formula AVERAGE(B2:D2). Then we take a geometric mean of the Avg column using the formula GEOMEAN(E2:E105).
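The same calculation can be done outside a spreadsheet. The following sketch assumes the four-column, header-less layout described above and treats the total job runtime as the sum of the per-query averages, which is our reading of the description. The two sample rows are made-up numbers, not benchmark results.

```python
import csv
import io
from statistics import geometric_mean, mean


def summarize(csv_text):
    """Compute per-query averages, their geometric mean, and their sum."""
    averages = {}
    for query, median, minimum, maximum in csv.reader(io.StringIO(csv_text)):
        # Spreadsheet equivalent: AVERAGE(B2:D2)
        averages[query] = mean([float(median), float(minimum), float(maximum)])
    values = list(averages.values())
    return {
        "averages": averages,
        "geomean": geometric_mean(values),   # spreadsheet equivalent: GEOMEAN(E2:E105)
        "total_runtime": sum(values),
    }


# Made-up sample rows: query name, median, minimum, maximum (seconds).
sample = "q1,10,8,12\nq2,40,30,50\n"
summary = summarize(sample)
print(summary["averages"])
print(round(summary["geomean"], 2), summary["total_runtime"])
```

To use it on a real run, read the downloaded summary file into a string and pass it to summarize.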

Set up Amazon EMR benchmarking

For detailed instructions, see Steps to setup EMR Benchmarking.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure your AWS CLI shell to point to the benchmarking account. Refer to Quick configuration with aws configure for instructions.
  2. Upload the benchmark application to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps:

  1. Launch an EMR cluster from your AWS CLI shell as shown in Deploy EMR Cluster and run benchmark job.
  2. Configure Amazon EMR with one primary (c5d.9xlarge) and six core (c5d.9xlarge) nodes. Refer to create-cluster for a detailed description of AWS CLI options.
  3. Store the cluster ID from the response. You need this in the next step.
  4. Submit the benchmark job in Amazon EMR using add-steps in the AWS CLI.

Summarize the results

Summarize the results from the output bucket s3://$YOUR_S3_BUCKET/blog/EMRONEC2_TPCDS-TEST-3T-RESULT in the same manner as we did for the OSS results and compare.

Clean up

To avoid incurring future charges, delete the resources you created using the instructions in the Cleanup section of the GitHub repo.

  1. Stop the EMR and OSS Spark clusters. You may also delete them if you don’t want to retain the content. You can delete these resources by running the script cleanup-benchmark-env.sh from a terminal in your benchmark environment.
  2. If you used AWS Cloud9 as your IDE for building the benchmark application JAR file using Steps to build spark-benchmark-assembly application, you may want to delete the environment as well.

Conclusion

You can run your Apache Spark workloads 3.5 times (based on total runtime) faster and at lower cost without making any changes to your applications by using Amazon EMR 6.9.0.

To keep up to date, subscribe to the Big Data Blog’s RSS feed to learn more about the EMR runtime for Apache Spark, configuration best practices, and tuning advice.

For past benchmark tests, see Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark. Note that the past benchmark result of 1.7 times performance was based on geometric mean. Based on geometric mean, the performance in Amazon EMR 6.9 was two times faster.


About the authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernizing their architecture, and generating insights from their data. In his spare time he likes to work on non-profit projects, especially those focused on underprivileged children’s education.

Prabu Ravichandran is a Senior Data Architect with Amazon Web Services, focused on analytics and data lake architecture and implementation. He helps customers architect and build scalable and robust solutions using AWS services. In his free time, Prabu enjoys traveling and spending time with family.

Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-high-performance-acid-compliant-evolving-data-lake-using-apache-iceberg-on-amazon-emr/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto.

Apache Iceberg is an open table format for huge analytic datasets. Table formats typically indicate the format and location of individual table files. Iceberg adds functionality on top of that to help manage petabyte-scale datasets as well as newer data lake requirements such as transactions, upsert/merge, time travel, and schema and partition evolution. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table.

Amazon EMR release 6.5.0 and later includes Apache Iceberg so you can reliably work with huge tables with full support for ACID (Atomic, Consistent, Isolated, Durable) transactions in a highly concurrent and performant manner without getting locked into a single file format.

In this post, we discuss the modern data lake requirements and the challenges—including support for ACID transactions and concurrent writers, partition and schema evolution—that come with these. We also discuss how Iceberg solves these challenges. Additionally, we provide a step-by-step guide on how to get started with an Iceberg notebook in Amazon EMR Studio. You can access this sample notebook from the GitHub repo. You can also find this notebook in your EMR Studio workspace under Notebook Examples.

Modern data lake challenges

Amazon EMR integrates with Amazon Simple Storage Service (Amazon S3) natively for persistent data storage, and allows you to independently scale your data in Amazon S3 and compute on your EMR cluster. This enables you to bring in data from multiple sources (for example, transactional data from operational databases, social media feeds, and SaaS data sources) using different tools, and each data source has its own transient EMR cluster to perform transformation and ingestion in parallel. You can now keep one central copy of your data and share it with multiple user groups that run analytics and even make in-place updates on a data lake. We’re increasingly seeing the following requirements (and challenges) emerge as mainstream:

  • Consistent reads and writes across multiple concurrent users – There are two primary concerns:
    • Reader-writer isolation – When a job is updating a huge dataset, another job accessing the same data frequently works on a partially updated dataset, leaving the data in an inconsistent state.
    • Concurrent writes on the same dataset – Table formats relying on coarse-grained locks slow down the system. This limitation is even more telling in real-time streaming workloads.
  • Consistent table updates across multiple files or partitions – With Hive tables, writing to multiple partitions at once isn’t an atomic operation. If you’re overwriting a partition, for instance, you might delete several files across partitions without having any guarantees that you will replace them, potentially resulting in data loss. For huge tables, it’s not practical to use global locks and keep the readers and writers waiting. Common workarounds (such as rewriting all the data in all the partitions that need to be changed at the same time and then pointing to the new locations) cause huge data duplication and redundant extract, transform, and load (ETL) jobs.
  • Continuous schema evolution – Simple DDL commands often render the data unusable. For instance, say a data engineer renames a column and writes some data. The consuming analytics tool now can’t read it because the metastore can’t track former names for columns. That rename operation has effectively dropped a column and added a new column. Now there is data written in both schemas. Historically, schema changes required expensive backfills and redundant ETL processes.
  • Different query patterns on the same data – If you change the partitioning to optimize your query after a year, say from daily to hourly, you have to rewrite the table with the new hour column as the partition. In addition, you have to rewrite queries to use the new partition column in your table.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Existing tools that support these features lock you into specific file formats, complicating interoperability across the analytics ecosystem.
  • Support for mixed file formats – With existing solutions, if you rename a column in one file format (say Parquet, ORC, or Avro), you get a different behavior than if you rename a column in a different file format. There is inconsistency in data types supported by different file formats. These limitations necessitate additional ETL steps.

The problem

When multiple users share the same data, varied requirements ensue. The data platform needs to be transactional to handle concurrent upserts and reads.

Table formats such as Hive track a list of partitions inside the table within a data catalog. However, the underlying files are still not tracked transactionally, because we’re relying on immutable object storage that isn’t designed to be transactional. After the specific partitions to be updated or inserted have been identified, we still need to list all the files in those partitions at the leaf level of the partition hierarchy before we can filter out which of those files are relevant. For huge analytic datasets with thousands of files in each partition, listing all those files each time you run a query slows it down considerably. Furthermore, doing atomic commits (getting thousands of files in the table live at exactly the same moment) becomes impractical.

Apache Iceberg on Amazon EMR

Iceberg development was started by Netflix in December 2017, and the project was donated to the Apache Software Foundation in November 2018 as an incubator project. In May 2020, it graduated from the incubator.

Iceberg on Amazon EMR comes completely integrated and tested for running in production backed by Enterprise Support. This means you get 24/7 technical support from Amazon EMR experts, tools and technology to automatically manage the health of your environment, and consultative architectural, performance, and troubleshooting guidance on Iceberg issues.

Iceberg has integrations with other AWS services. For example, you can use the AWS Glue Data Catalog as the metastore for Iceberg tables. Iceberg also supports other catalog types such as Hive, Hadoop, Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and other custom implementations. When using AWS Glue as the data catalog, the AWS Glue database serves as your Iceberg namespace. Similarly, the AWS Glue table and AWS Glue TableVersion serve as the Iceberg table and table version, respectively. Your AWS Glue Data Catalog could be in the same or different account or even a different Region, making multi-account, multi-Region pipelines easily deployable. Amazon Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

How Iceberg addresses these challenges

Iceberg tracks individual data files in a table instead of simply maintaining a pointer to high-level table or partition locations. This allows writers to create data files in place and add them to the table only in an explicit commit. Every time new datasets are ingested into this table, a new point-in-time snapshot gets created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already has that information, populated at write time. Because of this design, Iceberg solves the problems listed earlier in the following ways:

  • Consistent reads and writes across multiple concurrent users – Iceberg relies on optimistic concurrency to support concurrent reads and writes from multiple user groups. If two operations try to commit at the same time, only one of them succeeds on the first attempt. The other is retried at the metadata level, and the retry is transparent to the user. If Iceberg detects that the second update doesn’t conflict with the first, it commits it successfully.
  • Consistent table updates across multiple partitions – In Iceberg, the partition of a file isn’t determined by the physical location of the files within directories or prefixes. Instead, Iceberg stores partition information within manifests of the data files. Therefore, updates across multiple partitions entail a simple, atomic metadata change.
  • Continuous schema evolution – Iceberg tracks columns by using unique IDs and not by the column name, which enables easy schema evolution. You can safely add, drop, rename, or even reorder columns. You can also update column data types if the update is safe (such as widening from INT to BIGINT or float to double).
  • Different query patterns on the same data – Iceberg keeps track of the relationship between partitioning values and the column that they came from. Logical data is decoupled from physical layout, which enables easy partition evolution as well. Partition values can be implicitly derived using a transform such as day(timestamp) or hour(timestamp) of an existing column.
  • ACID transactions, streaming upserts, file size optimization, and data snapshots – Iceberg supports ACID transactions with serializable isolation. Furthermore, Iceberg supports deletes, upserts, change data capture (CDC), time travel (getting the state of the data from a past time regardless of the current state of the data), and compaction (consolidating small files into larger files to reduce metadata overhead and improve query speed). Table changes are atomic, and readers never see partial or uncommitted changes.
  • Support for mixed file formats – Because schema fields are tracked by unique IDs independent of the underlying file format, you can have consistent queries across file formats such as Avro, Parquet, and ORC.
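
To make the optimistic concurrency behavior from the first bullet more concrete, the following is a minimal Python sketch of the idea. It is a toy model and not Iceberg's implementation: ToyTable, CommitConflict, and commit_with_retry are hypothetical names, and the table's metadata pointer is modeled as a plain integer version that is swapped only if nobody else has committed in the meantime.

```python
import threading


class CommitConflict(Exception):
    """Raised when the table changed after the writer read it."""


class ToyTable:
    """Toy stand-in for a table whose metadata pointer is swapped atomically."""

    def __init__(self):
        self._lock = threading.Lock()  # models only the atomic swap in the catalog
        self.version = 0
        self.files = frozenset()

    def commit(self, base_version, new_files):
        # Compare-and-swap: succeed only if nobody committed since base_version.
        with self._lock:
            if self.version != base_version:
                raise CommitConflict(
                    f"expected v{base_version}, found v{self.version}"
                )
            self.files = self.files | frozenset(new_files)
            self.version += 1
            return self.version


def commit_with_retry(table, new_files, max_attempts=3):
    """Re-read the latest version and retry only the metadata commit."""
    for _ in range(max_attempts):
        base = table.version
        try:
            return table.commit(base, new_files)
        except CommitConflict:
            continue  # the data files are already written; only the commit repeats
    raise CommitConflict("gave up after retries")


table = ToyTable()
base = table.version                    # two writers read the same version
table.commit(base, {"a.parquet"})       # writer 1 commits first
try:
    table.commit(base, {"b.parquet"})   # writer 2 now holds a stale version
except CommitConflict:
    new_version = commit_with_retry(table, {"b.parquet"})  # the retry succeeds

print(new_version, sorted(table.files))
```

In this sketch the second writer's append doesn't conflict with the first, so the retry only has to repeat the cheap metadata step. A real engine would also have to check whether the two changes actually conflict before retrying.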

Using Apache Iceberg with Amazon EMR

In this post, we demonstrate creating an Amazon EMR cluster that supports Iceberg using the AWS Command Line Interface (AWS CLI). You can also create the cluster from the Amazon EMR console. We use Amazon EMR Studio to run notebook code on our EMR cluster. To set up an EMR Studio, refer to Set up an EMR Studio. First, we note down the subnets that we specified when we created our EMR Studio. Now we launch our EMR cluster using the AWS CLI:

aws emr create-cluster \
--name iceberg-emr-cluster \
--use-default-roles \
--release-label emr-6.6.0 \
--instance-count 1 \
--instance-type r5.4xlarge \
--applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
--ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
--configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'

We choose emr-6.6.0 as the release label. This release comes with Iceberg version 0.13.1 pre-installed. We launch a single-node EMR cluster with the instance type r5.4xlarge and with the following applications installed: Hadoop, Spark, Livy, and Jupyter Enterprise Gateway. Make sure that you replace <EMR-STUDIO-SUBNET> with a subnet ID from the list of EMR Studio’s subnets you noted earlier. We need to enable Iceberg and the AWS Glue Data Catalog on our cluster. To do this, we use the following configuration classifications:

[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-hive-site",
    "Properties": {
      "hive.metastore.client.factory.class":
        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]
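
Because the --configurations argument has to be a single JSON string, it can be less error prone to build it from structured data than to type it by hand. The following Python sketch shows one way to do that with the standard library only. The variable names are our own, and the whitespace check reflects our assumption that a classification name with a stray space would most likely not be recognized.

```python
import json

# The two configuration classifications from this post, as Python data.
configurations = [
    {
        "Classification": "iceberg-defaults",
        "Properties": {"iceberg.enabled": "true"},
    },
    {
        "Classification": "spark-hive-site",
        "Properties": {
            "hive.metastore.client.factory.class":
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        },
    },
]

# Guard against stray leading or trailing whitespace in classification names.
for entry in configurations:
    name = entry["Classification"]
    if name != name.strip():
        raise ValueError(f"classification has stray whitespace: {name!r}")

# Compact, single-line JSON to paste between single quotes after --configurations.
configurations_arg = json.dumps(configurations, separators=(",", ":"))
print(configurations_arg)
```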

Initial setup

Let’s first create an S3 bucket location in the same Region as the EMR cluster to save a sample dataset that we’re going to create and work with. In this post, we use the placeholder bucket name YOUR-BUCKET-NAME. Remember to replace this with a globally unique bucket name when testing this out in your environment. From our EMR Studio workspace, we attach our cluster and use the PySpark kernel.

You can upload the sample notebook from the GitHub repo or use the Iceberg example under Notebook Examples in your own EMR Studio workspace and run the cells following the instructions in the notebook.

Configure a Spark session

In this command, we set our AWS Glue Data Catalog name as glue_catalog1. You can replace it with a different name. But if you do so, remember to change the Data Catalog name throughout this example, because we use the fully qualified table name including the Data Catalog name in all of our commands going forward. In the following command, remember to replace YOUR-BUCKET-NAME with your own bucket name:

%%configure -f
{
    "conf": {
        "spark.sql.catalog.glue_catalog1": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog1.warehouse": "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
        "spark.sql.catalog.glue_catalog1.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog1.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog1.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog1.lock.table": "myGlueLockTable",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    }
}

Let’s assume that the name of your catalog is glue_catalog1. The preceding code has the following components:

  • glue_catalog1.warehouse points to the Amazon S3 path where you want to store your data and metadata.
  • To make the catalog an AWS Glue Data Catalog, set glue_catalog1.catalog-impl to org.apache.iceberg.aws.glue.GlueCatalog. This key is required to point to an implementation class for any custom catalog implementation.
  • Use org.apache.iceberg.aws.s3.S3FileIO as the glue_catalog1.io-impl in order to take advantage of Amazon S3 multipart upload for high parallelism.
  • We use an Amazon DynamoDB table for lock implementation. This is optional, and is recommended for high concurrency workloads. To do that, we set lock-impl for our catalog to org.apache.iceberg.aws.glue.DynamoLockManager and we set lock.table to myGlueLockTable as the table name so that for every commit, the Data Catalog first obtains a lock using this table and then tries to safely modify the AWS Glue table. If you choose this option, the table gets created in your own account. Note that you need to have the necessary access permissions to create and use a DynamoDB table. Furthermore, additional DynamoDB charges apply.
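
If you decide to use a different catalog name or bucket, every key in the configuration has to change consistently. The following Python sketch generates the same set of keys from two inputs. The helper name iceberg_glue_conf is hypothetical and only illustrates the pattern; the property names and class names are the ones used in this post.

```python
import json


def iceberg_glue_conf(catalog, bucket, lock_table="myGlueLockTable"):
    """Return the Spark conf entries from this post for a given catalog name."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": f"s3://{bucket}/iceberg/{catalog}/tables/",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        f"{prefix}.lock.table": lock_table,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


conf = iceberg_glue_conf("glue_catalog1", "YOUR-BUCKET-NAME")

# JSON body to paste under the %%configure -f cell magic.
print(json.dumps({"conf": conf}, indent=4))
```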

Now that you’re all set with your EMR cluster for compute, S3 bucket for data, and AWS Glue Data Catalog for metadata, you can start creating a table and running the DML statements.

For all commands going forward, we use the %%sql cell magic to run Spark SQL commands in our EMR Studio notebook. However, for brevity, we don’t show the cell magic command. But you may need to use that in your Studio notebook for the SQL commands to work.

Create an Iceberg table in the AWS Glue Data Catalog

The default catalog is the AwsDataCatalog. Let’s switch to our AWS Glue catalog glue_catalog1, which has support for Iceberg tables. There are no namespaces as yet. A namespace in Iceberg is the same thing as a database in AWS Glue.

%%sql
use glue_catalog1

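Let’s create a table called orders in the salesdb namespace. If the namespace doesn’t exist yet, create it first, for example with CREATE DATABASE IF NOT EXISTS salesdb. The DDL syntax looks the same as creating a Hive table, for example, except that we include USING iceberg: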

CREATE TABLE glue_catalog1.salesdb.orders
    (
      order_id              int,
      product_name          string,
      product_category      string,
      qty                   int,
      unit_price            decimal(7,2),
      order_datetime        timestamp
    )
USING iceberg
PARTITIONED BY (days(order_datetime))

Note that we’re also partitioning this table by extracting the day out of the order_datetime column. We don’t have to create a separate column for the partition.
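
To illustrate how a partition value can be derived from an existing column, the following Python sketch mimics a day transform as the number of whole days since the Unix epoch. This is a simplified model for intuition only: days_transform is our own function, and the sample timestamps are made up.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_transform(ts):
    """Toy day transform: whole days between the Unix epoch and ts."""
    return (ts - EPOCH).days


morning = datetime(2022, 6, 14, 1, 30, tzinfo=timezone.utc)
evening = datetime(2022, 6, 14, 23, 59, tzinfo=timezone.utc)
next_day = datetime(2022, 6, 15, 0, 0, tzinfo=timezone.utc)

# Same calendar day gives the same partition value; the next day gives value + 1.
print(days_transform(morning), days_transform(evening), days_transform(next_day))
```

Both timestamps on June 14 map to the same partition value, so the rows land in the same partition even though the table has no dedicated date column.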

DML statements

We then insert records to our table. Here is an example:

INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        1, 
        'Harry Potter and the Prisoner of Azkaban',
        'Books',
        2,
        7.99,
        current_timestamp()
    )

DML statements result in snapshots getting created. Note the snapshot_id and the timestamp column called committed_at:

SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

We now insert four more records and then query the orders table and confirm that the five records are present:

SELECT * FROM glue_catalog1.salesdb.orders

Querying from Athena

Because Iceberg on Amazon EMR comes pre-integrated with the AWS Glue Data Catalog, we can now query the Iceberg tables from AWS analytics services that support Iceberg. Let’s query the salesdb/orders table from Athena as shown in the following screenshot.

Upserts

The notebook then gives examples for updates and deletes, and even upserts. We use the MERGE INTO statement for upserts, which uses the source table orders_update with new and updated records:

MERGE INTO glue_catalog1.salesdb.orders target
USING glue_catalog1.salesdb.orders_update source
ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET
        order_id = source.order_id,
        product_name = source.product_name,
        product_category = source.product_category,
        qty = source.qty,
        unit_price = source.unit_price,
        order_datetime = source.order_datetime
WHEN NOT MATCHED THEN
    INSERT *;

SELECT * FROM glue_catalog1.salesdb.orders;
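
The row-level effect of this MERGE INTO statement can be modeled in a few lines of Python. The following sketch is only an illustration of the matched and not-matched branches. It says nothing about how Iceberg rewrites files, and the sample rows are invented.

```python
def merge_into(target, source, key="order_id"):
    """Return the target rows with the source rows upserted on `key`."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # WHEN MATCHED: overwrite every column. WHEN NOT MATCHED: insert the row.
        merged[row[key]] = dict(row)
    return sorted(merged.values(), key=lambda r: r[key])


orders = [
    {"order_id": 1, "product_name": "Book A", "qty": 2},
    {"order_id": 2, "product_name": "Book B", "qty": 1},
]
orders_update = [
    {"order_id": 2, "product_name": "Book B", "qty": 5},  # existing ID: update
    {"order_id": 3, "product_name": "Book C", "qty": 1},  # new ID: insert
]

result = merge_into(orders, orders_update)
for row in result:
    print(row)
```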

Schema evolution

We then walk through schema evolution using simple ALTER TABLE commands to add, rename, and drop columns. The following example shows how simple it is to rename a column:

ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity;
DESC TABLE glue_catalog1.salesdb.orders;
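
The reason a rename is safe is that columns are tracked by ID rather than by name. The following Python sketch is a toy model of that idea, under the assumption that data files store values keyed by field ID and that names exist only in the table schema. The names read and rename_column and the sample rows are hypothetical.

```python
# Field ID -> current column name; the names live only in the table schema.
schema = {1: "order_id", 2: "qty"}

# Rows written before the rename, stored by field ID.
data_file = [{1: 101, 2: 2}, {1: 102, 2: 7}]


def read(rows, current_schema):
    """Project stored rows through the names in the current schema."""
    return [
        {current_schema[fid]: val for fid, val in row.items() if fid in current_schema}
        for row in rows
    ]


def rename_column(current_schema, old, new):
    """A rename is a metadata-only change: same field ID, new name."""
    return {
        fid: (new if name == old else name)
        for fid, name in current_schema.items()
    }


before = read(data_file, schema)
schema = rename_column(schema, "qty", "quantity")
after = read(data_file, schema)

print(before[0], after[0])
```

The data file is never touched. Only the mapping from ID to name changes, so rows written under the old name remain readable under the new one.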

Time travel

Iceberg also allows us to travel backward or forward by storing point-in-time snapshots. We can travel using timestamps when the snapshots were created or directly using the snapshot_id. The following is an example of a CALL statement that uses rollback_to_snapshot:

%%sql
CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', 8008410363488501197)

We then travel forward in time by calling set_current_snapshot:

%%sql
CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', 8392090950225782953)
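
Conceptually, moving backward and forward works because snapshots are retained and only the pointer to the current snapshot moves. The following Python sketch models this with a toy snapshot log. ToySnapshots is a hypothetical class, it uses small sequential IDs instead of the long snapshot IDs shown above, and it collapses rollback_to_snapshot and set_current_snapshot into a single set_current method.

```python
class ToySnapshots:
    """Toy snapshot log: every commit appends; the current pointer can move."""

    def __init__(self):
        self.snapshots = {}  # snapshot_id -> frozenset of data files
        self.current = None
        self._next_id = 1

    def commit(self, files_added):
        # A new snapshot contains the current snapshot's files plus the new ones.
        base = self.snapshots.get(self.current, frozenset())
        snapshot_id = self._next_id
        self._next_id += 1
        self.snapshots[snapshot_id] = base | frozenset(files_added)
        self.current = snapshot_id
        return snapshot_id

    def set_current(self, snapshot_id):
        # Moving the pointer does not delete any snapshot.
        if snapshot_id not in self.snapshots:
            raise KeyError(snapshot_id)
        self.current = snapshot_id

    def scan(self):
        """Return the files visible in the current snapshot."""
        return sorted(self.snapshots[self.current])


t = ToySnapshots()
s1 = t.commit({"f1"})
s2 = t.commit({"f2"})

t.set_current(s1)   # travel back: only the first file is visible
back = t.scan()
t.set_current(s2)   # travel forward again: both files are visible
forward = t.scan()

print(back, forward)
```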

Partition evolution

The notebook ends with an example that shows how partition evolution works in Iceberg. Iceberg stores partition information as part of the metadata. Because there is no separate partition column in the data itself, changing the partitioning scheme to hourly partitions, for example, is just a matter of calling a different partition transform, hours(…), on the existing column order_datetime, as shown in the following example:

%%sql
ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)

You can continue to use the old partition on the old data. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately.
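
The following Python sketch illustrates how old and new layouts can coexist. It is a simplified model rather than Iceberg's metadata format: each data file records which partition spec it was written under, spec 0 is the original daily layout, and spec 1 is our assumption of what the table uses after the hourly field is added. The names specs and write_file and the timestamps are hypothetical.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Toy transforms: whole days and whole hours since the Unix epoch.
TRANSFORMS = {
    "days": lambda ts: (ts - EPOCH).days,
    "hours": lambda ts: int((ts - EPOCH).total_seconds() // 3600),
}

# Spec 0 is the original layout; spec 1 adds the hourly field.
specs = {0: ["days"], 1: ["days", "hours"]}


def write_file(name, ts, spec_id):
    """Record a data file together with the spec it was written under."""
    values = tuple(TRANSFORMS[t](ts) for t in specs[spec_id])
    return {"file": name, "spec_id": spec_id, "partition": values}


old = write_file("old.parquet", datetime(2022, 6, 14, 1, 30, tzinfo=timezone.utc), 0)
new = write_file("new.parquet", datetime(2022, 6, 15, 1, 30, tzinfo=timezone.utc), 1)

for f in (old, new):
    print(f["file"], "spec", f["spec_id"], "partition", f["partition"])
```

The old file keeps its single daily partition value, and only the file written after the change carries an hourly value as well. Nothing written earlier has to be rewritten.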

The notebook shows how you can query the table using the new hourly partition:

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=1

You can continue to query your old data using the day() transform. There is only the original order_datetime column in the table.

%%sql
SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=14

You don’t have to store additional columns to accommodate multiple partitioning schemes. The partition definitions are in the metadata, providing the flexibility to evolve and change the partition definitions in the future.

Conclusion

In this post, we introduced Apache Iceberg and explained how Iceberg solves some challenges in modern data lakes. We then walked you through how to run Iceberg on Amazon EMR using the AWS Glue Data Catalog as the metastore, and query the data using Athena. You can also run upserts on this data from Athena. There is no additional cost to using Iceberg with Amazon EMR.

For more information about Iceberg, refer to How Iceberg works. Iceberg on Amazon EMR, with its integration with AWS Analytics services, can simplify the way you process, upsert, and delete data, with full support for ACID transactions in Amazon S3. You can also implement schema evolution, partition evolution, time travel, and compaction of data.


About the Author

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernizing their architecture, and generating insights from their data. In his spare time, he likes to work on non-profit projects, especially those focused on underprivileged children’s education.