All posts by Yonatan Dolan

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary breaks the two concerns of vendor locking and cost efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary infrastructure handles over 20 billion requests daily with every request generating event logs. Various data pipelines process these logs, storing petabytes (PBs) of data per month, which after processing data stored on Amazon S3, are then stored in Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

The data is flowing from Cloudinary log providers into files written into Amazon S3 and notified through events pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running in Amazon EMR Spark, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, helps avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned have used Athena because it provides a simple to use, fully serverless, cost effective, interface without the need to setup infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. One example is when partitioning timestamps, which can be done by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it also allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results both during writing as well as during query time. After careful results analysis, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions as would be elaborated in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using EventBridge to push each file received into Amazon SQS, where it was processed using Spark running on Amazon EMR in batches. This allowed processing the incoming data at high throughput and scale clusters according to queue size while keeping costs down.

Example of fetching 100 messages (files) from Amazon SQS with Spark:

var client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()
var getMessageBatch: Iterable[Message] = DistributedSQSReceiver.client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala
sparkSession.sparkContext.parallelize(10) .map(_ => getMessageBatch) .collect().flatMap(_.toList) .toList

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partition is a key factor in the number of files produced and Cloudinary’s data is time based and most queries use a time filter, it was decided to address the optimization of our data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy works fastest and combines small files together to reach the target file size defined and after running it a significant improvement in query performance was observed.

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations Cloudinary were able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data which sometimes arrives from external sources and aren’t identified upon arrival. SparkActions.get().expireSnapshots(iceTable).expireOlderThan(TimeUnit.DAYS.toMillis(7)).execute()

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. It turned out that in certain cases, Cloudinary reached over 300 manifest files—which were small, often under 8Mb in size—and due to late arriving data, manifest files were pointing to data in different partitions. This caused query planning to run for 12 seconds for each query.

Cloudinary initiated a separate scheduled process of rewriteManifests, and after it ran, the number of manifest files was reduced to approximately 170 files and as a result of more alignment between manifests and query filters (based on partitions), query planning was improved by three times to approximately 4 seconds.

Choosing the right query engine

As part of Cloudinary exploration aimed at testing various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine Details
Snowflake native XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena On-demand mode
Amazon EMR Trino Opensource Trino on top of eight nodes (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity from the simplest one to the most heavy and complex.

Query Description Data scanned Returned results set
Q1 Multi-day aggregation on a single tenant Single digit GBs <10 rows
Q2 Single-day aggregation by tenant across multiple tenant Dozens of GBs 100 thousand rows
Q3 Multi-day aggregation across multiple tenants Hundreds of GBs <10 rows
Q4 Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics Single digit TBs >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was only fastest in two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on-par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data-lake. The following table shows the cost and time for each query and product.

. Amazon EMR Trino Snowflake (XL) Snowflake (XL) Iceberg Athena
Query1 $0.01
5 seconds
$0.08
8 seconds
$0.07
8 seconds
$0.02
11 seconds
Query2 $0.12
107 seconds
$0.25
28 seconds
$0.35
39 seconds
$0.18
94 seconds
Query3 $0.17
147 seconds
$1.07
120 seconds
$1.88
211 seconds
$1.22
26 seconds
Query4 $6.43
1,237 seconds
$11.73
1,324 seconds
$12.71
1,430 seconds
N/A

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. It enables organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination is the most refined way to have an enterprise-grade open data environment. The availability of managed services and open source software helps companies to implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by more than 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in the Athena version 3, which is now based on Trino as its engine. The ability to retain data for longer as well as providing it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinar. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer at Cloudinary data infrastructure team. Focused on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

How Aura from Unity revolutionized their big data pipeline with Amazon Redshift Serverless

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-aura-from-unity-revolutionized-their-big-data-pipeline-with-amazon-redshift-serverless/

This post is co-written with  Amir Souchami and  Fabian Szenkier from Unity.

Aura from Unity (formerly known as ironSource) is the market standard for creating rich device experiences that engage and retain customers. With a powerful set of solutions, Aura enables complete digital transformation, letting operators promote key services outside the store, directly on-device.

Amazon Redshift is a recommended service for online analytical processing (OLAP) workloads such as cloud data warehouses, data marts, and other analytical data stores. You can use simple SQL to analyze structured and semi-structured data, operational databases, and data lakes to deliver the best price/performance at any scale. The Amazon Redshift data sharing feature provides instant, granular, and high-performance access without data copies and data movement across multiple Redshift data warehouses in the same or different AWS accounts and across AWS Regions. Data sharing provides live access to data so that you always see the most up-to-date and consistent information as it’s updated in the data warehouse.

Amazon Redshift Serverless makes it straightforward to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. Redshift Serverless automatically provisions and intelligently scales data warehouse capacity to deliver fast performance for even the most demanding and unpredictable workloads, and you pay only for what you use. You can load your data and start querying right away in the Amazon Redshift Query Editor or in your favorite business intelligence (BI) tool and continue to enjoy the best price/performance and familiar SQL features in an easy-to-use, zero administration environment.

In this post, we describe Aura’s successful and swift adoption of Redshift Serverless, which allowed them to optimize their overall bidding advertisement campaigns’ time to market from 24 hours to 2 hours. We explore why Aura chose this solution and what technological challenges it helped solve.

Aura’s initial data pipeline

Aura is a pioneer in using Redshift RA3 clusters with data sharing for extract, transform, and load (ETL) and BI workloads. One of Aura’s operations is bidding advertisement campaigns. These campaigns are optimized by using an AI-based bid process that requires running hundreds of analytical queries per campaign. These queries are run on data that resides in an RA3 provisioned Redshift cluster.

The integrated pipeline is comprised of various AWS services:

The following diagram illustrates this architecture.

Aura architecture

Challenges of the initial architecture

The queries for each campaign run in the following manner:

First, a preparation query filters and aggregates raw data, preparing it for the subsequent operation. This is followed by the main query, which carries out the logic according to the preparation query result set.

As the number of campaigns grew, Aura’s Data team was required to run hundreds of concurrent queries for each of these steps. Aura’s existing provisioned cluster was already heavily utilized with data ingestion, ETL, and BI workloads, so they were looking for cost-effective ways to isolate this workload with dedicated compute resources.

The team evaluated a variety of options, including unloading data to Amazon S3 and a multi-cluster architecture using data sharing and Redshift serverless. The team gravitated towards the multi-cluster architecture with data sharing, as it requires no query rewrite, allows for dedicated compute for this specific workload, avoids the need to duplicate or move data from the main cluster, and provides high concurrency and automatic scaling. Lastly, it’s billed in a pay-for-what-you-use model, and provisioning is straightforward and quick.

Proof of concept

After evaluating the options, Aura’s Data team decided to conduct a proof of concept using Redshift Serverless as a consumer of their main Redshift provisioned cluster, sharing just the relevant tables for running the required queries. Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). A single RPU provides 16 GB of memory and a serverless endpoint can range from 8 RPU to 512 RPU.

Aura’s Data team started the proof of concept using a 256 RPU Redshift Serverless endpoint and gradually lowered the RPU to reduce costs while making sure the query runtime was below the required target.

Eventually, the team decided to use a 128 RPU (2 TB RAM) Redshift Serverless endpoint as the base RPU, while using the Redshift Serverless auto scaling feature, which allows hundreds of concurrent queries to run by automatically upscaling the RPU as needed.

Aura’s new solution with Redshift Serverless

After a successful proof of concept, the production setup included adding code to switch between the provisioned Redshift cluster and the Redshift Serverless endpoint. This was done using a configurable threshold based on the number of queries waiting to be processed in a specific MSK topic consumed at the beginning of the pipeline. Small-scale campaign queries would still run on the provisioned cluster, and large-scale queries would use the Redshift Serverless endpoint. The new solution uses an Amazon MWAA pipeline that fetches configuration information from a DynamoDB table, consumes jobs that represent ad campaigns, and then runs hundreds of EKS jobs triggered using EKSPodOperator. Each job runs the two serial queries (the preparation query followed by a main query, which outputs the results to Amazon S3). This happens several hundred times concurrently using Redshift Serverless compute resources.

Then the process initiates another set of EKSPodOperator operators to run the AI training code based on the data result that was saved on Amazon S3.

The following diagram illustrates the solution architecture.

Aura new architecture

Outcome

The overall runtime of the pipeline was reduced from 24 hours to just 2 hours, a 12-times improvement. This integration of Redshift Serverless, coupled with data sharing, led to a 90% reduction in pipeline duration, negating the necessity for data duplication or query rewriting. Moreover, the introduction of a dedicated consumer as an exclusive compute resource significantly eased the load of the producer cluster, enabling running small-scale queries even faster.

“Redshift Serverless and data sharing enabled us to provision and scale our data warehouse capacity to deliver fast performance, high concurrency and handle challenging ML workloads with very minimal effort.”

– Amir Souchami, Aura’s Principal Technical Systems Architect.

Learnings

Aura’s Data team is highly focused on working in a cost-effective manner and has therefore implemented several cost controls in their Redshift Serverless endpoint:

  • Limit the overall spend by setting a maximum RPU-hour usage limit (per day, week, month) for the workgroup. Aura configured that limit so when it is reached, Amazon Redshift will send an alert to the relevant Amazon Redshift administrator team. This feature also allows writing an entry to a system table and even turning off user queries.
  • Use a maximum RPU configuration, which defines the upper limit of compute resources that Redshift Serverless can use at any given time. When the maximum RPU limit is set for the workgroup, Redshift Serverless scales within that limit to continue to run the workload.
  • Implement query monitoring rules that prevent wasteful resource utilization and runaway costs caused by poorly written queries.

Conclusion

A data warehouse is a crucial part of any modern data-driven company, enabling you to answer complex business questions and provide insights. The evolution of Amazon Redshift allowed Aura to quickly adapt to business requirements by combining data sharing between provisioned and Redshift Serverless data warehouses. Aura’s journey with Redshift Serverless underscores the vast potential of strategic tech integration in driving efficiency and operational excellence.

If Aura’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how such a solution can address them.
  • Reach out to AWS experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. Such hands-on experience will provide valuable insights before moving to production.

Elevate your Redshift expertise. Already enjoying the power of Amazon Redshift? Enhance your data journey with the latest features and expert guidance. Reach out to your dedicated AWS account team for personalized support, discover cutting-edge capabilities, and unlock even greater value from your data with Amazon Redshift.


About the Authors

Amir Souchami, Chief Architect of Aura from Unity, focusing on creating resilient and performant cloud systems and mobile apps at major scale.

Fabian Szenkier is the ML and Big Data Architect at Aura by Unity, works on building modern AI/ML solutions and state of the art data engineering pipelines at scale.

Liat Tzur is a Senior Technical Account Manager at Amazon Web Services. She serves as the customer’s advocate and assists her customers in achieving cloud operational excellence in alignment with their business goals.

Adi Jabkowski is a Sr. Redshift Specialist in EMEA, part of the Worldwide Specialist Organization (WWSO) at AWS.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.

Orca Security’s journey to a petabyte-scale data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/orca-securitys-journey-to-a-petabyte-scale-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Eliad Gat and Oded Lifshiz from Orca Security.

With data becoming the driving force behind many industries today, having a modern data architecture is pivotal for organizations to be successful. One key component that plays a central role in modern data architectures is the data lake, which allows organizations to store and analyze large amounts of data in a cost-effective manner and run advanced analytics and machine learning (ML) at scale.

Orca Security is an industry-leading Cloud Security Platform that identifies, prioritizes, and remediates security risks and compliance issues across your AWS Cloud estate. Orca connects to your environment in minutes with patented SideScanning technology to provide complete coverage across vulnerabilities, malware, misconfigurations, lateral movement risk, weak and leaked passwords, overly permissive identities, and more.

The Orca Platform is powered by a state-of-the-art anomaly detection system that uses cutting-edge ML algorithms and big data capabilities to detect potential security threats and alert customers in real time, ensuring maximum security for their cloud environment. At the core of Orca’s anomaly detection system is its transactional data lake, which enables the company’s data scientists, analysts, data engineers, and ML specialists to extract valuable insights from vast amounts of data and deliver innovative cloud security solutions to its customers.

In this post, we describe Orca’s journey building a transactional data lake using Amazon Simple Storage Service (Amazon S3), Apache Iceberg, and AWS Analytics. We explore why Orca chose to build a transactional data lake and examine the key considerations that guided the selection of Apache Iceberg as the preferred table format.

In addition, we describe the Orca Platform architecture and the technologies used. Lastly, we discuss the challenges encountered throughout the project, present the solutions used to address them, and share valuable lessons learned.

Why did Orca build a data lake?

Prior to the creation of the data lake, Orca’s data was distributed among various data silos, each owned by a different team with its own data pipelines and technology stack. This setup led to several issues, including scaling difficulties as the data size grew, maintaining data quality, ensuring consistent and reliable data access, high costs associated with storage and processing, and difficulties supporting streaming use cases. Moreover, running advanced analytics and ML on disparate data sources proved challenging. To overcome these issues, Orca decided to build a data lake.

A data lake is a centralized data repository that enables organizations to store and manage large volumes of structured and unstructured data, eliminating data silos and facilitating advanced analytics and ML on the entire data. By decoupling storage and compute, data lakes promote cost-effective storage and processing of big data.

Why did Orca choose Apache Iceberg?

Orca considered several table formats that have evolved in recent years to support its transactional data lake. Amongst the options, Apache Iceberg stood out as the ideal choice because it met all of Orca’s requirements.

First, Orca sought a transactional table format that ensures data consistency and fault tolerance. Apache Iceberg’s transactional and ACID guarantees, which allow concurrent read and write operations while ensuring data consistency and simplified fault handling, fulfill this requirement. Furthermore, Apache Iceberg’s support for time travel and rollback capabilities makes it highly suitable for addressing data quality issues by reverting to a previous state in a consistent manner.

Second, a key requirement was to adopt an open table format that integrates with various processing engines. This was to avoid vendor lock-in and allow teams to choose the processing engine that best suits their needs. Apache Iceberg’s engine-agnostic and open design meets this requirement by supporting all popular processing engines, including Apache Spark, Amazon Athena, Apache Flink, Trino, Presto, and more.

In addition, given the substantial data volumes handled by the system, an efficient table format was required that can support querying petabytes of data very fast. Apache Iceberg’s architecture addresses this need by efficiently filtering and reducing scanned data, resulting in accelerated query times.

An additional requirement was to allow seamless schema changes without impacting end-users. Apache Iceberg’s range of features, including schema evolution, hidden partitions, and partition evolution, addresses this requirement.

Lastly, it was important for Orca to choose a table format that is widely adopted. Apache Iceberg’s growing and active community aligned with the requirement for a popular and community-backed table format.

Solution overview

Orca’s data lake is based on open-source technologies that seamlessly integrate with Apache Iceberg. The system ingests data from various sources such as cloud resources, cloud activity logs, and API access logs, and processes billions of messages, resulting in terabytes of data daily. This data is sent to Apache Kafka, which is hosted on Amazon Managed Streaming for Apache Kafka (Amazon MSK). It is then processed using Apache Spark Structured Streaming running on Amazon EMR and stored in the data lake. Amazon EMR streamlines the process of loading all required Iceberg packages and dependencies, ensuring that the data is stored in Apache Iceberg format and ready for consumption as quickly as possible.

The data lake is built on top of Amazon S3 using Apache Iceberg table format with Apache Parquet as the underlying file format. In addition, the AWS Glue Data Catalog enables data discovery, and AWS Identity and Access Management (IAM) enforces secure access controls for the lake and its operations.

The data lake serves as the foundation for a variety of capabilities that are supported by different engines.

Data pipelines built on Apache Spark and Athena SQL analyze and process the data stored in the data lake. These data pipelines generate valuable insights and curated data that are stored in Apache Iceberg tables for downstream usage. This data is then used by various applications for streaming analytics, business intelligence, and reporting.

Amazon SageMaker is used to build, train, and deploy a range of ML models. Specifically, the system uses Amazon SageMaker Processing jobs to process the data stored in the data lake, employing the AWS SDK for Pandas (previously known as AWS Wrangler) for various data transformation operations, including cleaning, normalization, and feature engineering. This ensures that the data is suitable for training purposes. Additionally, SageMaker training jobs are employed for training the models. After the models are trained, they are deployed and used to identify anomalies and alert customers in real time to potential security threats. The following diagram illustrates the solution architecture.

Orca security Data Lake Architecture

Challenges and lessons learned

Orca faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing EMR streaming ingestion for high throughput
  • Taming the small files problem for fast reads
  • Maximizing performance with Athena version 3
  • Maintaining Apache Iceberg tables
  • Managing data retention
  • Monitoring the data lake infrastructure and operations
  • Mitigating data quality issues

In this section, we describe each of these challenges and the solutions implemented to address them.

Determining optimal table partitioning

Determining optimal partitioning for each table is very important in order to optimize query performance and minimize the impact on teams querying the tables when partitioning changes. Apache Iceberg’s hidden partitions combined with partition transformations proved to be valuable in achieving this goal because it allowed for transparent changes to partitioning without impacting end-users. Additionally, partition evolution enables experimentation with various partitioning strategies to optimize cost and performance without requiring a rewrite of the table’s data every time.

For example, with these features, Orca was able to easily change several of its table partitioning from DAY to HOUR with no impact on user queries. Without this native Iceberg capability, they would have needed to coordinate the new schema with all the teams that query the tables and rewrite the entire data, which would have been a costly, time-consuming, and error-prone process.

Optimizing EMR streaming ingestion for high throughput

As mentioned previously, the system ingests billions of messages daily, resulting in terabytes of data processed and stored each day. Therefore, optimizing the EMR clusters for this type of load while maintaining high throughput and low costs has been an ongoing challenge. Orca addressed this in several ways.

First, Orca chose to use instance fleets with its EMR clusters because they allow optimized resource allocation by combining different instance types and sizes. Instance fleets improve resilience by allowing multiple Availability Zones to be configured. As a result, the cluster will launch in an Availability Zone with all the required instance types, preventing capacity limitations. Additionally, instance fleets can use both Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot instances, resulting in cost savings.

The process of sizing the cluster for high throughput and lower costs involved adjusting the number of core and task nodes, selecting suitable instance types, and fine-tuning CPU and memory configurations. Ultimately, Orca was able to find an optimal configuration consisting of on-demand core nodes and spot task nodes of varying sizes, which provided high throughput but also ensured compliance with SLAs.

Orca also found that using different Kafka Spark Structured Streaming properties, such as minOffsetsPerTrigger, maxOffsetsPerTrigger, and minPartitions, provided higher throughput and better control of the load. Using minPartitions, which enables better parallelism and distribution across a larger number of tasks, was particularly useful for consuming high lags quickly.

Lastly, when dealing with a high data ingestion rate, Amazon S3 may throttle the requests and return 503 errors. To address this scenario, Iceberg offers a table property called write.object-storage.enabled, which incorporates a hash prefix into the stored S3 object path. This approach effectively mitigates throttling problems.

Taming the small files problem for fast reads

A common challenge often encountered when ingesting streaming data into the data lake is the creation of many small files. This can have a negative impact on read performance when querying the data with Athena or Apache Spark. Having a high number of files leads to longer query planning and runtimes due to the need to process and read each file, resulting in overhead for file system operations and network communication. Additionally, this can result in higher costs due to the large number of S3 PUT and GET requests required.

To address this challenge, Apache Spark Structured Streaming provides the trigger mechanism, which can be used to tune the rate at which data is committed to Apache Iceberg tables. The commit rate has a direct impact on the number of files being produced. For instance, a higher commit rate, corresponding to a shorter time interval, results in lots of data files being produced.

In certain cases, launching the Spark cluster on an hourly basis and configuring the trigger to AvailableNow facilitated the processing of larger data batches and reduced the number of small files created. Although this approach led to cost savings, it did involve a trade-off of reduced data freshness. However, this trade-off was deemed acceptable for specific use cases.

In addition, to address preexisting small files within the data lake, Apache Iceberg offers a data files compaction operation that combines these smaller files into larger ones. Running this operation on a schedule is highly recommended to optimize the number and size of the files. Compaction also proves valuable in handling late-arriving data and enables the integration of this data into consolidated files.

Maximizing performance with Athena version 3

Orca was an early adopter of Athena version 3, Amazon’s implementation of the Trino query engine, which provides extensive support for Apache Iceberg. Whenever possible, Orca preferred using Athena over Apache Spark for data processing. This preference was driven by the simplicity and serverless architecture of Athena, which led to reduced costs and easier usage, unlike Spark, which typically required provisioning and managing a dedicated cluster at higher costs.

In addition, Orca used Athena as part of its model training and as the primary engine for ad hoc exploratory queries conducted by data scientists, business analysts, and engineers. However, for maintaining Iceberg tables and updating table properties, Apache Spark remained the more scalable and feature-rich option.

Maintaining Apache Iceberg tables

Ensuring optimal query performance and minimizing storage overhead became a significant challenge as the data lake grew to a petabyte scale. To address this challenge, Apache Iceberg offers several maintenance procedures, such as the following:

  • Data files compaction – This operation, as mentioned earlier, involves combining smaller files into larger ones and reorganizing the data within them. This operation not only reduces the number of files but also enables data sorting based on different columns or clustering similar data using z-ordering. Using Apache Iceberg’s compaction results in significant performance improvements, especially for large tables, making a noticeable difference in query performance between compacted and uncompacted data.
  • Expiring old snapshots – This operation provides a way to remove outdated snapshots and their associated data files, enabling Orca to maintain low storage costs.

Running these maintenance procedures efficiently and cost-effectively using Apache Spark, particularly the compaction operation, which operates on terabytes of data daily, requires careful consideration. This entails appropriately sizing the Spark cluster running on EMR and adjusting various settings such as CPU and memory.

In addition, using Apache Iceberg’s metadata tables proved to be very helpful in identifying issues related to the physical layout of Iceberg’s tables, which can directly impact query performance. Metadata tables offer insights into the physical data storage layout of the tables and offer the convenience of querying them with Athena version 3. By accessing the metadata tables, crucial information about tables’ data files, manifests, history, partitions, snapshots, and more can be obtained, which aids in understanding and optimizing the table’s data layout.

For instance, the following queries can uncover valuable information about the underlying data:

  • The number of files and their average size per partition:
    >SELECT partition, file_count, (total_size / file_count) AS avg_file_size FROM "db"."table$partitions"

  • The number of data files pointed to by each manifest:
    SELECT path, added_data_files_count + existing_data_files_count AS number_of_data_files FROM "db"."table$manifests"

  • Information about the data files:
    SELECT file_path, file_size_in_bytes FROM "db"."table$files"

  • Information related to data completeness:
    SELECT record_count, partition FROM "db"."table$partitions"

Managing data retention

Effective management of data retention in a petabyte-scale data lake is crucial to ensure low storage costs as well as to comply with GDPR. However, implementing such a process can be challenging when dealing with Iceberg data stored in S3 buckets, because deleting files based on simple S3 lifecycle policies could potentially cause table corruption. This is because Iceberg’s data files are referenced in manifest files, so any changes to data files must also be reflected in the manifests.

To address this challenge, certain considerations must be taken into account while handling data retention properly. Apache Iceberg provides two modes for handling deletes, namely copy-on-write (CoW), and merge-on-read (MoR). In CoW mode, Iceberg rewrites data files at the time of deletion and creates new data files, whereas in MoR mode, instead of rewriting the data files, a delete file is written that lists the position of deleted records in files. These files are then reconciled with the remaining data during read time.

In favor of faster read times, CoW mode is preferable and when used in conjunction with the expiring old snapshots operation, it allows for the hard deletion of data files that have exceeded the set retention period.

In addition, by storing the data sorted based on the field that will be utilized for deletion (for example, organizationID), it’s possible to reduce the number of files that require rewriting. This optimization significantly enhances the efficiency of the deletion process, resulting in improved deletion times.

Monitoring the data lake infrastructure and operations

Managing a data lake infrastructure is challenging due to the various components it encompasses, including those responsible for data ingestion, storage, processing, and querying.

Effective monitoring of all these components involves tracking resource utilization, data ingestion rates, query runtimes, and various other performance-related metrics, and is essential for maintaining optimal performance and detecting issues as soon as possible.

Monitoring Amazon EMR was crucial because it played a vital role in the system for data ingestion, processing, and maintenance. Orca monitored the cluster status and resource usage of Amazon EMR by utilizing the available metrics through Amazon CloudWatch. Furthermore, it used JMX Exporter and Prometheus to scrape specific Apache Spark metrics and create custom metrics to further improve the pipelines’ observability.

Another challenge emerged when attempting to further monitor the ingestion progress through Kafka lag. Although Kafka lag tracking is the standard method for monitoring ingestion progress, it posed a challenge because Spark Structured Streaming manages its offsets internally and doesn’t commit them back to Kafka. To overcome this, Orca utilized the progress of the Spark Structured Streaming Query Listener (StreamingQueryListener) to monitor the processed offsets, which were then committed to a dedicated Kafka consumer group for lag monitoring.

In addition, to ensure optimal query performance and identify potential performance issues, it was essential to monitor Athena queries. Orca addressed this by using key metrics from Athena and the AWS SDK for Pandas, specifically TotalExecutionTime and ProcessedBytes. These metrics helped identify any degradation in query performance and keep track of costs, which were based on the size of the data scanned.

Mitigating data quality issues

Apache Iceberg’s capabilities and overall architecture played a key role in mitigating data quality challenges.

One of the ways Apache Iceberg addresses these challenges is through its schema evolution capability, which enables users to modify or add columns to a table’s schema without rewriting the entire data. This feature prevents data quality issues that may arise due to schema changes, because the table’s schema is managed as part of the manifest files, ensuring safe changes.

Furthermore, Apache Iceberg’s time travel feature provides the ability to review a table’s history and roll back to a previous snapshot. This functionality has proven to be extremely useful in identifying potential data quality issues and swiftly resolving them by reverting to a previous state with known data integrity.

These robust capabilities ensure that data within the data lake remains accurate, consistent, and reliable.

Conclusion

Data lakes are an essential part of a modern data architecture, and now it’s easier than ever to create a robust, transactional, cost-effective, and high-performant data lake by using Apache Iceberg, Amazon S3, and AWS Analytics services such as Amazon EMR and Athena.

Since building the data lake, Orca has observed significant improvements. The data lake infrastructure has allowed Orca’s platform to have seamless scalability while reducing the cost of running its data pipelines by over 50% utilizing Amazon EMR. Additionally, query costs were reduced by more than 50% using the efficient querying capabilities of Apache Iceberg and Athena version 3.

Most importantly, the data lake has made a profound impact on Orca’s platform and continues to play a key role in its success, supporting new use cases such as change data capture (CDC) and others, and enabling the development of cutting-edge cloud security solutions.

If Orca’s journey has sparked your interest and you are considering implementing a similar solution in your organization, here are some strategic steps to consider:

  • Start by thoroughly understanding your organization’s data needs and how this solution can address them.
  • Reach out to experts, who can provide you with guidance based on their own experiences. Consider engaging in seminars, workshops, or online forums that discuss these technologies. The following resources are recommended for getting started:
  • An important part of this journey would be to implement a proof of concept. This hands-on experience will provide valuable insights into the complexities of a transactional data lake.

Embarking on a journey to a transactional data lake using Amazon S3, Apache Iceberg, and AWS Analytics can vastly improve your organization’s data infrastructure, enabling advanced analytics and machine learning, and unlocking insights that drive innovation.


About the Authors

Eliad Gat is a Big Data & AI/ML Architect at Orca Security. He has over 15 years of experience designing and building large-scale cloud-native distributed systems, specializing in big data, analytics, AI, and machine learning.

Oded Lifshiz is a Principal Software Engineer at Orca Security. He enjoys combining his passion for delivering innovative, data-driven solutions with his expertise in designing and building large-scale machine learning pipelines.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan also leads the Apache Iceberg Israel community.

Carlos Rodrigues is a Big Data Specialist Solutions Architect at Amazon Web Services. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Hudi and Apache Iceberg.

Sofia Zilberman is a Sr. Analytics Specialist Solutions Architect at Amazon Web Services. She has a track record of 15 years of creating large-scale, distributed processing systems. She remains passionate about big data technologies and architecture trends, and is constantly on the lookout for functional and technological innovations.