Tag Archives: Amazon Athena

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary addresses the two concerns of vendor lock-in and cost-efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary’s infrastructure handles over 20 billion requests daily, with every request generating event logs. Various data pipelines process these logs and store petabytes (PBs) of data per month; the data is first stored on Amazon S3 and, after processing, is loaded into Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

The data flows from Cloudinary log providers into files written to Amazon S3, and each new file is announced through an event pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running on Amazon EMR, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, help avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned used Athena because it provides a simple-to-use, fully serverless, cost-effective interface without the need to set up infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. One example is when partitioning timestamps, which can be done by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it also allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.
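To make the idea of a partition transform concrete, here is a minimal Python sketch of how a partition value can be derived from a regular column value. It is an illustration only and not Iceberg’s implementation: the function names are hypothetical, and it assumes that day and hour values are counted from the Unix epoch and that truncation keeps the first characters of a string.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_transform(ts: datetime) -> int:
    """Whole days elapsed since the Unix epoch."""
    return (ts - EPOCH).days

def hour_transform(ts: datetime) -> int:
    """Whole hours elapsed since the Unix epoch."""
    return int((ts - EPOCH).total_seconds() // 3600)

def truncate_transform(value: str, width: int) -> str:
    """Keep only the first `width` characters of a string value."""
    return value[:width]

# Two events from the same day but different hours (made-up timestamps).
event_a = datetime(2024, 6, 1, 13, 45, tzinfo=timezone.utc)
event_b = datetime(2024, 6, 1, 23, 10, tzinfo=timezone.utc)

# With a daily layout both events map to the same partition value...
same_day = day_transform(event_a) == day_transform(event_b)
# ...while an hourly layout separates them.
same_hour = hour_transform(event_a) == hour_transform(event_b)
print(same_day, same_hour)
```

Because the partition value is computed from the column itself, a query that filters on the timestamp column can be mapped to the matching partitions without the user naming a partition column.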

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results, both during writing and at query time. After careful analysis of the results, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions, as elaborated in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using Amazon EventBridge to push each file received into Amazon SQS, where it was processed in batches using Spark running on Amazon EMR. This allowed processing the incoming data at high throughput and scaling clusters according to queue size while keeping costs down.

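Example of fetching up to 100 messages (files) from Amazon SQS with Spark:

import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import scala.collection.JavaConverters._

// The client lives in an object so that each executor builds its own instance
object DistributedSQSReceiver { lazy val client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build() }
// A def, so each Spark task issues its own receive call (SQS returns at most 10 messages per call)
def getMessageBatch: Iterable[Message] = DistributedSQSReceiver.client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala
// 10 tasks, each fetching up to 10 messages, for up to 100 messages in total
sparkSession.sparkContext.parallelize(1 to 10, 10).map(_ => getMessageBatch).collect().flatMap(_.toList).toList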

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.
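The effect of a hash prefix can be sketched in a few lines of Python. This is not Iceberg’s actual hashing scheme or path layout; the `hashed_key` function, the bucket name, and the use of MD5 are assumptions chosen only to show how files from one busy partition end up under many different key prefixes.

```python
import hashlib

def hashed_key(table_location: str, relative_path: str) -> str:
    """Place a short hash of the file path in front of the partition path."""
    digest = hashlib.md5(relative_path.encode("utf-8")).hexdigest()[:8]
    return f"{table_location}/{digest}/{relative_path}"

# 1,000 files written to a single daily partition (hypothetical names).
keys = [
    hashed_key("s3://example-bucket/events/data", f"day=2024-06-01/file-{i}.parquet")
    for i in range(1000)
]

# Without the hash, every key would share the prefix ".../data/day=2024-06-01/".
# With it, the segment right after ".../data/" differs from file to file.
prefixes = {key.split("/")[5] for key in keys}
print(len(prefixes))
```

Spreading writes over many prefixes means that no single prefix absorbs the whole ingestion rate, which is the reason this kind of layout helps with throttling.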

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Apache Spark on Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partitioning is a key factor in the number of files produced, and because Cloudinary’s data is time based and most queries use a time filter, it was decided to address the optimization of the data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy works fastest and combines small files together to reach the defined target file size. After running it, a significant improvement in query performance was observed.
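The idea behind bin-packing can be sketched with a simple greedy heuristic. The following Python example is an assumption-laden illustration (first-fit, largest files first, with made-up file sizes), not the planner that Iceberg’s rewriteDataFiles action uses; `bin_pack` is a hypothetical helper.

```python
def bin_pack(file_sizes_mb, target_mb):
    """Greedily group files so that each group stays at or under the target size."""
    groups = []
    for size in sorted(file_sizes_mb, reverse=True):
        for group in groups:
            if sum(group) + size <= target_mb:
                group.append(size)  # fits into an existing group
                break
        else:
            groups.append([size])  # no group had room, so start a new one
    return groups

# Eight small files (sizes in MB) produced by streaming writes.
small_files = [40, 120, 300, 64, 200, 8, 256, 36]
groups = bin_pack(small_files, target_mb=512)

# Each group would be rewritten as one larger file.
for group in groups:
    print(sum(group), group)
```

No sorting of the rows themselves is involved, which is why this strategy is the cheapest of the three: files are only concatenated into fewer, larger ones.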

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.
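Why sorting reduces the data scanned can be illustrated with per-file minimum and maximum values of the sort column. The sketch below uses made-up hour ranges for four files in one daily partition; `files_to_scan` is a hypothetical helper, and real engines apply the same overlap test to the column bounds stored in Iceberg metadata.

```python
def files_to_scan(file_ranges, query_start, query_end):
    """Keep only files whose [min, max] range overlaps the queried range."""
    return [
        (low, high)
        for (low, high) in file_ranges
        if high >= query_start and low <= query_end
    ]

# Hours of the day covered by each file in one daily partition.
# After binpack, late-arriving data leaves every file spanning the whole day.
binpacked_files = [(0, 23), (0, 23), (0, 23), (0, 23)]
# After a sort on the timestamp column, each file covers a disjoint slice.
sorted_files = [(0, 5), (6, 11), (12, 17), (18, 23)]

# A query for events between 13:00 and 14:00.
scanned_binpacked = files_to_scan(binpacked_files, 13, 14)
scanned_sorted = files_to_scan(sorted_files, 13, 14)
print(len(scanned_binpacked), len(scanned_sorted))
```

In this toy case the sorted layout lets the engine skip three of the four files, which mirrors the reduction in scanned data described above.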

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations, Cloudinary was able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data, which sometimes arrives from external sources and isn’t identified upon arrival. Because expireOlderThan expects an absolute timestamp in milliseconds, the 7-day retention is subtracted from the current time:

SparkActions.get().expireSnapshots(iceTable).expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)).execute()

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.
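Conceptually, orphan detection is a set difference between what exists in storage and what the table metadata references. The following Python sketch shows that idea with made-up paths and timestamps; `find_orphans` is a hypothetical helper, and the three-day minimum age is an assumption used here to avoid flagging files that an in-flight write has not committed yet.

```python
from datetime import datetime, timedelta, timezone

def find_orphans(listed, referenced, now, min_age=timedelta(days=3)):
    """Return listed paths that no snapshot references and that are older than min_age."""
    cutoff = now - min_age
    return sorted(
        path
        for path, last_modified in listed.items()
        if path not in referenced and last_modified < cutoff
    )

now = datetime(2024, 6, 10, tzinfo=timezone.utc)

# Result of listing the table location: path -> last modified time.
listed = {
    "data/day=2024-06-01/a.parquet": now - timedelta(days=9),
    "data/day=2024-06-01/failed-task.parquet": now - timedelta(days=9),
    "data/day=2024-06-09/in-flight.parquet": now - timedelta(hours=1),
}
# Files reachable from any valid snapshot.
referenced = {"data/day=2024-06-01/a.parquet"}

orphans = find_orphans(listed, referenced, now)
print(orphans)
```

The listing step is the expensive part at petabyte scale, which is why the frequency of this maintenance job has to be balanced against the storage it frees.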

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. In certain cases, Cloudinary reached over 300 manifest files, which were small (often under 8 MB in size), and due to late-arriving data, manifest files were pointing to data in different partitions. This caused query planning to run for 12 seconds for each query.

Cloudinary initiated a separate scheduled process of rewriteManifests. After it ran, the number of manifest files was reduced to approximately 170. As a result of better alignment between manifests and query filters (based on partitions), query planning became three times faster, taking approximately 4 seconds.

Choosing the right query engine

As part of Cloudinary’s exploration of various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine | Details
Snowflake native | XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support | XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena | On-demand mode
Amazon EMR Trino | Open source Trino on top of an eight-node (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity from the simplest one to the most heavy and complex.

Query | Description | Data scanned | Returned results set
Q1 | Multi-day aggregation on a single tenant | Single digit GBs | <10 rows
Q2 | Single-day aggregation by tenant across multiple tenants | Dozens of GBs | 100 thousand rows
Q3 | Multi-day aggregation across multiple tenants | Hundreds of GBs | <10 rows
Q4 | Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics | Single digit TBs | >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.
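Because Athena charges by data scanned rather than by run time, its cost can be estimated from the scan size alone. The Python sketch below is a rough estimate under stated assumptions: a list price of 5 USD per TB, scanned bytes rounded up to the next megabyte, and a 10 MB minimum per query. These values and the `athena_query_cost` helper are assumptions that should be checked against the current Athena pricing page for your Region.

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4

def athena_query_cost(bytes_scanned: int, usd_per_tb: float = 5.0) -> float:
    """Estimate the on-demand cost of one query from the bytes it scanned."""
    # Assumption: bytes are rounded up to the next MB, with a 10 MB minimum.
    billed_mb = max(10, math.ceil(bytes_scanned / MB))
    return billed_mb * MB / TB * usd_per_tb

# Hypothetical scan sizes in the ranges described for the test queries.
cost_small = athena_query_cost(4 * 1024 ** 3)    # a few GB
cost_large = athena_query_cost(250 * 1024 ** 3)  # hundreds of GB
print(round(cost_small, 4), round(cost_large, 4))
```

This is also why reducing the data scanned, for example through sorting and compaction, translates directly into lower Athena cost.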

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was only fastest in two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on-par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data-lake. The following table shows the cost and time for each query and product.

Query | Amazon EMR Trino | Snowflake (XL) | Snowflake (XL) Iceberg | Athena
Query1 | $0.01 / 5 seconds | $0.08 / 8 seconds | $0.07 / 8 seconds | $0.02 / 11 seconds
Query2 | $0.12 / 107 seconds | $0.25 / 28 seconds | $0.35 / 39 seconds | $0.18 / 94 seconds
Query3 | $0.17 / 147 seconds | $1.07 / 120 seconds | $1.88 / 211 seconds | $1.22 / 26 seconds
Query4 | $6.43 / 1,237 seconds | $11.73 / 1,324 seconds | $12.71 / 1,430 seconds | N/A
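The normalization against Trino on Amazon EMR that was used for the charts can be reproduced from the figures in the table above. The following Python sketch copies those figures and divides each cost and duration by the Trino value for the same query; the dictionary layout and the `normalize` helper are illustrative choices, not part of the original benchmark tooling.

```python
# (cost in USD, duration in seconds) per engine, copied from the table above.
results = {
    "Query1": {"trino": (0.01, 5), "snowflake": (0.08, 8),
               "snowflake_iceberg": (0.07, 8), "athena": (0.02, 11)},
    "Query2": {"trino": (0.12, 107), "snowflake": (0.25, 28),
               "snowflake_iceberg": (0.35, 39), "athena": (0.18, 94)},
    "Query3": {"trino": (0.17, 147), "snowflake": (1.07, 120),
               "snowflake_iceberg": (1.88, 211), "athena": (1.22, 26)},
    # Athena was not measured for Query4.
    "Query4": {"trino": (6.43, 1237), "snowflake": (11.73, 1324),
               "snowflake_iceberg": (12.71, 1430)},
}

def normalize(results, baseline="trino"):
    """Express every cost and duration as a multiple of the baseline engine."""
    normalized = {}
    for query, engines in results.items():
        base_cost, base_seconds = engines[baseline]
        normalized[query] = {
            engine: (round(cost / base_cost, 2), round(seconds / base_seconds, 2))
            for engine, (cost, seconds) in engines.items()
        }
    return normalized

normalized = normalize(results)
for query, engines in normalized.items():
    print(query, engines)
```

A value above 1 means the engine was more expensive or slower than Trino on Amazon EMR for that query, and a value below 1 means it was cheaper or faster.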

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. It enables organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination is the most refined way to have an enterprise-grade open data environment. The availability of managed services and open source software helps companies to implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in Athena version 3, which is now based on Trino as its engine. The ability to retain data for longer and provide it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Simplify custom contact center insights with Amazon Connect analytics data lake

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/simplify-custom-contact-center-insights-with-amazon-connect-analytics-data-lake/

Analytics are vital to the success of a contact center. Having insights into each touchpoint of the customer experience allows you to accurately measure performance and adapt to shifting business demands. While you can find common metrics in the Amazon Connect console, sometimes you need to have more details and custom requirements for reporting based on the unique needs of your business. 

Starting today, the Amazon Connect analytics data lake is generally available. Announced in preview last year, this new capability helps you eliminate the need to build and maintain complex data pipelines. Amazon Connect data lake is zero-ETL capable, so no extract, transform, or load (ETL) is needed.

Here’s a quick look at the Amazon Connect analytics data lake:

Improving your customer experience with Amazon Connect
Amazon Connect analytics data lake helps you to unify disparate data sources, including customer contact records and agent activity, into a single location. By having your data in a centralized location, you now have access to analyze contact center performance and gain insights while reducing the costs associated with implementing complex data pipelines.

With Amazon Connect analytics data lake, you can access and analyze contact center data, such as contact trace records and Amazon Connect Contact Lens data. This gives you the flexibility to prepare and analyze data with Amazon Athena and use the business intelligence (BI) tools of your choice, such as Amazon QuickSight and Tableau.

Get started with the Amazon Connect analytics data lake
To get started with the Amazon Connect analytics data lake, you’ll first need to have an Amazon Connect instance set up. You can follow the steps in the Create an Amazon Connect instance page to create a new Amazon Connect instance. Because I’ve already created my Amazon Connect instance, I will go straight to showing you how you can get started with Amazon Connect analytics data lake.

First, I navigate to the Amazon Connect console and select my instance.

Then, on the next page, I can set up my analytics data lake by navigating to Analytics tools and selecting Add data share.

This brings up a pop-up dialog, and I first need to define the target AWS account ID. With this option, I can set up a centralized account to receive all data from Amazon Connect instances running in multiple accounts. Then, under Data types, I can select the types I need to share with the target AWS account. To learn more about the data types that you can share in the Amazon Connect analytics data lake, please visit Associate tables for Analytics data lake.

Once it’s done, I can see the list of all the target AWS account IDs with which I have shared all the data types.

Besides using the AWS Management Console, I can also use the AWS Command Line Interface (AWS CLI) to associate my tables with the analytics data lake. The following is a sample command:

$> aws connect batch-associate-analytics-data-set --cli-input-json file:///input_batch_association.json

Where input_batch_association.json is a JSON file that contains association details. Here’s a sample:

{
	"InstanceId": "YOUR_INSTANCE_ID",
	"DataSetIds": [
		"<DATA_SET_ID>"
	],
	"TargetAccountId": "YOUR_ACCOUNT_ID"
}
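If you prefer to generate the input file rather than edit it by hand, a few lines of Python can produce valid JSON with the same three keys. This is a minimal sketch: the `build_association_input` helper is hypothetical, and the placeholder values must be replaced with your own instance ID, data set IDs, and target account ID.

```python
import json

def build_association_input(instance_id, data_set_ids, target_account_id):
    """Build the request body with the keys used in the sample above."""
    return {
        "InstanceId": instance_id,
        "DataSetIds": list(data_set_ids),
        "TargetAccountId": target_account_id,
    }

# Placeholder values; all of them are passed as JSON strings.
payload = build_association_input(
    "YOUR_INSTANCE_ID", ["<DATA_SET_ID>"], "YOUR_ACCOUNT_ID"
)

# Print the JSON text that would be saved as input_batch_association.json.
document = json.dumps(payload, indent=2)
print(document)
```

Serializing through a JSON library guarantees that every value is quoted correctly, which avoids parse errors when the AWS CLI reads the file.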

Next, I need to approve (or reject) the request in the AWS Resource Access Manager (RAM) console in the target account. RAM is a service to help you securely share resources across AWS accounts. I navigate to AWS RAM and select Resource shares in the Shared with me section.

Then, I select the resource and select Accept resource share.

At this stage, I can access shared resources from Amazon Connect. Now, I can start creating linked tables from shared tables in AWS Lake Formation. In the Lake Formation console, I navigate to the Tables page and select Create table.

I need to create a Resource link to a shared table. Then, I fill in the details and select the available Database and the Shared table’s region.

Then, when I select Shared table, it will list all the available shared tables that I can access.

Once I select the shared table, it will automatically populate Shared table’s database and Shared table’s owner ID. Once I’m happy with the configuration, I select Create.

To run some queries on the data, I go to the Amazon Athena console. The following is an example of a query that I ran:

With this configuration, I have access to certain Amazon Connect data types. I can even visualize the data by integrating with Amazon QuickSight. The following screenshot shows some visuals in the Amazon QuickSight dashboard with data from Amazon Connect.

Customer voice
During the preview period, we heard lots of feedback from our customers about Amazon Connect analytics data lake. Here’s what our customers say:

Joulica is an analytics platform supporting insights for software like Amazon Connect and Salesforce. Tony McCormack, founder and CEO of Joulica, said, “Our core business is providing real-time and historical contact center analytics to Amazon Connect customers of all sizes. In the past, we frequently had to set up complex data pipelines, and so we are excited about using Amazon Connect analytics data lake to simplify the process of delivering actionable intelligence to our shared customers.”

Things you need to know

  • Pricing: You can use Amazon Connect analytics data lake with up to 2 years of data at no additional charge in Amazon Connect. You only pay for the services you use to interact with the data.
  • Availability: Amazon Connect analytics data lake is generally available in the following AWS Regions: US East (N. Virginia), US West (Oregon), Africa (Cape Town), Asia Pacific (Mumbai, Seoul, Singapore, Sydney, Tokyo), Canada (Central), and Europe (Frankfurt, London).
  • Learn more: For more information, please visit the Analytics data lake documentation page.

Happy building,
Donnie

Simplify data lake access control for your enterprise users with trusted identity propagation in AWS IAM Identity Center, AWS Lake Formation, and Amazon S3 Access Grants

Post Syndicated from Shoukat Ghouse original https://aws.amazon.com/blogs/big-data/simplify-data-lake-access-control-for-your-enterprise-users-with-trusted-identity-propagation-in-aws-iam-identity-center-aws-lake-formation-and-amazon-s3-access-grants/

Many organizations use external identity providers (IdPs) such as Okta or Microsoft Azure Active Directory to manage their enterprise user identities. These users interact with and run analytical queries across AWS analytics services. To enable them to use the AWS services, their identities from the external IdP are mapped to AWS Identity and Access Management (IAM) roles within AWS, and access policies are applied to these IAM roles by data administrators.

Given the diverse range of services involved, different IAM roles may be required for accessing the data. Consequently, administrators need to manage permissions across multiple roles, a task that can become cumbersome at scale.

To address this challenge, you need a unified solution to simplify data access management using your corporate user identities instead of relying solely on IAM roles. AWS IAM Identity Center offers a solution through its trusted identity propagation feature, which is built upon the OAuth 2.0 authorization framework.

With trusted identity propagation, data access management is anchored to a user’s identity, which can be synchronized to IAM Identity Center from external IdPs using the System for Cross-domain Identity Management (SCIM) protocol. Integrated applications exchange OAuth tokens, and these tokens are propagated across services. This approach empowers administrators to grant access directly based on existing user and group memberships federated from external IdPs, rather than relying on IAM users or roles.

In this post, we showcase the seamless integration of AWS analytics services with trusted identity propagation by presenting an end-to-end architecture for data access flows.

Solution overview

Let’s consider a fictional company, OkTank. OkTank has multiple user personas that use a variety of AWS Analytics services. The user identities are managed externally in an external IdP: Okta. User1 is a Data Analyst and uses the Amazon Athena query editor to query AWS Glue Data Catalog tables with data stored in Amazon Simple Storage Service (Amazon S3). User2 is a Data Engineer and uses Amazon EMR Studio notebooks to query Data Catalog tables and also query raw data stored in Amazon S3 that is not yet cataloged to the Data Catalog. User3 is a Business Analyst who needs to query data stored in Amazon Redshift tables using the Amazon Redshift Query Editor v2. Additionally, this user builds Amazon QuickSight visualizations for the data in Redshift tables.

OkTank wants to simplify governance by centralizing data access control for their variety of data sources, user identities, and tools. They also want to define permissions directly on their corporate user or group identities from Okta instead of creating IAM roles for each user and group and managing access on the IAM role. In addition, for their audit requirements, they need the capability to map data access to the corporate identity of users within Okta for enhanced tracking and accountability.

To achieve these goals, we use trusted identity propagation with the aforementioned services and use AWS Lake Formation and Amazon S3 Access Grants for access controls. We use Lake Formation to centrally manage permissions to the Data Catalog tables and Redshift tables shared with Redshift datashares. In our scenario, we use S3 Access Grants for granting permission for the Athena query result location. Additionally, we show how to access a raw data bucket governed by S3 Access Grants with an EMR notebook.

Data access is audited with AWS CloudTrail and can be queried with AWS CloudTrail Lake. This architecture showcases the versatility and effectiveness of AWS analytics services in enabling efficient and secure data analysis workflows across different use cases and user personas.

We use Okta as the external IdP, but you can also use other IdPs like Microsoft Azure Active Directory. Users and groups from Okta are synced to IAM Identity Center. In this post, we have three groups, as shown in the following diagram.

User1 needs to query a Data Catalog table with data stored in Amazon S3. The S3 location is secured and managed by Lake Formation. The user connects to an IAM Identity Center enabled Athena workgroup using the Athena query editor with EMR Studio. The IAM Identity Center enabled Athena workgroups need to be secured with S3 Access Grants permissions for the Athena query results location. With this feature, you can also enable the creation of identity-based query result locations that are governed by S3 Access Grants. These user identity-based S3 prefixes let users in an Athena workgroup keep their query results isolated from other users in the same workgroup. The following diagram illustrates this architecture.

User2 needs to query the same Data Catalog table as User1. This table is governed using Lake Formation permissions. Additionally, the user needs to access raw data in another S3 bucket that isn’t cataloged to the Data Catalog and is controlled using S3 Access Grants; in the following diagram, this is shown as S3 Data Location-2.

The user uses an EMR Studio notebook to run Spark queries on an EMR cluster. The EMR cluster uses a security configuration that integrates with IAM Identity Center for authentication and uses Lake Formation for authorization. The EMR cluster is also enabled for S3 Access Grants. With this kind of hybrid access management, you can use Lake Formation to centrally manage permissions for your datasets cataloged to the Data Catalog and use S3 Access Grants to centrally manage access to your raw data that is not yet cataloged to the Data Catalog. This gives you flexibility to access data managed by either of the access control mechanisms from the same notebook.

User3 uses the Redshift Query Editor V2 to query a Redshift table. The user also accesses the same table with QuickSight. For our demo, we use a single user persona for simplicity, but in reality, these could be completely different user personas. To enable access control with Lake Formation for Redshift tables, we use data sharing in Lake Formation.

Data access requests by the specific users are logged to CloudTrail. Later in this post, we also briefly touch upon using CloudTrail Lake to query the data access events.

In the following sections, we demonstrate how to build this architecture. We use AWS CloudFormation to provision the resources. AWS CloudFormation lets you model, provision, and manage AWS and third-party resources by treating infrastructure as code. We also use the AWS Command Line Interface (AWS CLI) and AWS Management Console to complete some steps.

The following diagram shows the end-to-end architecture.

Prerequisites

Complete the following prerequisite steps:

  1. Have an AWS account. If you don’t have an account, you can create one.
  2. Have IAM Identity Center set up in a specific AWS Region.
  3. Make sure you use the same Region where you have IAM Identity Center set up throughout the setup and verification steps. In this post, we use the us-east-1 Region.
  4. Have Okta set up with three different groups and users, and enable sync to IAM Identity Center. Refer to Configure SAML and SCIM with Okta and IAM Identity Center for instructions.

After the Okta groups are pushed to IAM Identity Center, you can see the users and groups on the IAM Identity Center console, as shown in the following screenshot. You need the group IDs of the three groups to be passed in the CloudFormation template.

  5. To enable User2 access using the EMR cluster, you need to have an SSL certificate .zip file available in your S3 bucket. You can download the following sample certificate to use in this post. In production use cases, you should create and use your own certificates. You need to reference the bucket name and the certificate bundle .zip file in AWS CloudFormation. The CloudFormation template lets you choose the components you want to provision. If you do not intend to deploy the EMR cluster, you can ignore this step.
  6. Have an administrator user or role to run the CloudFormation stack. The user or role should also be a Lake Formation administrator to grant permissions.

Deploy the CloudFormation stack

The CloudFormation template provided in the post lets you choose the components you want to provision from the solution architecture. In this post, we enable all components, as shown in the following screenshot.

Run the provided CloudFormation stack to create the solution resources. Refer to the following table for a list of important parameters.

| Parameter Group | Description | Parameter Name | Expected Value |
| --- | --- | --- | --- |
| Choose components to provision | Choose the components you want to be provisioned. | DeployAthenaFlow | Yes/No. If you choose No, you can ignore the parameters in the “Athena Configuration” group. |
| | | DeployEMRFlow | Yes/No. If you choose No, you can ignore the parameters in the “EMR Configuration” group. |
| | | DeployRedshiftQEV2Flow | Yes/No. If you choose No, you can ignore the parameters in the “Redshift Configuration” group. |
| | | CreateS3AGInstance | Yes/No. If you already have an S3 Access Grants instance, choose No. Otherwise, choose Yes to allow the stack to create a new S3 Access Grants instance. The S3 Access Grants instance is needed for User1 and User2. |
| Identity Center Configuration | IAM Identity Center parameters. | IDCGroup1Id | Group ID corresponding to Group1 from IAM Identity Center. |
| | | IDCGroup2Id | Group ID corresponding to Group2 from IAM Identity Center. |
| | | IDCGroup3Id | Group ID corresponding to Group3 from IAM Identity Center. |
| | | IAMIDCInstanceArn | IAM Identity Center instance ARN. You can get this from the Settings section of IAM Identity Center. |
| Redshift Configuration | Redshift parameters. Ignore if you chose DeployRedshiftQEV2Flow as No. | RedshiftServerlessAdminUserName | Redshift admin user name. |
| | | RedshiftServerlessAdminPassword | Redshift admin password. |
| | | RedshiftServerlessDatabase | Redshift database to create the tables. |
| EMR Configuration | EMR parameters. Ignore if you chose parameter DeployEMRFlow as No. | SSlCertsS3BucketName | Bucket name where you copied the SSL certificates. |
| | | SSlCertsZip | Name of the SSL certificates file (my-certs.zip) to use the sample certificate provided in the post. |
| Athena Configuration | Athena parameters. Ignore if you chose parameter DeployAthenaFlow as No. | IDCUser1Id | User ID corresponding to User1 from IAM Identity Center. |

The CloudFormation stack provisions the following resources:

  • A VPC with a public and private subnet.
  • If you chose the Redshift components, it also creates three additional subnets.
  • S3 buckets for data and Athena query results location storage. It also copies some sample data to the buckets.
  • EMR Studio with IAM Identity Center integration.
  • Amazon EMR security configuration with IAM Identity Center integration.
  • An EMR cluster that uses the EMR security group.
  • Registers the source S3 bucket with Lake Formation.
  • An AWS Glue database named oktank_tipblog_temp and a table named customer under the database. The table points to the Amazon S3 location governed by Lake Formation.
  • Allows external engines to access data in Amazon S3 locations with full table access. This is required for Amazon EMR integration with Lake Formation for trusted identity propagation. As of this writing, Amazon EMR supports table-level access with IAM Identity Center enabled clusters.
  • An S3 Access Grants instance.
  • S3 Access Grants for Group1 to the User1 prefix under the Athena query results location bucket.
  • S3 Access Grants for Group2 to the S3 bucket input and output prefixes. The user has read access to the input prefix and write access to the output prefix under the bucket.
  • An Amazon Redshift Serverless namespace and workgroup. This workgroup is not integrated with IAM Identity Center; we complete subsequent steps to enable IAM Identity Center for the workgroup.
  • An AWS Cloud9 integrated development environment (IDE), which we use to run AWS CLI commands during the setup.

Note the stack outputs on the AWS CloudFormation console. You use these values in later steps.
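Because many later steps paste values from the stack outputs, it can help to load them into a lookup table once. The following is a minimal Python sketch, assuming a stack description in the shape returned by `aws cloudformation describe-stacks` (one element of its `Stacks` list); the stack name and the output values are hypothetical.

```python
def outputs_to_dict(stack):
    """Map OutputKey -> OutputValue for one stack description."""
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Hypothetical, shortened stack description used only for illustration.
sample_stack = {
    "StackName": "tip-blog-stack",
    "Outputs": [
        {"OutputKey": "CatalogId", "OutputValue": "123456789012"},
        {"OutputKey": "EMRStudioURL", "OutputValue": "https://example.invalid/studio"},
    ],
}

outputs = outputs_to_dict(sample_stack)
print(outputs["CatalogId"])
```

A dictionary like this can then feed the placeholders in the commands that follow, which avoids copy-and-paste mistakes.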

Choose the link for Cloud9URL in the stack output to open the AWS Cloud9 IDE. In AWS Cloud9, go to the Window tab and choose New Terminal to start a new bash terminal.

Set up Lake Formation

You need to enable Lake Formation with IAM Identity Center and enable an EMR application with Lake Formation integration. Complete the following steps:

  1. In the AWS Cloud9 bash terminal, enter the following command to get the Amazon EMR security configuration created by the stack:
aws emr describe-security-configuration --name TIP-EMRSecurityConfig | jq -r '.SecurityConfiguration | fromjson | .AuthenticationConfiguration.IdentityCenterConfiguration.IdCApplicationARN'
  2. Note the value for IdcApplicationARN from the output.
  3. Enter the following command in AWS Cloud9 to enable the Lake Formation integration with IAM Identity Center and add the Amazon EMR security configuration application as a trusted application in Lake Formation. If you already have the IAM Identity Center integration with Lake Formation, skip the command: sign in to Lake Formation, add the preceding value to the list of applications, and proceed to the next step.
aws lakeformation create-lake-formation-identity-center-configuration --catalog-id <Replace with CatalogId value from CloudFormation output> --instance-arn <Replace with IDCInstanceARN value from CloudFormation stack output> --external-filtering Status=ENABLED,AuthorizedTargets=<Replace with IdcApplicationARN value copied in previous step>

After this step, you should see the application on the Lake Formation console.
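The two commands above can also be expressed in Python, which makes the nested JSON handling explicit. This is a sketch under stated assumptions: the function names and sample ARNs are hypothetical, and the keyword names mirror the CLI options and should map onto the corresponding SDK call, which you should verify before use.

```python
import json


def extract_idc_application_arn(describe_output):
    """Return IdCApplicationARN from describe-security-configuration output.

    The SecurityConfiguration field is itself a JSON string, which is why
    the jq pipeline above uses `fromjson`.
    """
    config = json.loads(describe_output["SecurityConfiguration"])
    auth = config["AuthenticationConfiguration"]
    return auth["IdentityCenterConfiguration"]["IdCApplicationARN"]


def build_lf_identity_center_args(catalog_id, instance_arn, application_arn):
    """Build arguments mirroring the CLI options used above."""
    return {
        "CatalogId": catalog_id,
        "InstanceArn": instance_arn,
        "ExternalFiltering": {
            "Status": "ENABLED",
            "AuthorizedTargets": [application_arn],
        },
    }


# Hypothetical sample response; the ARNs are placeholders.
sample_output = {
    "Name": "TIP-EMRSecurityConfig",
    "SecurityConfiguration": json.dumps({
        "AuthenticationConfiguration": {
            "IdentityCenterConfiguration": {
                "IdCApplicationARN": "arn:aws:sso::123456789012:application/ssoins-EXAMPLE/apl-EXAMPLE"
            }
        }
    }),
}

app_arn = extract_idc_application_arn(sample_output)
lf_args = build_lf_identity_center_args(
    "123456789012",
    "arn:aws:sso:::instance/ssoins-EXAMPLE",
    app_arn,
)
print(json.dumps(lf_args, indent=2))
```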

This completes the initial setup. In subsequent steps, we apply some additional configurations for specific user personas.

Validate user personas

To review the S3 Access Grants created by AWS CloudFormation, open the Amazon S3 console and choose Access Grants in the navigation pane. Choose the access grant you created to view its details.

The CloudFormation stack created the S3 Access Grants for Group1 for the User1 prefix under the Athena query results location bucket. This allows User1 to access that prefix in the query results bucket. The stack also created the grants for Group2 so that User2 can access the raw data bucket input and output prefixes.

Set up User1 access

Complete the steps in this section to set up User1 access.

Create an IAM Identity Center enabled Athena workgroup

Let’s create the Athena workgroup that will be used by User1.

Enter the following command in the AWS Cloud9 terminal. The command creates an IAM Identity Center integrated Athena workgroup and enables S3 Access Grants for the user-level prefix. These user identity-based S3 prefixes let users in an Athena workgroup keep their query results isolated from other users in the same workgroup. The prefix is automatically created by Athena when the CreateUserLevelPrefix option is enabled. Access to the prefix was granted by the CloudFormation stack.

aws athena create-work-group --cli-input-json '{
  "Name": "AthenaIDCWG",
  "Configuration": {
    "ResultConfiguration": {
      "OutputLocation": "<Replace with AthenaResultLocation from CloudFormation stack>"
    },
    "ExecutionRole": "<Replace with TIPStudioRoleArn from CloudFormation stack>",
    "IdentityCenterConfiguration": {
      "EnableIdentityCenter": true,
      "IdentityCenterInstanceArn": "<Replace with IDCInstanceARN from CloudFormation stack>"
    },
    "QueryResultsS3AccessGrantsConfiguration": {
      "EnableS3AccessGrants": true,
      "CreateUserLevelPrefix": true,
      "AuthenticationType": "DIRECTORY_IDENTITY"
    },
    "EnforceWorkGroupConfiguration": true
  },
  "Description": "Athena Workgroup with IDC integration"
}'
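If you prefer to fill in the three placeholders programmatically, the same request can be assembled in Python and serialized for --cli-input-json. This is only a sketch: the helper name and the sample values are hypothetical, and the structure simply mirrors the JSON above.

```python
import json


def build_workgroup_request(name, output_location, execution_role_arn, idc_instance_arn):
    """Build a create-work-group request matching the JSON shown above."""
    if not output_location.startswith("s3://"):
        raise ValueError("output_location must be an s3:// URI")
    return {
        "Name": name,
        "Configuration": {
            "ResultConfiguration": {"OutputLocation": output_location},
            "ExecutionRole": execution_role_arn,
            "IdentityCenterConfiguration": {
                "EnableIdentityCenter": True,
                "IdentityCenterInstanceArn": idc_instance_arn,
            },
            "QueryResultsS3AccessGrantsConfiguration": {
                "EnableS3AccessGrants": True,
                "CreateUserLevelPrefix": True,
                "AuthenticationType": "DIRECTORY_IDENTITY",
            },
            "EnforceWorkGroupConfiguration": True,
        },
        "Description": "Athena Workgroup with IDC integration",
    }


# Hypothetical values standing in for the CloudFormation stack outputs.
workgroup_request = build_workgroup_request(
    "AthenaIDCWG",
    "s3://example-athena-results/",
    "arn:aws:iam::123456789012:role/example-studio-role",
    "arn:aws:sso:::instance/ssoins-EXAMPLE",
)
cli_input = json.dumps(workgroup_request)
print(cli_input)
```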

Grant access to User1 on the Athena workgroup

Sign in to the Athena console and grant access to Group1 to the workgroup as shown in the following screenshot. You can grant access to the user (User1) or to the group (Group1). In this post, we grant access to Group1.

Grant access to User1 in Lake Formation

Sign in to the Lake Formation console, choose Data lake permissions in the navigation pane, and grant access to the user group on the database oktank_tipblog_temp and table customer.

For tables queried with Athena, Lake Formation lets you grant access to specific columns and to specific rows with row-level filtering. For this post, we grant column-level access and restrict access to only selected columns for the table.

This completes the access permission setup for User1.

Verify access

Let’s see how User1 uses Athena to analyze the data.

  1. Copy the URL for EMRStudioURL from the CloudFormation stack output.
  2. Open a new browser window and connect to the URL.

You will be redirected to the Okta login page.

  1. Log in with User1.
  2. In the EMR Studio query editor, change the workgroup to AthenaIDCWG and choose Acknowledge.
  3. Run the following query in the query editor:
SELECT * FROM "oktank_tipblog_temp"."customer" limit 10;


You can see that the user is only able to access the columns for which permissions were previously granted in Lake Formation. This completes the access flow verification for User1.

Set up User2 access

User2 accesses the table using an EMR Studio notebook. Note the current considerations for EMR with IAM Identity Center integrations.

Complete the steps in this section to set up User2 access.

Grant Lake Formation permissions to User2

Sign in to the Lake Formation console and grant access to Group2 on the table, similar to the steps you followed earlier for User1. Also grant Describe permission on the default database to Group2, as shown in the following screenshot.

Create an EMR Studio Workspace

Next, User2 creates an EMR Studio Workspace.

  1. Copy the URL for EMR Studio from the EMRStudioURL value from the CloudFormation stack output.
  2. Log in to EMR Studio as User2 on the Okta login page.
  3. Create a Workspace, giving it a name and leaving all other options as default.

This will open a JupyterLab notebook in a new window.

Connect to the EMR Studio notebook

In the Compute pane of the notebook, select the EMR cluster (named EMRWithTIP) created by the CloudFormation stack to attach to it. After the notebook is attached to the cluster, choose the PySpark kernel to run Spark queries.

Verify access

Enter the following query in the notebook to read from the customer table:

spark.sql("select * from oktank_tipblog_temp.customer").show()


The user access works as expected based on the Lake Formation grants you provided earlier.

Run the following Spark query in the notebook to read data from the raw bucket. Access to this bucket is controlled by S3 Access Grants.

spark.read.option("header",True).csv("s3://tip-blog-s3-s3ag/input/*").show()

Let’s write this data to the same bucket and input prefix. This should fail because you only granted read access to the input prefix with S3 Access Grants.

spark.read.option("header",True).csv("s3://tip-blog-s3-s3ag/input/*").write.mode("overwrite").parquet("s3://tip-blog-s3-s3ag/input/")

The user has access to the output prefix under the bucket. Change the query to write to the output prefix:

spark.read.option("header",True).csv("s3://tip-blog-s3-s3ag/input/*").write.mode("overwrite").parquet("s3://tip-blog-s3-s3ag/output/test.part")

The write should now be successful.
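The three outcomes above (read from input succeeds, write to input fails, write to output succeeds) follow from prefix-scoped grants. The following Python sketch is a deliberately simplified model of that idea, not the evaluation logic S3 Access Grants actually uses; the `Grant` class and `is_allowed` function are hypothetical.

```python
from dataclasses import dataclass

# Which actions each permission level permits in this toy model.
PERMITTED_ACTIONS = {
    "READ": {"READ"},
    "WRITE": {"WRITE"},
    "READWRITE": {"READ", "WRITE"},
}


@dataclass(frozen=True)
class Grant:
    prefix: str      # S3 prefix the grant covers
    permission: str  # "READ", "WRITE", or "READWRITE"


def is_allowed(grants, s3_uri, action):
    """Return True if any grant covers s3_uri and permits the action."""
    return any(
        s3_uri.startswith(grant.prefix)
        and action in PERMITTED_ACTIONS[grant.permission]
        for grant in grants
    )


# Grants matching the description in this post for Group2.
group2_grants = [
    Grant("s3://tip-blog-s3-s3ag/input/", "READ"),
    Grant("s3://tip-blog-s3-s3ag/output/", "WRITE"),
]

print(is_allowed(group2_grants, "s3://tip-blog-s3-s3ag/input/data.csv", "READ"))
print(is_allowed(group2_grants, "s3://tip-blog-s3-s3ag/input/", "WRITE"))
print(is_allowed(group2_grants, "s3://tip-blog-s3-s3ag/output/test.part", "WRITE"))
```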

We have now seen the data access controls and access flows for User1 and User2.

Set up User3 access

Following the target architecture in our post, Group3 users use the Redshift Query Editor v2 to query the Redshift tables.

Complete the steps in this section to set up access for User3.

Enable Redshift Query Editor v2 console access for User3

Complete the following steps:

  1. On the IAM Identity Center console, create a custom permission set and attach the following policies:
    1. AWS managed policy AmazonRedshiftQueryEditorV2ReadSharing.
    2. Customer managed policy redshift-idc-policy-tip. This policy is already created by the CloudFormation stack, so you don’t have to create it.
  2. Provide a name (tip-blog-qe-v2-permission-set) to the permission set.
  3. Set the relay state as https://<region-id>.console.aws.amazon.com/sqlworkbench/home (for example, https://us-east-1.console.aws.amazon.com/sqlworkbench/home).
  4. Choose Create.
  5. Assign Group3 to the account in IAM Identity Center, select the permission set you created, and choose Submit.

Create the Redshift IAM Identity Center application

Enter the following in the AWS Cloud9 terminal:

aws redshift create-redshift-idc-application \
--idc-instance-arn '<Replace with IDCInstanceARN value from CloudFormation Output>' \
--redshift-idc-application-name 'redshift-iad-<Replace with CatalogId value from CloudFormation output>-tip-blog-1' \
--identity-namespace 'tipblogawsidc' \
--idc-display-name 'TIPBlog_AWSIDC' \
--iam-role-arn '<Replace with TIPRedshiftRoleArn value from CloudFormation output>' \
--service-integrations '[
  {
    "LakeFormation": [
      {
        "LakeFormationQuery": {
          "Authorization": "Enabled"
        }
      }
    ]
  }
]'

Enter the following command to get the application details:

aws redshift describe-redshift-idc-applications --output json

Keep a note of the IdcManagedApplicationArn, IdcDisplayName, and IdentityNamespace values in the output for the application with IdcDisplayName TIPBlog_AWSIDC. You need these values in the next step.
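When the account has several Redshift IAM Identity Center applications, picking the right entry from the JSON output by hand is error-prone. The following Python sketch selects it by display name. It assumes the output contains a `RedshiftIdcApplications` list with the three fields named above; the helper name and the sample ARN are hypothetical.

```python
def find_idc_application(describe_output, display_name):
    """Return the fields needed later for the application with this display name."""
    wanted = ("IdcManagedApplicationArn", "IdcDisplayName", "IdentityNamespace")
    for app in describe_output.get("RedshiftIdcApplications", []):
        if app.get("IdcDisplayName") == display_name:
            return {key: app[key] for key in wanted}
    raise LookupError(f"no application with display name {display_name!r}")


# Hypothetical, shortened output used only for illustration.
sample_apps = {
    "RedshiftIdcApplications": [
        {
            "IdcDisplayName": "SomeOtherApp",
            "IdentityNamespace": "other",
            "IdcManagedApplicationArn": "arn:aws:sso::123456789012:application/ssoins-EXAMPLE/apl-OTHER",
        },
        {
            "IdcDisplayName": "TIPBlog_AWSIDC",
            "IdentityNamespace": "tipblogawsidc",
            "IdcManagedApplicationArn": "arn:aws:sso::123456789012:application/ssoins-EXAMPLE/apl-TIPBLOG",
        },
    ]
}

tip_app = find_idc_application(sample_apps, "TIPBlog_AWSIDC")
print(tip_app["IdcManagedApplicationArn"])
```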

Enable the Redshift Query Editor v2 for the Redshift IAM Identity Center application

Complete the following steps:

  1. On the Amazon Redshift console, choose IAM Identity Center connections in the navigation pane.
  2. Choose the application you created.
  3. Choose Edit.
  4. Select Enable Query Editor v2 application and choose Save changes.
  5. On the Groups tab, choose Add or assign groups.
  6. Assign Group3 to the application.

The Redshift IAM Identity Center connection is now set up.

Enable the Redshift Serverless namespace and workgroup with IAM Identity Center

The CloudFormation stack you deployed created a serverless namespace and workgroup. However, they’re not enabled with IAM Identity Center. To enable with IAM Identity Center, complete the following steps. You can get the namespace name from the RedshiftNamespace value of the CloudFormation stack output.

  1. On the Amazon Redshift Serverless dashboard console, navigate to the namespace you created.
  2. Choose Query Data to open Query Editor v2.
  3. Choose the options menu (three dots) and choose Create connections for the workgroup redshift-idc-wg-tipblog.
  4. Choose Other ways to connect and then Database user name and password.
  5. Use the credentials you provided for the Redshift admin user name and password parameters when deploying the CloudFormation stack and create the connection.

Create resources using the Redshift Query Editor v2

You now enter a series of commands in the query editor with the database admin user.

  1. Create an IdP for the Redshift IAM Identity Center application:
CREATE IDENTITY PROVIDER "TIPBlog_AWSIDC" TYPE AWSIDC
NAMESPACE 'tipblogawsidc'
APPLICATION_ARN '<Replace with IdcManagedApplicationArn value you copied earlier in Cloud9>'
IAM_ROLE '<Replace with TIPRedshiftRoleArn value from CloudFormation output>';
  1. Enter the following command to check the IdP you added previously:
SELECT * FROM svv_identity_providers;

Next, you grant permissions to the IAM Identity Center user.

  1. Create a role in Redshift. This role should correspond to the group in IAM Identity Center to which you intend to provide the permissions (Group3 in this post). The role should follow the format <namespace>:<GroupNameinIDC>.
Create role "tipblogawsidc:Group3";
  1. Run the following command to see the role you created. The external_id corresponds to the group ID value for Group3 in IAM Identity Center.
Select * from svv_roles where role_name = 'tipblogawsidc:Group3';

  1. Create a sample table to use to verify access for the Group3 user:
CREATE TABLE IF NOT EXISTS revenue
(
account INTEGER ENCODE az64
,customer VARCHAR(20) ENCODE lzo
,salesamt NUMERIC(18,0) ENCODE az64
)
DISTSTYLE AUTO
;

insert into revenue values (10001, 'ABC Company', 12000);
insert into revenue values (10002, 'Tech Logistics', 175400);
  1. Grant access to the user on the schema:
-- Grant usage on schema
grant usage on schema public to role "tipblogawsidc:Group3";
  1. To create a datashare and add the preceding table to the datashare, enter the following statements:
CREATE DATASHARE demo_datashare;
ALTER DATASHARE demo_datashare ADD SCHEMA public;
ALTER DATASHARE demo_datashare ADD TABLE revenue;
  1. Grant usage on the datashare to the account using the Data Catalog:
GRANT USAGE ON DATASHARE demo_datashare TO ACCOUNT '<Replace with CatalogId from CloudFormation output>' VIA DATA CATALOG;
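The role naming rule used in the steps above (<namespace>:<GroupNameinIDC>) is easy to get wrong when you repeat it for several groups. The following Python sketch generates the role name and the two role-related statements for one group; the helper names are hypothetical, and the generated SQL simply mirrors the statements shown in this section.

```python
def redshift_role_name(namespace, group_name):
    """Return the role name in the <namespace>:<GroupNameinIDC> format."""
    if not namespace or ":" in namespace:
        raise ValueError("namespace must be non-empty and must not contain ':'")
    return f"{namespace}:{group_name}"


def role_setup_statements(namespace, group_name, schema="public"):
    """Return CREATE ROLE and GRANT USAGE statements for one group."""
    role = redshift_role_name(namespace, group_name)
    # Double any embedded quotes so the identifier stays well-formed.
    quoted = '"' + role.replace('"', '""') + '"'
    return [
        f"CREATE ROLE {quoted};",
        f"GRANT USAGE ON SCHEMA {schema} TO ROLE {quoted};",
    ]


statements = role_setup_statements("tipblogawsidc", "Group3")
for statement in statements:
    print(statement)
```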

Authorize the datashare

For this post, we use the AWS CLI to authorize the datashare. You can also do it from the Amazon Redshift console.

Enter the following command in the AWS Cloud9 IDE to describe the datashare you created and note the value of DataShareArn and ConsumerIdentifier to use in subsequent steps:

aws redshift describe-data-shares

Enter the following command in the AWS Cloud9 IDE to authorize the datashare:

aws redshift authorize-data-share --data-share-arn <Replace with DataShareArn value copied from earlier command’s output> --consumer-identifier <Replace with ConsumerIdentifier value copied from earlier command’s output>
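Instead of copying the two values by hand, you can pull them out of the describe-data-shares JSON. The following Python sketch assumes the output has a `DataShares` list whose entries carry `DataShareArn` and a `DataShareAssociations` list with `ConsumerIdentifier`, and that the datashare name is the last path segment of the ARN. The helper name and all sample values are hypothetical, so check them against your own output.

```python
def authorization_targets(describe_output, share_name):
    """Return (DataShareArn, ConsumerIdentifier) pairs for the named datashare."""
    pairs = []
    for share in describe_output.get("DataShares", []):
        arn = share.get("DataShareArn", "")
        # Assumption: the datashare name is the last path segment of the ARN.
        if arn.rsplit("/", 1)[-1] != share_name:
            continue
        for association in share.get("DataShareAssociations", []):
            pairs.append((arn, association["ConsumerIdentifier"]))
    return pairs


# Hypothetical, shortened output used only for illustration.
demo_arn = (
    "arn:aws:redshift:us-east-1:123456789012:datashare:"
    "00000000-0000-0000-0000-000000000000/demo_datashare"
)
sample_shares = {
    "DataShares": [
        {
            "DataShareArn": demo_arn,
            "DataShareAssociations": [
                {"ConsumerIdentifier": "example-consumer-identifier"}
            ],
        },
        {
            "DataShareArn": demo_arn.replace("demo_datashare", "other_share"),
            "DataShareAssociations": [],
        },
    ]
}

targets = authorization_targets(sample_shares, "demo_datashare")
print(targets)
```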

Accept the datashare in Lake Formation

Next, accept the datashare in Lake Formation.

  1. On the Lake Formation console, choose Data sharing in the navigation pane.
  2. In the Invitations section, select the datashare invitation that is pending acceptance.
  3. Choose Review invitation and accept the datashare.
  4. Provide a database name (tip-blog-redshift-ds-db), which will be created in the Data Catalog by Lake Formation.
  5. Choose Skip to Review and Create and create the database.

Grant permissions in Lake Formation

Complete the following steps:

  1. On the Lake Formation console, choose Data lake permissions in the navigation pane.
  2. Choose Grant and in the Principals section, choose User3 to grant permissions with the IAM Identity Center-new option. Refer to the Lake Formation access grants steps performed for User1 and User2 if needed.
  3. Choose the database (tip-blog-redshift-ds-db) you created earlier and the table public.revenue, which you created in the Redshift Query Editor v2.
  4. For Table permissions, select Select.
  5. For Data permissions, select Column-based access and select the account and salesamt columns.
  6. Choose Grant.
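The console steps above amount to a column-scoped SELECT grant. As a rough illustration, the following Python sketch builds an equivalent request body for the Lake Formation grant-permissions API. The helper name is hypothetical, and the principal ARN format for an IAM Identity Center group (including the group ID) is an assumption that you should confirm in the Lake Formation documentation.

```python
import json


def build_column_grant(principal_arn, catalog_id, database, table, columns):
    """Build a SELECT grant limited to the listed columns."""
    if not columns:
        raise ValueError("at least one column is required")
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }


grant_request = build_column_grant(
    # Assumed ARN format with a placeholder group ID.
    "arn:aws:identitystore:::group/EXAMPLE-GROUP-ID",
    "123456789012",
    "tip-blog-redshift-ds-db",
    "public.revenue",
    ["account", "salesamt"],
)
print(json.dumps(grant_request, indent=2))
```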

Mount the AWS Glue database to Amazon Redshift

As the last step in the setup, mount the AWS Glue database to Amazon Redshift. In the Query Editor v2, enter the following statements:

create external schema if not exists tipblog_datashare_idc_schema from DATA CATALOG DATABASE 'tip-blog-redshift-ds-db' catalog_id '<Replace with CatalogId from CloudFormation output>';

grant usage on schema tipblog_datashare_idc_schema to role "tipblogawsidc:Group3";

grant select on all tables in schema tipblog_datashare_idc_schema to role "tipblogawsidc:Group3";

You are now done with the required setup and permissions for User3 on the Redshift table.

Verify access

To verify access, complete the following steps:

  1. Get the AWS access portal URL from the IAM Identity Center Settings section.
  2. Open a different browser and enter the access portal URL.

This will redirect you to your Okta login page.

  1. Sign in, select the account, and choose the tip-blog-qe-v2-permission-set link to open the Query Editor v2.

If you’re using private or incognito mode for testing this, you may need to enable third-party cookies.

  1. Choose the options menu (three dots) and choose Edit connection for the redshift-idc-wg-tipblog workgroup.
  2. Use IAM Identity Center in the pop-up window and choose Continue.

If you get an error with the message “Redshift serverless cluster is auto paused,” switch to the other browser with admin credentials and run any sample queries to un-pause the cluster. Then switch back to this browser and continue the next steps.

  1. Run the following query to access the table:
SELECT * FROM "dev"."tipblog_datashare_idc_schema"."public.revenue";

You can only see the two columns due to the access grants you provided in Lake Formation earlier.

This completes configuring User3 access to the Redshift table.

Set up QuickSight for User3

Let’s now set up QuickSight and verify access for User3. We already granted access to User3 to the Redshift table in earlier steps.

  1. Create a new IAM Identity Center enabled QuickSight account. Refer to Simplify business intelligence identity management with Amazon QuickSight and AWS IAM Identity Center for guidance.
  2. Choose Group3 for the author and reader for this post.
  3. For IAM Role, choose the IAM role matching the RoleQuickSight value from the CloudFormation stack output.

Next, you add a VPC connection to QuickSight to access the Redshift Serverless namespace you created earlier.

  1. On the QuickSight console, manage your VPC connections.
  2. Choose Add VPC connection.
  3. For VPC connection name, enter a name.
  4. For VPC ID, enter the value for VPCId from the CloudFormation stack output.
  5. For Execution role, choose the value for RoleQuickSight from the CloudFormation stack output.
  6. For Security Group IDs, choose the security group for QSSecurityGroup from the CloudFormation stack output.

  1. Wait for the VPC connection to be AVAILABLE.
  2. Enter the following command in AWS Cloud9 to enable QuickSight with Amazon Redshift for trusted identity propagation:
aws quicksight update-identity-propagation-config --aws-account-id "<Replace with CatalogId from CloudFormation output>" --service "REDSHIFT" --authorized-targets "< Replace with IdcManagedApplicationArn value from output of aws redshift describe-redshift-idc-applications --output json which you copied earlier>"

Verify User3 access with QuickSight

Complete the following steps:

  1. Sign in to the QuickSight console as User3 in a different browser.
  2. On the Okta sign-in page, sign in as User3.
  3. Create a new dataset with Amazon Redshift as the data source.
  4. Choose the VPC connection you created above for Connection Type.
  5. Provide the Redshift server (the RedshiftSrverlessWorkgroup value from the CloudFormation stack output), port (5439 in this post), and database name (dev in this post).
  6. Under Authentication method, select Single sign-on.
  7. Choose Validate, then choose Create data source.

If you encounter an issue with validating using single sign-on, switch to Database username and password for Authentication method, validate with any dummy user and password, and then switch back to validate using single sign-on and proceed to the next step. Also check that the Redshift serverless cluster is not auto-paused as mentioned earlier in Redshift access verification.

  1. Choose the schema you created earlier (tipblog_datashare_idc_schema) and the table public.revenue
  2. Choose Select to create your dataset.

You should now be able to visualize the data in QuickSight. You can see only the account and salesamt columns from the table because of the access permissions you granted earlier with Lake Formation.

This finishes all the steps for setting up trusted identity propagation.

Audit data access

Let’s see how we can audit data access by the different users.

Access requests are logged to CloudTrail. The IAM Identity Center user ID is logged under the onBehalfOf tag in the CloudTrail event. The following screenshot shows the GetDataAccess event generated by Lake Formation. You can view the CloudTrail event history and filter by event name GetDataAccess to view similar events in your account.

You can see the userId corresponds to User2.

You can run the following commands in AWS Cloud9 to confirm this.

Get the identity store ID:

aws sso-admin describe-instance --instance-arn <Replace with your instance arn value> | jq -r '.IdentityStoreId'

Describe the user in the identity store:

aws identitystore describe-user --identity-store-id <Replace with output of above command> --user-id <User Id from above screenshot>
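
When you export CloudTrail records as JSON, the same lookup inputs can be pulled out programmatically. The following is a small Python sketch that assumes the record carries userIdentity.onBehalfOf with userId and identityStoreArn fields. The function name and the sample record are hypothetical.

```python
from typing import Optional, Tuple


def on_behalf_of(event: dict) -> Optional[Tuple[str, str]]:
    """Return (user_id, identity_store_id) from a CloudTrail record,
    or None when the record has no onBehalfOf element."""
    obo = event.get("userIdentity", {}).get("onBehalfOf")
    if not obo:
        return None
    # The ARN ends in ".../<identity store ID>", so keep the last path segment.
    store_id = obo["identityStoreArn"].rsplit("/", 1)[-1]
    return obo["userId"], store_id


# Hypothetical record, trimmed to the fields this sketch reads.
sample_event = {
    "eventName": "GetDataAccess",
    "userIdentity": {
        "onBehalfOf": {
            "userId": "11111111-2222-3333-4444-555555555555",
            "identityStoreArn":
                "arn:aws:identitystore::111122223333:identitystore/d-1234567890",
        }
    },
}

result = on_behalf_of(sample_event)
print(result)
```

The two returned values map to the --user-id and --identity-store-id arguments of the describe-user command.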

One way to query the CloudTrail log events is by using CloudTrail Lake. Set up the event data store (refer to the following instructions) and rerun the queries for User1, User2, and User3. You can query the access events using CloudTrail Lake with the following sample query:

SELECT eventTime, userIdentity.onBehalfOf.userid AS idcUserId, requestParameters AS accessInfo, serviceEventDetails
FROM 04d81d04-753f-42e0-a31f-2810659d9c27
WHERE userIdentity.arn IS NOT NULL AND (eventName='BatchGetTable' OR eventName='GetDataAccess' OR eventName='CreateDataSet')
ORDER BY eventTime DESC
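
Replace the ID in the FROM clause with your own event data store ID. If you generate this query for several event data stores or event names, a small helper keeps the filter consistent. The following Python sketch is an illustration only, and the function name is hypothetical. It wraps the eventName alternatives in parentheses so that the userIdentity.arn IS NOT NULL check applies to every event name, because AND binds more tightly than OR in SQL.

```python
def access_audit_query(event_data_store_id: str, event_names: list) -> str:
    """Build the audit query text for one event data store."""
    for name in event_names:
        # Event names are plain identifiers; reject anything else
        # so that nothing unexpected is spliced into the SQL text.
        if not name.isalnum():
            raise ValueError(f"unexpected event name: {name!r}")
    conditions = " OR ".join(f"eventName='{name}'" for name in event_names)
    return (
        "SELECT eventTime, userIdentity.onBehalfOf.userid AS idcUserId, "
        "requestParameters AS accessInfo, serviceEventDetails\n"
        f"FROM {event_data_store_id}\n"
        f"WHERE userIdentity.arn IS NOT NULL AND ({conditions})\n"
        "ORDER BY eventTime DESC"
    )


query = access_audit_query(
    "04d81d04-753f-42e0-a31f-2810659d9c27",
    ["BatchGetTable", "GetDataAccess", "CreateDataSet"],
)
print(query)
```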

The following screenshot shows an example of the detailed results with audit explanations.

Clean up

To avoid incurring further charges, delete the CloudFormation stack. Before you delete the CloudFormation stack, delete all the resources you created using the console or AWS CLI:

  1. Manually delete any EMR Studio Workspaces you created with User2.
  2. Delete the Athena workgroup created as part of the User1 setup.
  3. Delete the QuickSight VPC connection you created.
  4. Delete the Redshift IAM Identity Center connection.
  5. Deregister IAM Identity Center from S3 Access Grants.
  6. Delete the CloudFormation stack.
  7. Manually delete the VPC created by AWS CloudFormation.

Conclusion

In this post, we delved into the trusted identity propagation feature of AWS IAM Identity Center alongside various AWS Analytics services, demonstrating its utility in managing permissions using corporate user or group identities rather than IAM roles. We examined diverse user personas utilizing interactive tools like Athena, EMR Studio notebooks, Redshift Query Editor V2, and QuickSight, all centralized under Lake Formation for streamlined permission management. Additionally, we explored S3 Access Grants for S3 bucket access management, and concluded with insights into auditing through CloudTrail events and CloudTrail Lake for a comprehensive overview of user data access.


About the Author

Shoukat Ghouse is a Senior Big Data Specialist Solutions Architect at AWS. He helps customers around the world build robust, efficient and scalable data platforms on AWS leveraging AWS analytics services like AWS Glue, AWS Lake Formation, Amazon Athena and Amazon EMR.

Use AWS Data Exchange to seamlessly share Apache Hudi datasets

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/use-aws-data-exchange-to-seamlessly-share-apache-hudi-datasets/

Apache Hudi was originally developed by Uber in 2016 to bring to life a transactional data lake that could quickly and reliably absorb updates to support the massive growth of the company’s ride-sharing platform. Apache Hudi is now widely used to build very large-scale data lakes by many across the industry. Today, Hudi is the most active and high-performing open source data lakehouse project, known for fast incremental updates and a robust services layer.

Apache Hudi serves as an important data management tool because it allows you to bring full online transaction processing (OLTP) database functionality to data stored in your data lake. As a result, Hudi users can store massive amounts of data with the data scaling costs of a cloud object store, rather than the more expensive scaling costs of a data warehouse or database. It also provides data lineage, integration with leading access control and governance mechanisms, and incremental ingestion of data for near real-time performance. AWS, along with its partners in the open source community, has embraced Apache Hudi in several services, offering Hudi compatibility in Amazon EMR, Amazon Athena, Amazon Redshift, and more.

AWS Data Exchange is a service provided by AWS that enables you to find, subscribe to, and use third-party datasets in the AWS Cloud. A dataset in AWS Data Exchange is a collection of data that can be changed or updated over time. It also provides a platform through which a data producer can make their data available for consumption for subscribers.

In this post, we show how you can take advantage of the data sharing capabilities in AWS Data Exchange on top of Apache Hudi.

Benefits of AWS Data Exchange

AWS Data Exchange offers a series of benefits to both parties. For subscribers, it provides a convenient way to access and use third-party data without the need to build and maintain data delivery, entitlement, or billing technology. Subscribers can find and subscribe to thousands of products from qualified AWS Data Exchange providers and use them with AWS services. For providers, AWS Data Exchange offers a secure, transparent, and reliable channel to reach AWS customers. It eliminates the need to build and maintain data delivery, entitlement, and billing technology, allowing providers to focus on creating and managing their datasets.

To become a provider on AWS Data Exchange, there are a few steps to determine eligibility. Providers need to register to be a provider, make sure their data meets the legal eligibility requirements, and create datasets, revisions, and import assets. Providers can define public offers for their data products, including prices, durations, data subscription agreements, refund policies, and custom offers. The AWS Data Exchange API and AWS Data Exchange console can be used for managing datasets and assets.

Overall, AWS Data Exchange simplifies the process of data sharing in the AWS Cloud by providing a platform for customers to find and subscribe to third-party data, and for providers to publish and manage their data products. It offers benefits for both subscribers and providers by eliminating the need for complex data delivery and entitlement technology and providing a secure and reliable channel for data exchange.

Solution overview

Combining the scale and operational capabilities of Apache Hudi with the secure data sharing features of AWS Data Exchange enables you to maintain a single source of truth for your transactional data. Simultaneously, it enables automatic business value generation by allowing other stakeholders to use the insights that the data can provide. This post shows how to set up such a system in your AWS environment using Amazon Simple Storage Service (Amazon S3), Amazon EMR, Amazon Athena, and AWS Data Exchange. The following diagram illustrates the solution architecture.

Set up your environment for data sharing

You need to register as a data provider before you create datasets and list them in AWS Data Exchange as data products. Complete the following steps to register as a data provider:

  1. Sign in to the AWS account that you want to use to list and manage products on AWS Data Exchange.
    As a provider, you are responsible for complying with these guidelines and the Terms and Conditions for AWS Marketplace Sellers and the AWS Customer Agreement. AWS may update these guidelines. AWS removes any product that breaches these guidelines and may suspend the provider from future use of the service. AWS Data Exchange may have some AWS Regional requirements; refer to Service endpoints for more information.
  2. Open the AWS Marketplace Management Portal registration page and enter the relevant information about how you will use AWS Data Exchange.
  3. For Legal business name, enter the name that your customers see when subscribing to your data.
  4. Review the terms and conditions and select I have read and agree to the AWS Marketplace Seller Terms and Conditions.
  5. Select the information related to the types of products you will be creating as a data provider.
  6. Choose Register & Sign into Management Portal.

If you want to submit paid products to AWS Marketplace or AWS Data Exchange, you must provide your tax and banking information. You can add this information on the Settings page:

  1. Choose the Payment information tab.
  2. Choose Complete tax information and complete the form.
  3. Choose Complete banking information and complete the form.
  4. Choose the Public profile tab and update your public profile.
  5. Choose the Notifications tab and configure an additional email address to receive notifications.

You’re now ready to configure seamless data sharing with AWS Data Exchange.

Upload Apache Hudi datasets to AWS Data Exchange

After you create your Hudi datasets and register as a data provider, complete the following steps to create the datasets in AWS Data Exchange:

  1. Sign in to the AWS account that you want to use to list and manage products on AWS Data Exchange.
  2. On the AWS Data Exchange console, choose Owned data sets in the navigation pane.
  3. Choose Create data set.
  4. Select the dataset type you want to create (for this post, we select Amazon S3 data access).
  5. Choose Choose Amazon S3 locations.
  6. Choose the Amazon S3 location where you have your Hudi datasets.

After you add the Amazon S3 location to register in AWS Data Exchange, a bucket policy is generated.

  1. Copy the JSON file and update the bucket policy in Amazon S3.
  2. After you update the bucket policy, choose Next.
  3. Wait for the CREATE_S3_DATA_ACCESS_FROM_S3_BUCKET job to show as Completed, then choose Finalize data set.

Publish a product using the registered Hudi dataset

Complete the following steps to publish a product using the Hudi dataset:

  1. On the AWS Data Exchange console, choose Products in the navigation pane.
    Make sure you’re in the Region where you want to create the product.
  2. Choose Publish new product to start the workflow to create a new product.
  3. Choose which product visibility you want to have: public (it will be publicly available in AWS Data Exchange catalog as well as the AWS Marketplace websites) or private (only the AWS accounts you share with will have access to it).
  4. Select the sensitive information category of the data you are publishing.
  5. Choose Next.
  6. Select the dataset that you want to add to the product, then choose Add selected to add the dataset to the new product.
  7. Define access to your dataset revisions based on time. For more information, see Revision access rules.
  8. Choose Next.
  9. Provide the information for a new product, including a short description.
    One of the required fields is the product logo, which must be in a supported image format (PNG, JPG, or JPEG) and the file size must be 100 KB or less.
  10. Optionally, in the Define product section, under Data dictionaries and samples, select a dataset and choose Edit to upload a data dictionary to the product.
  11. For Long description, enter the description to display to your customers when they look at your product. Markdown formatting is supported.
  12. Choose Next.
  13. Based on your choice of product visibility, configure the offer, renewal, and data subscription agreement.
  14. Choose Next.
  15. Review all the product and offer information, then choose Publish to create the new product.

Manage permissions and access controls for shared datasets

Datasets that are published on AWS Data Exchange can only be used when customers are subscribed to the products. Complete the following steps to subscribe to the data:

  1. On the AWS Data Exchange console, choose Browse catalog in the navigation pane.
  2. In the search bar, enter the name of the product you want to subscribe to and press Enter.
  3. Choose the product to view its detail page.
  4. On the product detail page, choose Continue to Subscribe.
  5. Choose your preferred price and duration combination, choose whether to enable auto-renewal for the subscription, and review the offer details, including the data subscription agreement (DSA).
    The dataset is available in the US East (N. Virginia) Region.
  6. Review the pricing information, choose the pricing offer and, if you and your organization agree to the DSA, pricing, and support information, choose Subscribe.

After the subscription has gone through, you will be able to see the product on the Subscriptions page.

Create a table in Athena using an Amazon S3 access point

Complete the following steps to create a table in Athena:

  1. Open the Athena console.
  2. If this is the first time using Athena, choose Explore Query Editor and set up the S3 bucket where query results will be written:

    1. Choose View settings.
    2. Choose Manage.
    3. Under Query result location and encryption, choose Browse Amazon S3 to choose the location where query results will be written.
    4. Choose a bucket and folder you want to automatically write the query results to.
    5. Choose Save.
      Athena will display the results of your query on the Athena console, or send them through your ODBC/JDBC driver if that is what you are using. Additionally, the results are written to the result S3 bucket.
  3. Complete the following steps to create a workgroup:
    1. In the navigation pane, choose Workgroups.
    2. Choose Create workgroup.
    3. Enter a name for your workgroup (for this post, data_exchange), select your analytics engine (Athena SQL), and select Turn on queries on requester pay buckets in Amazon S3.
      This is important to access third-party datasets.
    4. In the Athena query editor, choose the workgroup you created.
    5. Run the following DDL to create the table:

Now you can run your analytical queries using Athena SQL statements. The following screenshot shows an example of the query results.

Enhanced customer collaboration and experience with AWS Data Exchange and Apache Hudi

AWS Data Exchange provides a secure and simple interface to access high-quality data. With access to over 3,500 datasets, you can use leading high-quality data in your analytics and data science. Additionally, the ability to add Hudi datasets as shown in this post allows you to enable deeper integration with lakehouse use cases. There are several potential use cases where having Apache Hudi datasets integrated into AWS Data Exchange can accelerate business outcomes, such as the following:

  • Near real-time updated datasets – One of Apache Hudi’s defining features is the ability to provide near real-time incremental data processing. As new data flows in, Hudi allows that data to be ingested in real time, providing a central source of up-to-date truth. AWS Data Exchange supports dynamically updated datasets, which can keep up with these incremental updates. For downstream customers that rely on the most up-to-date information for their use cases, the combination of Apache Hudi and AWS Data Exchange means that they can subscribe to a dataset in AWS Data Exchange and know that they’re getting incrementally updated data.
  • Incremental pipelines and processing – Hudi supports incremental processing and updates to data in the data lake. This is especially valuable because it enables you to update or process only the data that has changed, along with the materialized views that are valuable for your business use case.
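
To make the incremental idea concrete, the following is a conceptual Python sketch, not the Hudi API. It assumes each record carries Hudi's _hoodie_commit_time metadata column as a sortable timestamp string. The function name, the checkpoint handling, and the sample rows are hypothetical.

```python
def incremental_pull(records, last_commit_time):
    """Return the records committed after last_commit_time,
    plus the checkpoint to use for the next pull."""
    changed = [r for r in records if r["_hoodie_commit_time"] > last_commit_time]
    checkpoint = max(
        (r["_hoodie_commit_time"] for r in changed),
        default=last_commit_time,  # nothing new: keep the old checkpoint
    )
    return changed, checkpoint


# Hypothetical rows; commit times are fixed-width strings, so plain
# string comparison orders them chronologically.
records = [
    {"id": 1, "_hoodie_commit_time": "20240101120000"},
    {"id": 2, "_hoodie_commit_time": "20240102120000"},
    {"id": 3, "_hoodie_commit_time": "20240103120000"},
]

changed, checkpoint = incremental_pull(records, "20240101120000")
print([r["id"] for r in changed], checkpoint)
```

A downstream consumer stores the returned checkpoint and passes it to the next pull, so each run touches only the data that changed since the previous one.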

Best practices and recommendations

We recommend the following best practices for security and compliance:

  • Enable AWS Lake Formation or other data governance systems as part of creating the source data lake
  • To maintain compliance, you can use the guides provided by AWS Artifact

For monitoring and management, you can enable Amazon CloudWatch Logs on your EMR clusters along with CloudWatch alarms to maintain pipeline health.

Conclusion

Apache Hudi enables you to bring to life massive amounts of data stored in Amazon S3 for analytics. It provides full OLAP capabilities, enables incremental processing and querying, and maintains the ability to run deletes to remain GDPR compliant. Combining this with the secure, reliable, and user-friendly data sharing capabilities of AWS Data Exchange means that the business value unlocked by a Hudi lakehouse doesn’t need to remain limited to the producer that generates this data.

For more use cases about using AWS Data Exchange, see Learning Resources for Using Third-Party Data in the Cloud. To learn more about creating Apache Hudi data lakes, refer to Build your Apache Hudi data lake on AWS using Amazon EMR – Part 1. You can also consider using a fully managed lakehouse product such as Onehouse.


About the Authors

Saurabh Bhutyani is a Principal Analytics Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running generative AI use cases, scalable analytics solutions and data mesh architectures using AWS services like Amazon Bedrock, Amazon SageMaker, Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Ankith Ede is a Data & Machine Learning Engineer at Amazon Web Services, based in New York City. He has years of experience building Machine Learning, Artificial Intelligence, and Analytics based solutions for large enterprise clients across various industries. He is passionate about helping customers build scalable and secure cloud based solutions at the cutting edge of technology innovation.

Chandra Krishnan is a Solutions Engineer at Onehouse, based in New York City. He works on helping Onehouse customers build business value from their data lakehouse deployments and enjoys solving exciting challenges on behalf of his customers. Prior to Onehouse, Chandra worked at AWS as a Data and ML Engineer, helping large enterprise clients build cutting edge systems to drive innovation in their organizations.

Understanding Apache Iceberg on AWS with the new technical guide

Post Syndicated from Carlos Rodrigues original https://aws.amazon.com/blogs/big-data/understanding-apache-iceberg-on-aws-with-the-new-technical-guide/

We’re excited to announce the launch of the Apache Iceberg on AWS technical guide. Whether you are new to Apache Iceberg on AWS or already running production workloads on AWS, this comprehensive technical guide offers detailed guidance, from foundational concepts to advanced optimizations, to help you build your transactional data lake with Apache Iceberg on AWS.

Apache Iceberg is an open source table format that simplifies data processing on large datasets stored in data lakes. It does so by bringing the familiarity of SQL tables to big data and capabilities such as ACID transactions, row-level operations (merge, update, delete), partition evolution, data versioning, incremental processing, and advanced query scanning. Apache Iceberg seamlessly integrates with popular open source big data processing frameworks like Apache Spark, Apache Hive, Apache Flink, Presto, and Trino. It is natively supported by AWS analytics services such as AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift.

The following diagram illustrates a reference architecture of a transactional data lake with Apache Iceberg on AWS.

AWS customers and data engineers use the Apache Iceberg table format for its many benefits, as well as for its high performance and reliability at scale to build transactional data lakes and write-optimized solutions with Amazon EMR, AWS Glue, Athena, and Amazon Redshift on Amazon Simple Storage Service (Amazon S3).

We believe Apache Iceberg adoption on AWS will continue to grow rapidly, and you can benefit from this technical guide that delivers productive guidance on working with Apache Iceberg on supported AWS services, best practices on cost-optimization and performance, and effective monitoring and maintenance policies.


About the Authors

Carlos Rodrigues is a Big Data Specialist Solutions Architect at AWS. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Iceberg and Apache Hudi. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He is an expert on data engineering and enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Shana Schipers is an Analytics Specialist Solutions Architect at AWS, focusing on big data. She supports customers worldwide in building transactional data lakes using open table formats like Apache Hudi, Apache Iceberg, and Delta Lake on AWS.

Analyze Elastic IP usage history using Amazon Athena and AWS CloudTrail

Post Syndicated from Aidin Khosrowshahi original https://aws.amazon.com/blogs/big-data/analyze-elastic-ip-usage-history-using-amazon-athena-and-aws-cloudtrail/

An AWS Elastic IP (EIP) address is a static, public, and unique IPv4 address. Allocated exclusively to your AWS account, the EIP remains under your control until you decide to release it. It can be allocated to your Amazon Elastic Compute Cloud (Amazon EC2) instance or other AWS resources such as load balancers.

EIP addresses are designed for dynamic cloud computing because they can be remapped to another instance to mask any disruptions. These EIPs are also used for applications that must make external requests to services that require a consistent address for allow-listed inbound connections. As your application usage varies, these EIPs might see sporadic use over weeks or even months, leading to potential accumulation of unused EIPs that may inadvertently inflate your AWS expenditure.

In this post, we show you how to analyze EIP usage history using AWS CloudTrail and Amazon Athena to gain better insight into the EIP usage patterns in your AWS account. You can use this solution regularly as part of your cost-optimization efforts to safely remove unused EIPs and reduce your costs.

Solution overview

This solution uses activity logs from CloudTrail and the power of Athena to conduct a comprehensive analysis of historical EIP attachment activity within your AWS account. CloudTrail, a critical AWS service, meticulously logs API activity within an AWS account.

Athena is an interactive query service that simplifies data analysis in Amazon Simple Storage Service (Amazon S3) using standard SQL. It is a serverless service, so there is no infrastructure to manage and you pay only for the queries you run.

By extracting detailed information from CloudTrail and querying it using Athena, this solution streamlines the process of data collection, analysis, and reporting of EIP usage within an AWS account.

To gather EIP usage reporting, this solution compares snapshots of the current EIPs, focusing on their most recent attachment within a customizable 3-month period. It then determines the frequency of EIP attachments to resources. An attachment count greater than zero suggests that the EIPs are actively in use. In contrast, an attachment count of zero indicates that these EIPs are idle and can be released, aiding in identifying potential areas for cost reduction.
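
The counting rule described above can be sketched in plain Python. This is an illustration of the logic only, not the code the CloudFormation template deploys. The function name, the field names, and the sample data are assumptions chosen for readability.

```python
from collections import defaultdict


def eip_usage(eips, associate_events):
    """Emulate a LEFT JOIN plus GROUP BY: one row per current EIP with
    its attachment count and most recent attachment time."""
    times_by_allocation = defaultdict(list)
    for event in associate_events:
        times_by_allocation[event["allocationid"]].append(event["eventtime"])

    report = []
    for eip in eips:
        # EIPs with no matching events still get a row, with a count of zero.
        times = times_by_allocation.get(eip["allocationid"], [])
        report.append({
            "publicip": eip["publicip"],
            "allocationid": eip["allocationid"],
            "attachment_count": len(times),
            "latest_attachment": max(times) if times else None,
            "idle": len(times) == 0,
        })
    return report


# Hypothetical snapshot of current EIPs and past AssociateAddress events.
eips = [
    {"publicip": "203.0.113.10", "allocationid": "eipalloc-aaa"},
    {"publicip": "203.0.113.11", "allocationid": "eipalloc-bbb"},
]
events = [
    {"allocationid": "eipalloc-aaa", "eventtime": "2024-01-05T10:00:00Z"},
    {"allocationid": "eipalloc-aaa", "eventtime": "2024-02-01T08:30:00Z"},
]

report = eip_usage(eips, events)
for row in report:
    print(row)
```

Keeping a row for every current EIP, including those with no events, is what lets the zero-count addresses surface as release candidates.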

In the following sections, we show you how to deploy the solution using AWS CloudFormation and then run an analysis.

Prerequisites

Complete the following prerequisite steps:

  1. If your account doesn’t have CloudTrail enabled, create a trail, then capture the S3 bucket name to use later in the implementation steps.
  2. Download the CloudFormation template from the repository. You need this template.yaml file for the implementation steps.

Deploy the solution

In this section, you use AWS CloudFormation to create the required resources. AWS CloudFormation is a service that helps you model and set up your AWS resources so that you can spend less time managing those resources and more time focusing on your applications that run in AWS.

The CloudFormation template creates Athena views and a table to search past AssociateAddress events in CloudTrail, an AWS Lambda function to collect snapshots of existing EIPs, and an S3 bucket to store the analysis results.

Complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack and choose With new resources (standard).
  2. In the Specify Template section, choose an existing template and upload the template.yaml file downloaded from the prerequisites.
  3. In the Specify stack details section, enter your preferred stack name and the existing CloudTrail S3 location, and maintain the default settings for the other parameters.
  4. At the bottom of the Review and create page, select the acknowledgement check box, then choose Submit.

Wait for the stack to be created. It should take a few minutes to complete. You can open the AWS CloudFormation console to view the stack creation process.

Run an analysis

You have configured the solution to run your EIP attachments analysis. Complete the following steps to analyze your EIP attachment history. If you’re using Athena for the first time in your account, you need to set up a query result location in Amazon S3.

  1. On the Athena console, navigate to the query editor.
  2. For Database, choose default.
  3. Enter the following query and choose Run query:
SELECT
  eip.publicip,
  eip.allocationid,
  eip.region,
  eip.accountid,
  eip.associationid,
  eip.PublicIpv4Pool,
  MAX(associate_ip_event.eventtime) AS latest_attachment,
  COUNT(associate_ip_event.associationid) AS attachmentCount
FROM eip
LEFT JOIN associate_ip_event
  ON associate_ip_event.allocationid = eip.allocationid
GROUP BY 1, 2, 3, 4, 5, 6

All the required tables are created under the default database.
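
If you want to run the report on a schedule instead of from the console, one option is the Athena StartQueryExecution API. The following Python sketch only builds the request parameters. The helper name and the result bucket are hypothetical, and the boto3 call is shown as a comment because it needs AWS credentials.

```python
def athena_request(query: str, output_location: str, database: str = "default") -> dict:
    """Build keyword arguments for Athena's StartQueryExecution call."""
    if not output_location.startswith("s3://"):
        raise ValueError("output_location must be an s3:// URI")
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


# Hypothetical result location; substitute your own bucket and prefix.
request = athena_request(
    "SELECT eip.publicip FROM eip",
    "s3://amzn-s3-demo-bucket/athena-results/",
)
print(request)

# With boto3 installed and credentials configured (not executed here):
#   import boto3
#   athena = boto3.client("athena")
#   execution_id = athena.start_query_execution(**request)["QueryExecutionId"]
```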

You can now run a query on the CloudTrail logs to look back in time for the EIP attachment. This query provides you with better insight to safely release idle EIPs in order to reduce costs by displaying how frequently each specific EIP was previously attached to any resources.

This report will provide the following information:

  • Public IP
  • Allocation ID (the ID that AWS assigns to represent the allocation of the EIP address for use with instances in a VPC)
  • Region
  • Account ID
  • latest_attachment date (the last time the EIP was attached to a resource)
  • attachmentCount (number of attachments)
  • The association ID for the address (if this field is empty, the EIP is idle and not attached to any resources)

The following screenshot shows the query results.

Clean up

To optimize cost, clean up the resources you deployed for this post by completing the following steps:

  1. Delete the contents in your S3 buckets (eip-analyzer-eipsnapshot-* and eip-analyzer-athenaresulteipanalyzer-*).
  2. Delete the S3 buckets.
  3. On the AWS CloudFormation console, delete the stack you created.

Conclusion

This post demonstrated how you can analyze Elastic IP usage history using Athena and CloudTrail to gain better insight into EIP attachment patterns. Check out the GitHub repo to regularly run this analysis as part of your cost-optimization strategy to identify and release inactive EIPs to reduce costs.

You can also use Athena to analyze logs from other AWS services; for more information, see Querying AWS service logs.

Additionally, you can analyze activity logs with AWS CloudTrail Lake and Amazon Athena. AWS CloudTrail Lake is a managed data lake that enables organizations to aggregate, immutably store, and query events recorded by CloudTrail for auditing, security investigation, and operational troubleshooting. AWS CloudTrail Lake supports the collection of events from multiple AWS Regions and AWS accounts. For CloudTrail Lake, you pay for data ingestion, retention, and analysis. Refer to the AWS CloudTrail Lake pricing page for pricing details.


About the Author

Aidin Khosrowshahi is a Senior Technical Account Manager with Amazon Web Services based out of San Francisco. He focuses on reliability, optimization, and improving operational mechanisms with his customers.

How Fujitsu implemented a global data mesh architecture and democratized data

Post Syndicated from Kanehito Miyake original https://aws.amazon.com/blogs/big-data/how-fujitsu-implemented-a-global-data-mesh-architecture-and-democratized-data/

This is a guest post co-authored with Kanehito Miyake, Engineer at Fujitsu Japan. 

Fujitsu Limited was established in Japan in 1935. Currently, we have approximately 120,000 employees worldwide (as of March 2023), including group companies. We develop business in various regions around the world, starting with Japan, and provide digital services globally. To provide a variety of products, services, and solutions that are better suited to customers and society in each region, we have built business processes and systems that are optimized for each region and its market.

However, in recent years, the IT market environment has changed drastically, and it has become difficult for the entire group to respond flexibly to the individual market situation. Moreover, we are challenged not only to revisit individual products, services, and solutions, but also to reinvent entire business processes and operations.

To transform Fujitsu from an IT company to a digital transformation (DX) company, and to become a world-leading DX partner, Fujitsu has declared a shift to data-driven management. We built the OneFujitsu program, which standardizes business projects and systems throughout the company, including the domestic and overseas group companies, and tackles the major transformation of the entire company under the program.

To achieve data-driven management, we built OneData, a data utilization platform used in the four global AWS Regions, which started operation in April 2022. As of November 2023, more than 200 projects and 37,000 users were onboarded. The platform consists of approximately 370 dashboards, 360 tables registered in the data catalog, and 40 linked systems. The data size stored in Amazon Simple Storage Service (Amazon S3) exceeds 100 TB, including data processed for use in each project.

In this post, we introduce our OneData initiative. We explain how Fujitsu worked to solve the aforementioned issues and introduce an overview of the OneData design concept and its implementation. We hope this post will provide some guidance for architects and engineers.

Challenges

Like many other companies struggling with data utilization, Fujitsu faced some challenges, which we discuss in this section.

Siloed data

In Fujitsu’s long history, we restructured organizations by merging affiliated companies into Fujitsu. Although organizational integration has progressed, there are still many systems and mechanisms customized for individual context. There are also many systems and mechanisms overlapping across different organizations. For this reason, it takes a lot of time and effort to discover, search, and integrate data when analyzing the entire company using a common standard. This situation makes it difficult for management to grasp business trends and make decisions in a timely manner.

Under these circumstances, the OneFujitsu program is designed to have one system per business globally. Core systems such as ERP and CRM are being integrated and unified to eliminate silos. This will make it easier for users to utilize data across different organizations for specific business areas.

However, to spread a culture of data-driven decision-making not only in management but also in every organization, it is necessary to have a mechanism that enables users to easily discover various types of data in organizations, and then analyze the data quickly and flexibly when needed.

Excel-based data utilization

Microsoft Excel is available on almost everyone’s PC in the company, and it helps lower the hurdles when starting to utilize data. However, Excel is mainly designed for spreadsheets; it’s not designed for large-scale data analytics and automation. Excel files tend to contain a mixture of data and procedures (functions, macros), and many users casually copy files for one-time use cases. It introduces complexity to keep both data and procedures up to date. Furthermore, it tends to require domain-specific knowledge to manage the Excel files for individual context.

For those reasons, it was extremely difficult for Fujitsu to manage and utilize data at scale with Excel.

Solution overview

OneData defines three personas:

  • Publisher – This role includes the organizational and management team of systems that serve as data sources. Responsibilities include:
    • Load raw data from the data source system at the appropriate frequency.
    • Provide technical metadata for loaded data and keep it up to date.
    • Perform the cleansing process and format conversion of raw data as needed.
    • Grant access permissions to data based on the requests from data users.
  • Consumer – Consumers are organizations and projects that use the data. Responsibilities include:
    • Look for the data to be used from the technical data catalog and request access to the data.
    • Process and convert data into a format suitable for their own use (such as fact-dimension), using the read permissions they have been granted.
    • Configure business intelligence (BI) dashboards to provide data-driven insights to end-users targeted by the consumer’s project.
    • Use the latest data published by the publisher to update data as needed.
    • Promote and expand the use of databases.
  • Foundation – This role encompasses the data steward and governance team. Responsibilities include:
    • Provide a preprocessed, generic dataset of data commonly used by many consumers.
    • Manage and guide metrics for the quality of data published by each publisher.

Each role has sub-roles. For example, the consumer role has the following sub-roles with different responsibilities:

  • Data engineer – Create data process for analysis
  • Dashboard developer – Create a BI dashboard
  • Dashboard viewer – Monitor the BI dashboard

The following diagram describes how the OneData platform works with those roles.

Let’s look at the key components of this architecture in more detail.

Publisher and consumer

In the OneData platform, a publisher is defined for each data source system, and a consumer is defined for each data utilization project. OneData provides an AWS account for each.

This enables the publisher to cleanse data and the consumer to process and analyze data at scale. In addition, by properly separating data and processing, it becomes effortless for the teams and organizations to share, manage, and inherit processes that were traditionally confined to individual PCs.

Foundation

When teams don’t have a robust enough skillset, modeling and processing data can take more time, leading to longer latency and lower data quality. It can also contribute to lower utilization by end-users. To address this, the foundation role provides an already processed dataset as a generic data model for use cases common to many consumers. This makes high-quality data available to each consumer. Here, the foundation role takes the lead in compiling the knowledge of domain experts and making data suitable for analysis. This approach also eliminates duplicated effort among consumers. In addition, the foundation role monitors the state of the metadata, data quality indicators, data permissions, information classification labels, and so on. This monitoring is crucial for data governance and data management.

BI and visualization

Individual consumers have a dedicated space in a BI tool. In the past, if users wanted to go beyond simple data visualization using Excel, they had to build and maintain their own BI tools, which caused silos. By unifying these BI tools, OneData lowers the difficulty for consumers to use BI tools, and centralizes operation and maintenance, achieving optimization on a company-wide scale.

Additionally, to keep portability between BI tools, OneData recommends that users transform data within the consumer AWS account instead of transforming data in the BI tool. With this approach, the BI tool loads data from AWS Glue Data Catalog tables through an Amazon Athena JDBC/ODBC driver without any further transformations.

Deployment and operational excellence

To provide OneData as a common service for Fujitsu and group companies around the world, Regional OneData has been deployed in several locations. Regional OneData represents a unit of system configurations, and is designed to provide lower network latency for platform users, and be optimized for local languages, working hours for system operations and support, and region-specific legal restrictions, such as data residency and personal information protection.

The Regional Operations Unit (ROU), a virtual organization that brings together members from each region, is responsible for operating regional OneData in each of these regions. OneData HQ is responsible for supervising these ROUs, as well as planning and managing the entire OneData.

In addition, we have a specially positioned OneData called Global OneData, where global data utilization spans each region. Only the properly cleansed and sanitized data is transferred between each Regional OneData and Global OneData.

Systems such as ERP and CRM are accumulating data as a publisher for Global OneData, and the dashboards for executives in various regions to monitor business conditions with global metrics are also acting as a consumer for Global OneData.

Technical concepts

In this section, we discuss some of the technical concepts of the solution.

Large scale multi-account

We have adopted a multi-account strategy to provide AWS accounts for each project. Many publishers and consumers are already onboarded into OneData, and the number is expected to increase in the future. With this strategy, future usage expansion at scale can be achieved without affecting the users.

Also, this strategy allowed us to have clear boundaries in security, costs, and service quotas for each AWS service.

All the AWS accounts are deployed and managed through AWS Organizations and AWS Control Tower.

Serverless

Although we provide independent AWS accounts for each publisher and consumer, both operational costs and resource costs would be enormous if we accommodated individual user requests, such as, “I want a virtual machine or RDBMS to run specific tools for data processing.” To avoid such continuous operational and resource costs, we have adopted AWS serverless services for all the computing resources necessary for our activities as a publisher and consumer.

We use AWS Glue to preprocess, cleanse, and enrich data. Optionally, AWS Lambda or Amazon Elastic Container Service (Amazon ECS) with AWS Fargate can also be used based on preferences. We allow users to set up AWS Step Functions for orchestration and Amazon CloudWatch for monitoring. In addition, we provide Amazon Aurora Serverless PostgreSQL as standard for consumers, to meet their needs for data processing with extract, load, and transform (ELT) jobs. With this approach, only the consumer who requires those services will incur charges based on usage. We are able to take advantage of lower operational and resource costs thanks to the unique benefit of serverless (or more accurately, pay-as-you-go) services.

AWS provides many serverless services, and OneData has integrated them to provide scalability that lets active users quickly get the capability they need, while minimizing the cost for infrequent users.

Data ownership and access control

In OneData, we have adopted a data mesh architecture where each publisher maintains ownership of data in a distributed and decentralized manner. When the consumer discovers the data they want to use, they request access from the publisher. The publisher accepts the request and grants permissions only when the request meets their own criteria. With the AWS Glue Data Catalog and AWS Lake Formation, there is no need to update S3 bucket policies or AWS Identity and Access Management (IAM) policies every time we allow access for individual data on an S3 data lake, and we can effortlessly grant the necessary permissions for the databases, tables, columns, and rows when needed.
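As a rough illustration of the kind of grant a publisher might issue, the following sketch only builds a request for column-level SELECT access. The function name, role ARN, database, table, and column names are all hypothetical, and the dictionary shape is intended to follow the Lake Formation GrantPermissions request as we understand it; verify it against the API reference before use. It is not OneData's actual implementation.

```python
def build_grant_request(consumer_principal_arn, database, table, columns):
    """Build keyword arguments for a column-level SELECT grant (hypothetical helper)."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": consumer_principal_arn},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }


# All identifiers below are made up for illustration.
request = build_grant_request(
    "arn:aws:iam::111122223333:role/consumer-project-role",
    "sales_db",
    "orders",
    ["order_id", "amount"],
)
print(request["Resource"]["TableWithColumns"]["ColumnNames"])
```

A dictionary like this could then be passed to a Lake Formation client call, so that no S3 bucket policy or IAM policy needs to change when a new consumer is approved.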

Conclusion

Since the launch of OneData in April 2022, we have been persistently carrying out educational activities to expand the number of users and introducing success stories on our portal site. As a result, we have been promoting change management within the company and are actively utilizing data in each department. Regional OneData is being rolled out gradually, and we plan to further expand the scale of use in the future.

With its global expansion, the development of basic functions as a data utilization platform will reach a milestone. As we move forward, it will be important to make sure that the OneData platform is used effectively throughout Fujitsu, while incorporating new technologies related to data analysis as appropriate. For example, we are preparing to provide more advanced machine learning functions to OneData users using Amazon SageMaker Studio, and we are investigating the applicability of AWS Glue Data Quality to reduce manual quality monitoring efforts. Furthermore, we are in the process of implementing Amazon DataZone through various initiatives, such as verifying its functionality and examining how it can operate while bridging the gap between OneData’s existing processes and the ideal process we are aiming for.

We have had the opportunity to discuss data utilization with various partners and customers, and although individual challenges may differ in size and context, the issues that we are currently trying to solve with OneData are common to many of them.

This post describes only a small portion of how Fujitsu tackled challenges using the AWS Cloud, but we hope the post will give you some inspiration to solve your own challenges.


About the Authors


Kanehito Miyake is an engineer at Fujitsu Japan and in charge of OneData’s solution and cloud architecture. He spearheaded the architectural study of the OneData project and contributed greatly to promoting data utilization at Fujitsu with his expertise. He loves rockfish fishing.

Junpei Ozono is a Go-to-market Data & AI solutions architect at AWS in Japan. Junpei supports customers’ journeys on the AWS Cloud from Data & AI aspects and guides them to design and develop data-driven architectures powered by AWS services.

Optimize data layout by bucketing with Amazon Athena and AWS Glue to accelerate downstream queries

Post Syndicated from Takeshi Nakatani original https://aws.amazon.com/blogs/big-data/optimize-data-layout-by-bucketing-with-amazon-athena-and-aws-glue-to-accelerate-downstream-queries/

In the era of data, organizations are increasingly using data lakes to store and analyze vast amounts of structured and unstructured data. Data lakes provide a centralized repository for data from various sources, enabling organizations to unlock valuable insights and drive data-driven decision-making. However, as data volumes continue to grow, optimizing data layout and organization becomes crucial for efficient querying and analysis.

One of the key challenges in data lakes is the potential for slow query performance, especially when dealing with large datasets. This can be attributed to factors such as inefficient data layout, resulting in excessive data scanning and inefficient use of compute resources. To address this challenge, common practices like partitioning and bucketing can significantly improve query performance and reduce computation costs.

Partitioning is a technique that divides a large dataset into smaller, more manageable parts based on specific criteria, such as date, region, or product category. By partitioning data, downstream analytical queries can skip irrelevant partitions, reducing the amount of data that needs to be scanned and processed. You can use partition columns in the WHERE clause in queries to scan only the specific partitions that your query needs. This can lead to faster query runtimes and more efficient resource utilization. It especially works well when columns with low cardinality are chosen as the key.

What if you have a high-cardinality column that you sometimes need to filter on, for example to look up VIP customers? Each customer is usually identified by an ID, and there can be millions of IDs. Partitioning isn’t suitable for such high-cardinality columns because you end up with small files, slow partition filtering, and high Amazon Simple Storage Service (Amazon S3) API cost (one S3 prefix is created per value of the partition column). Although you can use partitioning with a natural key such as city or state to narrow down your dataset to some degree, it is still necessary to query across date-based partitions if your data is time series.

This is where bucketing comes into play. Bucketing makes sure that all rows with the same values of one or more columns end up in the same file. Instead of one file per value, like partitioning, a hash function is used to distribute values evenly across a fixed number of files. By organizing data this way, you can perform efficient filtering, because only the relevant buckets need to be processed, further reducing computational overhead.
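The idea can be sketched in a few lines of Python. This is a toy model, not how Athena or Spark implement bucketing: CRC32 is used purely as a stand-in hash, and the `customer_id` column, the row data, and the bucket count of 4 are assumptions made for illustration.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # illustrative value only


def bucket_for(key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket index with a stand-in hash (CRC32)."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def write_bucketed(rows, key_column, num_buckets=NUM_BUCKETS):
    """Group rows into buckets; each list plays the role of one file."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[key_column], num_buckets)].append(row)
    return buckets


def read_by_key(buckets, key_column, key, num_buckets=NUM_BUCKETS):
    """Read only the one bucket that can contain the key, then filter it."""
    candidate = buckets.get(bucket_for(key, num_buckets), [])
    return [row for row in candidate if row[key_column] == key]


# Hypothetical data: 1,000 customers spread over 4 bucket "files".
rows = [{"customer_id": f"c{i:05d}", "amount": i} for i in range(1000)]
buckets = write_bucketed(rows, "customer_id")
matches = read_by_key(buckets, "customer_id", "c00042")
print(matches)
```

Because the same hash is applied at write time and at read time, a lookup for one key touches a single bucket instead of all of them, regardless of how many distinct keys exist.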

There are multiple options for implementing bucketing on AWS. One approach is to use the Amazon Athena CREATE TABLE AS SELECT (CTAS) statement, which allows you to create a bucketed table directly from a query. Alternatively, you can use AWS Glue for Apache Spark, which provides built-in support for bucketing configurations during the data transformation process. AWS Glue allows you to define bucketing parameters, such as the number of buckets and the columns to bucket on, providing an optimized data layout for efficient querying with Athena.

In this post, we discuss how to implement bucketing on AWS data lakes, including using Athena CTAS statement and AWS Glue for Apache Spark. We also cover bucketing for Apache Iceberg tables.

Example use case

In this post, you use a public dataset, the NOAA Integrated Surface Database. Data analysts run one-time queries for data during the past 5 years through Athena. Most of the queries are for specific stations with specific report types. The queries need to complete in 10 seconds, and the cost needs to be optimized carefully. In this scenario, you’re a data engineer responsible for optimizing query performance and cost.

For example, if an analyst wants to retrieve data for a specific station (for example, station ID 123456) with a particular report type (for example, CRN01), the query might look like the following:

SELECT station, report_type, columnA, columnB, ...
FROM table_name
WHERE
report_type = 'CRN01'
AND station = '123456'

In the case of the NOAA Integrated Surface Database, the station column, which holds the station ID, is likely to have a high cardinality, with numerous unique station identifiers. On the other hand, the report_type column may have a relatively low cardinality, with a limited set of report types. Given this scenario, it would be a good idea to partition the data by report_type and bucket it by station.

With this partitioning and bucketing strategy, Athena can first eliminate partitions for irrelevant report types, and then scan only the buckets within the relevant partition that match the specified station ID, significantly reducing the amount of data processed and accelerating query runtimes. This approach not only meets the query performance requirement, but also helps optimize costs by minimizing the amount of data scanned and billed for each query.
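To make the two-stage pruning concrete, here is a small sketch that counts which (partition, bucket) pairs a query would have to read. It is a simplified model under stated assumptions: the list of report types is a hypothetical subset, CRC32 stands in for the real bucketing hash, and the function names are invented for this example.

```python
import zlib

NUM_BUCKETS = 16


def bucket_of(station: str) -> int:
    # Stand-in hash; the hash Athena actually uses for bucketing is different.
    return zlib.crc32(station.encode("utf-8")) % NUM_BUCKETS


def files_to_scan(all_report_types, report_type, stations):
    """Return the (partition, bucket) pairs a query has to read."""
    # Stage 1: partition pruning keeps only the matching report type.
    partitions = [p for p in all_report_types if p == report_type]
    # Stage 2: bucket pruning keeps only buckets that can hold the stations.
    wanted_buckets = sorted({bucket_of(s) for s in stations})
    return [(p, b) for p in partitions for b in wanted_buckets]


report_types = ["CRN01", "CRN05", "FM-12", "FM-15"]  # hypothetical subset
total_files = len(report_types) * NUM_BUCKETS
scan = files_to_scan(report_types, "CRN01", ["123456"])
print(f"{len(scan)} of {total_files} files scanned: {scan}")
```

In this model, a query for one report type and one station reads a single file out of 64, which is the effect the layout is designed to achieve.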

In this post, we examine how query performance is affected by data layout, in particular, bucketing. We also compare three different ways to achieve bucketing. The following table represents conditions for the tables to be created.

| Condition | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
| Format | CSV | Parquet | Parquet | Parquet | Parquet |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Is partitioned? | Yes, but in a different way | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |

noaa_remote_original is partitioned by the year column, but not by the report_type column. The “Is partitioned?” row indicates whether the table is partitioned by the columns that are actually used in the queries.

Baseline table

For this post, you create several tables with different conditions: some without bucketing and some with bucketing, to showcase the performance characteristics of bucketing. First, let’s create an original table using the NOAA data. In subsequent steps, you ingest data from this table to create test tables.

There are multiple ways to define a table definition: running DDL, an AWS Glue crawler, the AWS Glue Data Catalog API, and so on. In this step, you run DDL via the Athena console.

Complete the following steps to create the "bucketing_blog"."noaa_remote_original" table in the Data Catalog:

  1. Open the Athena console.
  2. In the query editor, run the following DDL to create a new AWS Glue database:
    -- Create Glue database
    CREATE DATABASE bucketing_blog;

  3. For Database under Data, choose bucketing_blog to set the current database.
  4. Run the following DDL to create the original table:
    -- Create original table
    CREATE EXTERNAL TABLE `bucketing_blog`.`noaa_remote_original`(
      `station` STRING, 
      `date` STRING, 
      `source` STRING, 
      `latitude` STRING, 
      `longitude` STRING, 
      `elevation` STRING, 
      `name` STRING, 
      `report_type` STRING, 
      `call_sign` STRING, 
      `quality_control` STRING, 
      `wnd` STRING, 
      `cig` STRING, 
      `vis` STRING, 
      `tmp` STRING, 
      `dew` STRING, 
      `slp` STRING, 
      `aj1` STRING, 
      `gf1` STRING, 
      `mw1` STRING)
    PARTITIONED BY (
        year STRING)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
    WITH SERDEPROPERTIES ( 
      'escapeChar'='\\',
      'quoteChar'='\"',
      'separatorChar'=',') 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://noaa-global-hourly-pds/'
    TBLPROPERTIES (
      'skip.header.line.count'='1'
    )

Because the source data has quoted fields, we use OpenCSVSerde instead of the default LazySimpleSerde.

These CSV files have a header row, which we tell Athena to skip by adding skip.header.line.count and setting the value to 1.

For more details, refer to OpenCSVSerDe for processing CSV.

  1. Run the following DDL to add partitions. We add partitions only for 5 years out of 124 years based on the use case requirement:
    -- Load partitions
    ALTER TABLE `bucketing_blog`.`noaa_remote_original` ADD
      PARTITION (year = '2024') LOCATION 's3://noaa-global-hourly-pds/2024/'
      PARTITION (year = '2023') LOCATION 's3://noaa-global-hourly-pds/2023/'
      PARTITION (year = '2022') LOCATION 's3://noaa-global-hourly-pds/2022/'
      PARTITION (year = '2021') LOCATION 's3://noaa-global-hourly-pds/2021/'
      PARTITION (year = '2020') LOCATION 's3://noaa-global-hourly-pds/2020/';

  2. Run the following DML to verify if you can successfully query the data:
    -- Check data 
    SELECT * FROM "bucketing_blog"."noaa_remote_original" LIMIT 10;
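If you need to register a different range of years, typing each PARTITION clause by hand gets tedious. The following is a minimal sketch that generates the same kind of ALTER TABLE statement from a list of years; the helper name `add_partition_ddl` is hypothetical, and the sketch assumes the bucket keeps one prefix per year, as in the statement above.

```python
def add_partition_ddl(table: str, bucket_uri: str, years) -> str:
    """Build an ALTER TABLE ... ADD PARTITION statement for the given years."""
    clauses = [
        f"  PARTITION (year = '{year}') LOCATION '{bucket_uri}{year}/'"
        for year in years
    ]
    return f"ALTER TABLE {table} ADD\n" + "\n".join(clauses) + ";"


ddl = add_partition_ddl(
    "`bucketing_blog`.`noaa_remote_original`",
    "s3://noaa-global-hourly-pds/",
    range(2024, 2019, -1),  # 2024 down to 2020, matching the step above
)
print(ddl)
```

The generated text can be pasted into the Athena query editor in place of the hand-written statement.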

Now you’re ready to start querying the original table to examine the baseline performance.

  1. Run a query against the original table to evaluate the query performance as a baseline. The following query selects records for five specific stations with report type CRN05:
    -- Baseline
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."noaa_remote_original"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );

We ran this query 10 times. The average query runtime for the 10 runs is 27.6 seconds, which is far longer than our target of 10 seconds, and 155.75 GB of data is scanned to return 1.65 million records. This is the baseline performance of the original raw table. It’s time to start optimizing data layout from this baseline.

Next, you create tables with different conditions from the original: one without bucketing and one with bucketing, and compare them.

Optimize data layout using Athena CTAS

In this section, we use an Athena CTAS query to optimize data layout and its format.

First, let’s create a table with partitioning but without bucketing. The new table is partitioned by the column report_type because most of the expected queries use this column in the WHERE clause, and objects are stored as Parquet with Snappy compression.

  1. Open the Athena query editor.
  2. Run the following query, providing your own S3 bucket and prefix:
    --CTAS, non-bucketed
    CREATE TABLE "bucketing_blog"."athena_non_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-non-bucketed/',
        partitioned_by = ARRAY['report_type'],
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshots.


There are 30 files under the partition.

Next, you create a table with Hive style bucketing. The number of buckets needs to be carefully tuned through experiments for your own use case. Generally speaking, the more buckets you have, the smaller the granularity, which might result in better performance. On the other hand, too many small files may introduce inefficiency in query planning and processing. Also, bucketing only works if you are querying a few values of the bucketing key. The more values you add to your query, the more likely that you will end up reading all buckets.

The following is the baseline query to optimize:

-- Baseline
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
FROM "bucketing_blog"."noaa_remote_original"
WHERE
    report_type = 'CRN05'
    AND ( station = '99999904237'
        OR station = '99999953132'
        OR station = '99999903061'
        OR station = '99999963856'
        OR station = '99999994644'
    );

In this example, the table is going to be bucketed into 16 buckets by a high-cardinality column (station), which is expected to be used in the WHERE clause of the query. All other conditions remain the same. The baseline query filters on five station IDs, and you expect queries to have around that number at most, which is comfortably fewer than the number of buckets, so 16 should work well. It is possible to specify a larger number of buckets, but CTAS can’t be used if the total number of partitions exceeds 100.
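One way to reason about the bucket count is to estimate how many distinct buckets a query is likely to touch. The sketch below is a back-of-the-envelope model, assuming each key lands in a uniformly random bucket independently of the others; real hash functions and real key sets will deviate from this.

```python
def expected_buckets_touched(num_values: int, num_buckets: int) -> float:
    """Expected number of distinct buckets hit by num_values keys.

    Assumes each key lands in a uniformly random bucket, independently.
    """
    miss_probability = (1 - 1 / num_buckets) ** num_values
    return num_buckets * (1 - miss_probability)


for k in (1, 5, 16, 50):
    touched = expected_buckets_touched(k, 16)
    print(f"{k:>2} values -> about {touched:.1f} of 16 buckets")
```

Under this model, five values with 16 buckets touch about 4.4 buckets on average, so roughly a quarter of each partition is read. With 50 values, nearly all 16 buckets are expected to be read, and bucketing offers little benefit.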

  1. Run the following query:
    -- CTAS, Hive-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-bucketed/',
        partitioned_by = ARRAY['report_type'],
        bucketed_by = ARRAY['station'],
        bucket_count = 16,
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

The query creates S3 objects organized as shown in the following screenshots.


The table-level layout looks exactly the same between athena_non_bucketed and athena_bucketed: there are 13 partitions in each table. The difference is the number of objects under the partitions. There are 16 objects (buckets) per partition, of roughly 10–25 MB each in this case. The number of buckets is constant at the specified value regardless of the amount of data, but the bucket size depends on the amount of data.

Now you’re ready to query against each table to evaluate query performance. The query will select records with five specific stations and report type CRN05 for the past 5 years. Although you can’t see which data of a specific station is located in which bucket, it has been calculated and located correctly by Athena.

  1. Query the non-bucketed table with the following statement:
    -- No bucketing 
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_non_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 10.95 seconds, and 358 MB of data is scanned to return 2.21 million records. Both the runtime and scan size have decreased significantly because the data is now partitioned, so the query reads only one partition and skips the other 12 of 13. The amount of data scanned has gone down from 155.75 GB in the baseline to 358 MB, a reduction of 99.8%. This is due not only to partitioning, but also to the change of format to Parquet with Snappy compression.

  1. Query the bucketed table with the following statement:
    -- Hive bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 7.82 seconds, and 69 MB of data is scanned to return 2.21 million records. This means a reduction of average runtime from 10.95 to 7.82 seconds (-29%), and a dramatic reduction of data scanned from 358 MB to 69 MB (-81%) to return the same number of records compared with the non-bucketed table. In this case, both runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.
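The percentages quoted above follow directly from the measured values. As a quick check, this short sketch recomputes them; the helper name is ours, and the inputs are the averages reported for the non-bucketed and bucketed tables.

```python
def percent_reduction(before: float, after: float) -> float:
    """Relative reduction from before to after, in percent."""
    return (before - after) / before * 100


runtime_cut = percent_reduction(10.95, 7.82)  # average seconds per query
scan_cut = percent_reduction(358, 69)         # MB scanned per query
print(f"runtime: -{runtime_cut:.0f}%, data scanned: -{scan_cut:.0f}%")
# prints "runtime: -29%, data scanned: -81%"
```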

Considerations

As stated earlier, choose the number of buckets carefully to maximize the performance of your query. Bucketing only works if you are querying a few values of the bucketing key. Consider creating more buckets than the number of values expected in the actual query.

Additionally, an Athena CTAS query can create at most 100 partitions at one time. If you need a larger number of partitions, you may want to use AWS Glue extract, transform, and load (ETL), although there is a workaround that splits the work into multiple SQL statements.
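The multi-statement workaround can be sketched as one CTAS for the first batch of partition values, followed by INSERT INTO statements for the remaining batches. The following generator is an illustration only: the table names, the `type_NNN` partition values, and the helper names are hypothetical, and it assumes the source table already lists the partition column last. Check the Athena documentation for whether INSERT INTO is supported for your table type, because bucketed tables in particular may not be.

```python
def batched(values, size):
    """Yield consecutive chunks of at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start:start + size]


def quote(value: str) -> str:
    """Quote a string literal for SQL, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_statements(target, source, partition_column, partition_values,
                     with_clause, max_partitions=100):
    """Return one CTAS followed by INSERT INTO statements, one per batch."""
    statements = []
    for index, batch in enumerate(batched(partition_values, max_partitions)):
        in_list = ", ".join(quote(v) for v in batch)
        select = (f"SELECT * FROM {source} "
                  f"WHERE {partition_column} IN ({in_list})")
        if index == 0:
            statements.append(
                f"CREATE TABLE {target} WITH ({with_clause}) AS {select}")
        else:
            statements.append(f"INSERT INTO {target} {select}")
    return statements


values = [f"type_{i:03d}" for i in range(250)]  # hypothetical partition values
statements = build_statements(
    target="my_db.my_table_partitioned",
    source="my_db.my_source",
    partition_column="report_type",
    partition_values=values,
    with_clause="format = 'PARQUET', partitioned_by = ARRAY['report_type']",
)
print(len(statements), "statements")
```

With 250 partition values and a limit of 100 per statement, the sketch produces three statements, each writing no more than 100 partitions.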

Optimize data layout using AWS Glue ETL

Apache Spark is an open source distributed processing framework that enables flexible ETL with PySpark, Scala, and Spark SQL. It allows you to partition and bucket your data based on your requirements. Spark has several tuning options to accelerate jobs. You can effortlessly automate and monitor Spark jobs. In this section, we use AWS Glue ETL jobs to run Spark code to optimize data layout.

Unlike Athena bucketing, AWS Glue ETL uses Spark-based bucketing as its bucketing algorithm. For Athena to read such a table correctly, all you need to do is add the following table property to the table: bucketing_format = 'spark'. For details about this table property, see Partitioning and bucketing in Athena.

Complete the following steps to create a table with bucketing through AWS Glue ETL:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Create job and choose Visual ETL.
  3. Under Add nodes, choose AWS Glue Data Catalog for Sources.
  4. For Database, choose bucketing_blog.
  5. For Table, choose noaa_remote_original.
  6. Under Add nodes, choose Change Schema for Transforms.
  7. Under Add nodes, choose Custom Transform for Transforms.
  8. For Name, enter ToS3WithBucketing.
  9. For Node parents, choose Change Schema.
  10. For Code block, enter the following code snippet:
    def ToS3WithBucketing (glueContext, dfc) -> DynamicFrameCollection:
        # Convert DynamicFrame to DataFrame
        df = dfc.select(list(dfc.keys())[0]).toDF()
        
        # Write to S3 with bucketing and partitioning
        df.repartition(1, "report_type") \
            .write.option("path", "s3://<your-s3-location>/glue-bucketed/") \
            .mode("overwrite") \
            .partitionBy("report_type") \
            .bucketBy(16, "station") \
            .format("parquet") \
            .option("compression", "snappy") \
            .saveAsTable("bucketing_blog.glue_bucketed")

The following screenshot shows the job created using AWS Glue Studio to generate a table and data.

Each node represents the following:

  • The AWS Glue Data Catalog node loads the noaa_remote_original table from the Data Catalog
  • The Change Schema node makes sure that it loads columns registered in the Data Catalog
  • The ToS3WithBucketing node writes data to Amazon S3 with both partitioning and Spark-based bucketing

The job has been successfully authored in the visual editor.

  1. Under Job details, for IAM Role, choose your AWS Identity and Access Management (IAM) role for this job.
  2. For Worker type, choose G.8X.
  3. For Requested number of workers, enter 5.
  4. Choose Save, then choose Run.

After these steps, the table glue_bucketed has been created.

  1. Choose Tables in the navigation pane, and choose the table glue_bucketed.
  2. On the Actions menu, choose Edit table under Manage.
  3. In the Table properties section, choose Add.
  4. Add a key pair with key bucketing_format and value spark.
  5. Choose Save.
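
The same table property can also be set programmatically. The following is a minimal sketch, assuming boto3 is installed and your credentials allow glue:GetTable and glue:UpdateTable. The helper names are ours, not part of the original walkthrough. Because get_table returns read-only fields that update_table rejects, the sketch copies only the keys that TableInput accepts.

```python
# Keys accepted inside TableInput by Glue's UpdateTable. get_table also returns
# read-only fields (for example CreateTime or DatabaseName) that must be dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_bucketing_format(table: dict, bucketing_format: str = "spark") -> dict:
    """Return a TableInput dict copied from `table` with bucketing_format set."""
    table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
    parameters = dict(table_input.get("Parameters", {}))  # copy, do not mutate input
    parameters["bucketing_format"] = bucketing_format
    table_input["Parameters"] = parameters
    return table_input


def set_bucketing_format(database: str, table_name: str) -> None:
    """Read the table definition and write it back with the extra property."""
    import boto3  # imported lazily so the helper above has no AWS dependency

    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(DatabaseName=database, TableInput=with_bucketing_format(table))
```

With the names used in this post, the call would be set_bucketing_format("bucketing_blog", "glue_bucketed").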

Now it’s time to query the tables.

  1. Query the bucketed table with the following statement:
    -- Spark bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."glue_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran the query 10 times. The average runtime of the 10 queries is 7.09 seconds, and 88 MB of data is scanned to return 2.21 million records. In this case, both the runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.

The reason for the larger bytes scanned compared to the Athena CTAS example is that the values were distributed differently in this table. In the AWS Glue bucketed table, the values were distributed over five files. In the Athena CTAS bucketed table, the values were distributed over four files. Remember that rows are distributed into buckets using a hash function. The Spark bucketing algorithm uses a different hash function than Hive, and in this case, it resulted in a different distribution across the files.
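
To see why the choice of hash function changes how many files a query touches, consider the following sketch. It is an illustration only: the two functions are stand-ins chosen for brevity (a 31-based polynomial hash and CRC32), not the actual Hive or Spark implementations, so the bucket numbers it prints do not correspond to the files in either table.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 16
STATIONS = ["99999904237", "99999953132", "99999903061", "99999963856", "99999994644"]


def polynomial_hash(value: str) -> int:
    """31-based polynomial string hash, truncated to 32 bits (stand-in only)."""
    h = 0
    for byte in value.encode("utf-8"):
        h = (31 * h + byte) & 0xFFFFFFFF
    return h


def crc32_hash(value: str) -> int:
    """A second, unrelated hash function, also used purely as a stand-in."""
    return zlib.crc32(value.encode("utf-8"))


def bucket_of(value: str, hash_fn) -> int:
    """Map a value to a bucket: clear the sign bit, then take the remainder."""
    return (hash_fn(value) & 0x7FFFFFFF) % NUM_BUCKETS


for name, fn in [("polynomial", polynomial_hash), ("crc32", crc32_hash)]:
    buckets = Counter(bucket_of(station, fn) for station in STATIONS)
    print(f"{name}: {len(buckets)} distinct buckets -> {dict(buckets)}")
```

The same five station IDs can land in a different number of distinct buckets depending on the hash, which is why the number of files read, and therefore the bytes scanned, differs between the two tables.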

Considerations

AWS Glue DynamicFrame does not support bucketing natively. To bucket tables, convert the DynamicFrame to a Spark DataFrame first, as the ToS3WithBucketing transform does with toDF().

For information about fine-tuning AWS Glue ETL performance, refer to Best practices for performance tuning AWS Glue for Apache Spark jobs.

Optimize Iceberg data layout with hidden partitioning

Apache Iceberg is a high-performance open table format for huge analytic tables, bringing the reliability and simplicity of SQL tables to big data. Recently, there has been strong demand for Apache Iceberg tables to achieve advanced capabilities like ACID transactions, time travel queries, and more.

In Iceberg, bucketing works differently than the Hive table method we’ve seen so far. In Iceberg, bucketing is a subset of partitioning, and can be applied using the bucket partition transform. The way you use it and the end result are similar to bucketing in Hive tables. For more details about Iceberg bucket transforms, refer to Bucket Transform Details.
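
As a rough illustration of the bucket transform, the Iceberg specification describes it as a 32-bit Murmur3 hash (x86 variant, seed 0) of the value, with the sign bit cleared, modulo the number of buckets; strings are hashed as their UTF-8 bytes. The following pure-Python sketch follows that description. It is our own re-implementation for illustration, not code from Iceberg or Athena, and the engine remains the source of truth for the bucket a row lands in.

```python
MASK32 = 0xFFFFFFFF


def _mix(k: int) -> int:
    """Per-block mixing step of Murmur3 (x86, 32-bit)."""
    k = (k * 0xCC9E2D51) & MASK32
    k = ((k << 15) | (k >> 17)) & MASK32  # rotate left by 15
    return (k * 0x1B873593) & MASK32


def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 hash of `data`, returned as an unsigned integer."""
    h = seed & MASK32
    full = len(data) - (len(data) % 4)

    # Body: process the input four bytes at a time, little-endian.
    for i in range(0, full, 4):
        h ^= _mix(int.from_bytes(data[i:i + 4], "little"))
        h = ((h << 13) | (h >> 19)) & MASK32  # rotate left by 13
        h = (h * 5 + 0xE6546B64) & MASK32

    # Tail: the remaining one to three bytes, if any.
    tail = data[full:]
    if tail:
        h ^= _mix(int.from_bytes(tail, "little"))

    # Finalization: fold in the length and avalanche the bits.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h


def iceberg_bucket(value: str, num_buckets: int) -> int:
    """Bucket number for a string value under bucket(value, num_buckets)."""
    return (murmur3_x86_32(value.encode("utf-8")) & 0x7FFFFFFF) % num_buckets


for station in ["99999904237", "99999953132", "99999903061"]:
    print(station, iceberg_bucket(station, 16))
```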

Complete the following steps:

  1. Open the Athena query editor.
  2. Run the following query to create an Iceberg table with hidden partitioning along with bucketing:
    -- CTAS, Iceberg-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed_iceberg"
    WITH (table_type = 'ICEBERG',
          location = 's3://<your-s3-location>/athena-bucketed-iceberg/', 
          is_external = false,
          partitioning = ARRAY['report_type', 'bucket(station, 16)'],
          format = 'PARQUET',
          write_compression = 'SNAPPY'
    ) 
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshot.

There are two folders: data and metadata. Drill down to data.

You see random prefixes under the data folder. Choose the first one to view its details.

You see the top-level partition based on the report_type column. Drill down to the next level.

You see the second-level partition, bucketed with the station column.

The Parquet data files exist under these folders.

  1. Query the bucketed table with the following statement:
    -- Iceberg bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed_iceberg"
    WHERE
        report_type = 'CRN05'
        AND
        ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


With the Iceberg-bucketed table, the average runtime of the 10 queries is 8.03 seconds, and 148 MB of data is scanned to return 2.21 million records. This is less efficient than bucketing with AWS Glue or Athena, but considering the benefits of Iceberg’s various features, it is within an acceptable range.

Results

The following table summarizes all the results.

| | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
| Format | CSV | Parquet | Parquet | Parquet | Iceberg (Parquet) |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Table size (GB) | 155.8 | 5.0 | 5.0 | 5.8 | 5.0 |
| Number of S3 objects | 53360 | 376 | 192 | 192 | 195 |
| Is partitioned? | Yes, but in a different way | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |
| Bucketing format | n/a | n/a | Hive | Spark | Iceberg |
| Number of buckets | n/a | n/a | 16 | 16 | 16 |
| Average runtime (sec) | 29.178 | 10.950 | 7.815 | 7.089 | 8.030 |
| Scanned size (MB) | 206640.0 | 358.6 | 69.1 | 87.8 | 147.7 |

With athena_bucketed, glue_bucketed, and athena_bucketed_iceberg, you were able to meet the latency goal of 10 seconds. With bucketing, you saw a 25–40% reduction in runtime and a 60–85% reduction in scan size, which can contribute to both latency and cost optimization.
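
The percentages can be reproduced from the results table. The following sketch assumes the baseline is athena_non_bucketed (partitioned Parquet without bucketing), which is our reading of the comparison rather than something the table states explicitly.

```python
# Average runtime and scanned size, copied from the results table.
baseline = {"runtime_sec": 10.950, "scanned_mb": 358.6}  # athena_non_bucketed
bucketed = {
    "athena_bucketed": {"runtime_sec": 7.815, "scanned_mb": 69.1},
    "glue_bucketed": {"runtime_sec": 7.089, "scanned_mb": 87.8},
    "athena_bucketed_iceberg": {"runtime_sec": 8.030, "scanned_mb": 147.7},
}


def reduction_pct(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, as a percentage."""
    return (before - after) / before * 100


reductions = {
    table: {metric: reduction_pct(baseline[metric], value) for metric, value in metrics.items()}
    for table, metrics in bucketed.items()
}

for table, metrics in reductions.items():
    print(f"{table}: runtime -{metrics['runtime_sec']:.1f}%, scan size -{metrics['scanned_mb']:.1f}%")
```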

As the results show, partitioning contributes significantly to reducing both runtime and scan size, and bucketing can reduce them further.

Athena CTAS is straightforward and fast enough to complete the bucketing process. AWS Glue ETL is more flexible and scalable to achieve advanced use cases. You can choose either method based on your requirement and use case, because you can take advantage of bucketing through either option.

Conclusion

In this post, we demonstrated how to optimize your table data layout with partitioning and bucketing through Athena CTAS and AWS Glue ETL. We showed that bucketing contributes to accelerating query latency and reducing scan size to further optimize costs. We also discussed bucketing for Iceberg tables through hidden partitioning.

Bucketing is just one technique to optimize data layout by reducing data scan. To optimize your entire data layout, we recommend considering other options like partitioning, columnar file formats, and compression in conjunction with bucketing. Combining these techniques can further improve query performance.

Happy bucketing!


About the Authors

Takeshi Nakatani is a Principal Big Data Consultant on the Professional Services team in Tokyo. He has 26 years of experience in the IT industry, with expertise in architecting data infrastructure. On his days off, he can be a rock drummer or a motorcyclist.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Multicloud data lake analytics with Amazon Athena

Post Syndicated from Shoukat Ghouse original https://aws.amazon.com/blogs/big-data/multicloud-data-lake-analytics-with-amazon-athena/

Many organizations operate data lakes spanning multiple cloud data stores. This could be for various reasons, such as business expansions, mergers, or specific cloud provider preferences for different business units. In these cases, you may want an integrated query layer to seamlessly run analytical queries across these diverse cloud stores and streamline your data analytics processes. With a unified query interface, you can avoid the complexity of managing multiple query tools and gain a holistic view of your data assets regardless of where the data assets reside. You can consolidate your analytics workflows, reducing the need for extensive tooling and infrastructure management. This consolidation not only saves time and resources but also enables teams to focus more on deriving insights from data rather than navigating through various query tools and interfaces. A unified query interface promotes a holistic view of data assets by breaking down silos and facilitating seamless access to data stored across different cloud data stores. This comprehensive view enhances decision-making capabilities by empowering stakeholders to analyze data from multiple sources in a unified manner, leading to more informed strategic decisions.

In this post, we delve into the ways in which you can use Amazon Athena connectors to efficiently query data files residing across Azure Data Lake Storage (ADLS) Gen2, Google Cloud Storage (GCS), and Amazon Simple Storage Service (Amazon S3). Additionally, we explore the use of Athena workgroups and cost allocation tags to effectively categorize and analyze the costs associated with running analytical queries.

Solution overview

Imagine a fictional company named Oktank, which manages its data across data lakes on Amazon S3, ADLS, and GCS. Oktank wants to be able to query any of their cloud data stores and run analytical queries like joins and aggregations across the data stores without needing to transfer data to an S3 data lake. Oktank also wants to identify and analyze the costs associated with running analytics queries. To achieve this, Oktank envisions a unified data query layer using Athena.

The following diagram illustrates the high-level solution architecture.

Users run their queries from Athena connecting to specific Athena workgroups. Athena uses connectors to federate the queries across multiple data sources. In this case, we use the Amazon Athena Azure Synapse connector to query data from ADLS Gen2 via Synapse and the Amazon Athena GCS connector for GCS. An Athena connector is an extension of the Athena query engine. When a query runs on a federated data source using a connector, Athena invokes multiple AWS Lambda functions to read from the data sources in parallel to optimize performance. Refer to Using Amazon Athena Federated Query for further details. The AWS Glue Data Catalog holds the metadata for Amazon S3 and GCS data.

In the following sections, we demonstrate how to build this architecture.

Prerequisites

Before you configure your resources on AWS, you need to set up the necessary infrastructure required for this post in both Azure and GCP. The detailed steps and guidelines for creating the resources in Azure and GCP are beyond the scope of this post. Refer to the respective documentation for details. In this section, we provide some basic steps needed to create the resources required for the post.

You can download the sample data file cust_feedback_v0.csv.

Configure the dataset for Azure

To set up the sample dataset for Azure, log in to the Azure portal and upload the file to ADLS Gen2. The following screenshot shows the file under the container blog-container under a specific storage account on ADLS Gen2.

Set up a Synapse workspace in Azure and create an external table in Synapse that points to the relevant location. The following commands offer a foundational guide for running the necessary actions within the Synapse workspace to create the essential resources for this post. Refer to the corresponding Synapse documentation for additional details as required.

-- Create Database
CREATE DATABASE azure_athena_blog_db;

-- Create file format
CREATE EXTERNAL FILE FORMAT [SynapseDelimitedTextFormat]
WITH ( FORMAT_TYPE = DELIMITEDTEXT ,
FORMAT_OPTIONS (
FIELD_TERMINATOR = ',',
USE_TYPE_DEFAULT = FALSE,
FIRST_ROW = 2
));

-- Create key
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '*******';

-- Create Database credential
CREATE DATABASE SCOPED CREDENTIAL dbscopedCreds
WITH IDENTITY = 'Managed Identity';

-- Create Data Source
CREATE EXTERNAL DATA SOURCE athena_blog_datasource
WITH ( LOCATION = 'abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/',
CREDENTIAL = dbscopedCreds
);

-- Create External Table
CREATE EXTERNAL TABLE dbo.customer_feedbacks_azure (
[data_key] nvarchar(4000),
[data_load_date] nvarchar(4000),
[data_location] nvarchar(4000),
[product_id] nvarchar(4000),
[customer_email] nvarchar(4000),
[customer_name] nvarchar(4000),
[comment1] nvarchar(4000),
[comment2] nvarchar(4000)
)
WITH (
LOCATION = 'cust_feedback_v0.csv',
DATA_SOURCE = athena_blog_datasource,
FILE_FORMAT = [SynapseDelimitedTextFormat]
);

-- Create User
CREATE LOGIN bloguser1 WITH PASSWORD = '****';
CREATE USER bloguser1 FROM LOGIN bloguser1;

-- Grant select on the Schema
GRANT SELECT ON SCHEMA::dbo TO [bloguser1];

Note down the user name, password, database name, and the serverless or dedicated SQL endpoint you use—you need these in the subsequent steps.

This completes the setup on Azure for the sample dataset.

Configure the dataset for GCS

To set up the sample dataset for GCS, upload the file to the GCS bucket.

Create a GCP service account and grant access to the bucket.

In addition, create a JSON key for the service account. The content of the key is needed in subsequent steps.

This completes the setup on GCP for our sample dataset.

Deploy the AWS infrastructure

You can now run the provided AWS CloudFormation stack to create the solution resources. Identify an AWS Region in which you want to create the resources and ensure you use the same Region throughout the setup and verifications.

Refer to the following table for the necessary parameters that you must provide. You can leave other parameters at their default values or modify them according to your requirement.

| Parameter Name | Expected Value |
| AzureSynapseUserName | User name for the Synapse database you created. |
| AzureSynapsePwd | Password for the Synapse database user. |
| AzureSynapseURL | Synapse JDBC URL, in the following format: jdbc:sqlserver://<sqlendpoint>;databaseName=<databasename>. For example: jdbc:sqlserver://xxxxg-ondemand.sql.azuresynapse.net;databaseName=azure_athena_blog_db |
| GCSSecretKey | Content from the secret key file from GCP. |
| UserAzureADLSOnlyUserPassword | AWS Management Console password for the Azure-only user. This user can only query data from ADLS. |
| UserGCSOnlyUserPassword | AWS Management Console password for the GCS-only user. This user can only query data from GCP GCS. |
| UserMultiCloudUserPassword | AWS Management Console password for the multi-cloud user. This user can query data from any of the cloud stores. |
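
If you script the deployment, a small helper can assemble the AzureSynapseURL value from the endpoint and database name you noted earlier. This is a minimal sketch; the function name is hypothetical and only the URL format comes from the table above.

```python
def synapse_jdbc_url(sql_endpoint: str, database: str) -> str:
    """Build the JDBC URL in the format expected by the AzureSynapseURL parameter."""
    if not sql_endpoint or not database:
        raise ValueError("both the SQL endpoint and the database name are required")
    return f"jdbc:sqlserver://{sql_endpoint};databaseName={database}"


print(synapse_jdbc_url("xxxxg-ondemand.sql.azuresynapse.net", "azure_athena_blog_db"))
```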

The stack provisions the VPC, subnets, S3 buckets, Athena workgroups, and AWS Glue database and tables. It creates two secrets in AWS Secrets Manager to store the GCS secret key and the Synapse user name and password. You use these secrets when creating the Athena connectors.

The stack also creates three AWS Identity and Access Management (IAM) users and grants them permissions on the corresponding Athena workgroups, Athena data sources, and Lambda functions: AzureADLSUser, which can run queries on ADLS and Amazon S3; GCPGCSUser, which can query GCS and Amazon S3; and MultiCloudUser, which can query Amazon S3, Azure ADLS Gen2, and GCS data sources. The stack does not create the Athena data sources or the Lambda functions. You create these in subsequent steps when you create the Athena connectors.

The stack also attaches cost allocation tags to the Athena workgroups, the secrets in Secrets Manager, and the S3 buckets. You use these tags for cost analysis in subsequent steps.

When the stack deployment is complete, note the values of the CloudFormation stack outputs, which you use in subsequent steps.

Upload the data file to the S3 bucket created by the CloudFormation stack. You can retrieve the bucket name from the value of the key named S3SourceBucket from the stack output. This serves as the S3 data lake data for this post.

You can now create the connectors.

Create the Athena Synapse connector

To set up the Azure Synapse connector, complete the following steps:

  1. On the Lambda console, create a new application.
  2. In the Application settings section, enter the values for the corresponding key from the output of the CloudFormation stack, as listed in the following table.
| Property Name | CloudFormation Output Key |
| SecretNamePrefix | AzureSecretName |
| DefaultConnectionString | AzureSynapseConnectorJDBCURL |
| LambdaFunctionName | AzureADLSLambdaFunctionName |
| SecurityGroupIds | SecurityGroupId |
| SpillBucket | AthenaLocationAzure |
| SubnetIds | PrivateSubnetId |

  3. Select the Acknowledgement check box and choose Deploy.

Wait for the application to be deployed before proceeding to the next step.

Create the Athena GCS connector

To create the Athena GCS connector, complete the following steps:

  1. On the Lambda console, create a new application.
  2. In the Application settings section, enter the values for the corresponding key from the output of the CloudFormation stack, as listed in the following table.
| Property Name | CloudFormation Output Key |
| SpillBucket | AthenaLocationGCP |
| GCSSecretName | GCSSecretName |
| LambdaFunctionName | GCSLambdaFunctionName |

  3. Select the Acknowledgement check box and choose Deploy.

For the GCS connector, there are some post-deployment steps to create the AWS Glue database and table for the GCS data file. In this post, the CloudFormation stack you deployed earlier already created these resources, so you don’t have to create them. The stack created an AWS Glue database called oktank_multicloudanalytics_gcp and a table called customer_feedbacks under the database with the required configurations.

Log in to the Lambda console to verify the Lambda functions were created.

Next, you create the Athena data sources corresponding to these connectors.

Create the Azure data source

Complete the following steps to create your Azure data source:

  1. On the Athena console, create a new data source.
  2. For Data sources, select Microsoft Azure Synapse.
  3. Choose Next.

  4. For Data source name, enter the value for the AthenaFederatedDataSourceNameForAzure key from the CloudFormation stack output.
  5. In the Connection details section, choose the Lambda function you created earlier for Azure.
  6. Choose Next, then choose Create data source.

You should be able to see the associated schemas for the Azure external database.

Create the GCS data source

Complete the following steps to create your GCS data source:

  1. On the Athena console, create a new data source.
  2. For Data sources, select Google Cloud Storage.
  3. Choose Next.

  4. For Data source name, enter the value for the AthenaFederatedDataSourceNameForGCS key from the CloudFormation stack output.
  5. In the Connection details section, choose the Lambda function you created earlier for GCS.
  6. Choose Next, then choose Create data source.

This completes the deployment. You can now run the multi-cloud queries from Athena.

Query the federated data sources

In this section, we demonstrate how to query the data sources using the ADLS user, GCS user, and multi-cloud user.

Run queries as the ADLS user

The ADLS user can run multi-cloud queries on ADLS Gen2 and Amazon S3 data. Complete the following steps:

  1. Get the value for UserAzureADLSUser from the CloudFormation stack output.
  2. Sign in to the Athena query editor with this user.
  3. Switch the workgroup to athena-mc-analytics-azure-wg in the Athena query editor.
  4. Choose Acknowledge to accept the workgroup settings.
  5. Run the following query to join the S3 data lake table to the ADLS data lake table:
SELECT a.data_load_date as azure_load_date, b.data_key as s3_data_key, a.data_location as azure_data_location
FROM "azure_adls_ds"."dbo"."customer_feedbacks_azure" a
join "AwsDataCatalog"."oktank_multicloudanalytics_aws"."customer_feedbacks" b
ON cast(a.data_key as integer) = b.data_key

Run queries as the GCS user

The GCS user can run multi-cloud queries on GCS and Amazon S3 data. Complete the following steps:

  1. Get the value for UserGCPGCSUser from the CloudFormation stack output.
  2. Sign in to the Athena query editor with this user.
  3. Switch the workgroup to athena-mc-analytics-gcp-wg in the Athena query editor.
  4. Choose Acknowledge to accept the workgroup settings.
  5. Run the following query to join the S3 data lake table to the GCS data lake table:
SELECT a.data_load_date as gcs_load_date, b.data_key as s3_data_key, a.data_location as gcs_data_location FROM "gcp_gcs_ds"."oktank_multicloudanalytics_gcp"."customer_feedbacks" a
join "AwsDataCatalog"."oktank_multicloudanalytics_aws"."customer_feedbacks" b 
ON a.data_key = b.data_key

Run queries as the multi-cloud user

The multi-cloud user can run queries that can access data from any cloud store. Complete the following steps:

  1. Get the value for UserMultiCloudUser from the CloudFormation stack output.
  2. Sign in to the Athena query editor with this user.
  3. Switch the workgroup to athena-mc-analytics-multi-wg in the Athena query editor.
  4. Choose Acknowledge to accept the workgroup settings.
  5. Run the following query to join data across the multiple cloud stores:
SELECT a.data_load_date as adls_load_date, b.data_key as s3_data_key, c.data_location as gcs_data_location 
FROM "azure_adls_ds"."dbo"."CUSTOMER_FEEDBACKS_AZURE" a 
join "AwsDataCatalog"."oktank_multicloudanalytics_aws"."customer_feedbacks" b 
on cast(a.data_key as integer) = b.data_key join "gcp_gcs_ds"."oktank_multicloudanalytics_gcp"."customer_feedbacks" c 
on b.data_key = c.data_key
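
The same queries can be submitted outside the console. The following is a minimal sketch using boto3, assuming the caller has the multi-cloud user's permissions and that the workgroup already defines a query result location (which the Acknowledge step above suggests). The function names and polling interval are our own choices.

```python
MULTI_CLOUD_SQL = """
SELECT a.data_load_date AS adls_load_date, b.data_key AS s3_data_key, c.data_location AS gcs_data_location
FROM "azure_adls_ds"."dbo"."CUSTOMER_FEEDBACKS_AZURE" a
JOIN "AwsDataCatalog"."oktank_multicloudanalytics_aws"."customer_feedbacks" b
  ON cast(a.data_key AS integer) = b.data_key
JOIN "gcp_gcs_ds"."oktank_multicloudanalytics_gcp"."customer_feedbacks" c
  ON b.data_key = c.data_key
"""


def build_query_request(sql: str, workgroup: str) -> dict:
    """Keyword arguments for Athena's start_query_execution."""
    return {"QueryString": sql.strip(), "WorkGroup": workgroup}


def run_query(sql: str, workgroup: str, poll_seconds: float = 2.0) -> str:
    """Start the query, wait until it finishes, and return its final state."""
    import time

    import boto3  # imported lazily so build_query_request has no AWS dependency

    athena = boto3.client("athena")
    query_id = athena.start_query_execution(**build_query_request(sql, workgroup))["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status["State"]
        time.sleep(poll_seconds)
```

For example, run_query(MULTI_CLOUD_SQL, "athena-mc-analytics-multi-wg") submits the three-way join in the multi-cloud workgroup.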

Cost analysis with cost allocation tags

When you run multi-cloud queries, you need to carefully consider the data transfer costs associated with each cloud provider. Refer to the corresponding cloud documentation for details. The cost reports highlighted in this section refer to the AWS infrastructure and service usage costs. The storage and other associated costs with ADLS, Synapse, and GCS are not included.

Let’s see how to handle cost analysis for the multiple scenarios we have discussed.

The CloudFormation stack you deployed earlier added user-defined cost allocation tags, as shown in the following screenshot.

Sign in to the AWS Billing and Cost Management console and enable these cost allocation tags. It may take up to 24 hours for the cost allocation tags to be available and reflected in AWS Cost Explorer.

To track the cost of the Lambda functions deployed as part of the GCS and Synapse connectors, you can use the AWS generated cost allocation tags, as shown in the following screenshot.

You can use these tags on the Billing and Cost Management console to determine the cost per tag. We provide some sample screenshots for reference. These reports only show the cost of AWS resources used to access ADLS Gen2 or GCP GCS. The reports do not show the cost of GCP or Azure resources.

Athena costs

To view Athena costs, choose the tag athena-mc-analytics:athena:workgroup and filter the tag values azure, gcp, and multi.

You can also use workgroups to set limits on the amount of data each workgroup can process to track and control cost. For more information, refer to Using workgroups to control query access and costs and Separate queries and managing costs using Amazon Athena workgroups.
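
As a sketch of how such a limit could be applied programmatically, the following uses the Athena UpdateWorkGroup API through boto3 to set a per-query data scan cutoff. The helper names and the 1 GB figure in the usage note are illustrative assumptions, not values from this post.

```python
MIN_CUTOFF_BYTES = 10_000_000  # smallest per-query cutoff the Athena API accepts (10 MB)


def build_scan_limit_update(workgroup: str, max_bytes: int) -> dict:
    """Keyword arguments for Athena's update_work_group that cap bytes scanned per query."""
    if max_bytes < MIN_CUTOFF_BYTES:
        raise ValueError(f"cutoff must be at least {MIN_CUTOFF_BYTES} bytes")
    return {
        "WorkGroup": workgroup,
        "ConfigurationUpdates": {"BytesScannedCutoffPerQuery": max_bytes},
    }


def apply_scan_limit(workgroup: str, max_bytes: int) -> None:
    """Apply the per-query scan limit to an existing workgroup."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    boto3.client("athena").update_work_group(**build_scan_limit_update(workgroup, max_bytes))
```

For example, apply_scan_limit("athena-mc-analytics-multi-wg", 1_000_000_000) would cancel any query in that workgroup that scans more than about 1 GB.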

Amazon S3 costs

To view the costs for Amazon S3 storage (Athena query results and spill storage), choose the tag athena-mc-analytics:s3:result-spill and filter the tag values azure, gcp, and multi.

Lambda costs

To view the costs for the Lambda functions, choose the tag aws:cloudformation:stack-name and filter the tag values serverlessrepo-AthenaSynapseConnector and serverlessrepo-AthenaGCSConnector.

Cost allocation tags help manage and track costs effectively when you’re running multi-cloud queries. This can help you track, control, and optimize your spending while taking advantage of the benefits of multi-cloud data analytics.
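
The same per-tag breakdown is available through the Cost Explorer API. The following is a minimal sketch, assuming the cost allocation tags are already activated and the caller is allowed to call ce:GetCostAndUsage. The function names are ours, and the sketch ignores pagination for brevity.

```python
from datetime import date


def build_cost_request(tag_key: str, start: date, end: date) -> dict:
    """Keyword arguments for Cost Explorer's get_cost_and_usage, grouped by one tag."""
    return {
        "TimePeriod": {"Start": start.isoformat(), "End": end.isoformat()},
        "Granularity": "MONTHLY",
        "Metrics": ["UnblendedCost"],
        "GroupBy": [{"Type": "TAG", "Key": tag_key}],
    }


def cost_by_tag(tag_key: str, start: date, end: date) -> dict:
    """Sum unblended cost per tag value over the period (first page of results only)."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    response = boto3.client("ce").get_cost_and_usage(**build_cost_request(tag_key, start, end))
    totals = {}
    for period in response["ResultsByTime"]:
        for group in period["Groups"]:
            tag_value = group["Keys"][0]
            amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
            totals[tag_value] = totals.get(tag_value, 0.0) + amount
    return totals
```

Calling cost_by_tag("athena-mc-analytics:athena:workgroup", date(2024, 1, 1), date(2024, 2, 1)) would return the Athena cost per workgroup tag value for that month; the dates here are placeholders.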

Clean up

To avoid incurring further charges, delete the CloudFormation stack to remove the resources you provisioned as part of this post. Two additional stacks were deployed, one for each connector: serverlessrepo-AthenaGCSConnector and serverlessrepo-AthenaSynapseConnector. Delete all three stacks.

Conclusion

In this post, we discussed a comprehensive solution for organizations looking to implement multi-cloud data lake analytics using Athena, enabling a consolidated view of data across diverse cloud data stores and enhancing decision-making capabilities. We focused on querying data lakes across Amazon S3, Azure Data Lake Storage Gen2, and Google Cloud Storage using Athena. We demonstrated how to set up resources on Azure, GCP, and AWS, including creating databases, tables, Lambda functions, and Athena data sources. We also provided instructions for querying federated data sources from Athena, demonstrating how you can run multi-cloud queries tailored to your specific needs. Lastly, we discussed cost analysis using AWS cost allocation tags.


About the Author

Shoukat Ghouse is a Senior Big Data Specialist Solutions Architect at AWS. He helps customers around the world build robust, efficient and scalable data platforms on AWS leveraging AWS analytics services like AWS Glue, AWS Lake Formation, Amazon Athena and Amazon EMR.

Gain insights from historical location data using Amazon Location Service and AWS analytics services

Post Syndicated from Alan Peaty original https://aws.amazon.com/blogs/big-data/gain-insights-from-historical-location-data-using-amazon-location-service-and-aws-analytics-services/

Many organizations around the world rely on the use of physical assets, such as vehicles, to deliver a service to their end-customers. By tracking these assets in real time and storing the results, asset owners can derive valuable insights on how their assets are being used to continuously deliver business improvements and plan for future changes. For example, a delivery company operating a fleet of vehicles may need to ascertain the impact from local policy changes outside of their control, such as the announced expansion of an Ultra-Low Emission Zone (ULEZ). By combining historical vehicle location data with information from other sources, the company can devise empirical approaches for better decision-making. For example, the company’s procurement team can use this information to make decisions about which vehicles to prioritize for replacement before policy changes go into effect.

Developers can use the support in Amazon Location Service for publishing device position updates to Amazon EventBridge to build a near-real-time data pipeline that stores locations of tracked assets in Amazon Simple Storage Service (Amazon S3). Additionally, you can use AWS Lambda to enrich incoming location data with data from other sources, such as an Amazon DynamoDB table containing vehicle maintenance details. Then a data analyst can use the geospatial querying capabilities of Amazon Athena to gain insights, such as the number of days their vehicles have operated in the proposed boundaries of an expanded ULEZ. Because vehicles that do not meet ULEZ emissions standards are subjected to a daily charge to operate within the zone, you can use the location data, along with maintenance data such as age of the vehicle, current mileage, and current emissions standards to estimate the amount the company would have to spend on daily fees.

This post shows how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use this data to drive meaningful insights using AWS Glue and Athena.

Overview of solution

This is a fully serverless solution for location-based asset management. The solution consists of the following interfaces:

  • IoT or mobile application – A mobile application or an Internet of Things (IoT) device allows the tracking of a company vehicle while it is in use and transmits its current location securely to the data ingestion layer in AWS. The ingestion approach is not in scope of this post. Instead, a Lambda function in our solution simulates sample vehicle journeys and directly updates Amazon Location tracker objects with randomized locations.
  • Data analytics – Business analysts gather operational insights from multiple data sources, including the location data collected from the vehicles. Data analysts are looking for answers to questions such as, “How long did a given vehicle historically spend inside a proposed zone, and how much would the fees have cost had the policy been in place over the past 12 months?”

The following diagram illustrates the solution architecture.

The workflow consists of the following key steps:

  1. The tracking functionality of Amazon Location is used to track the vehicle. Using EventBridge integration, filtered positional updates are published to an EventBridge event bus. This solution uses distance-based filtering to reduce costs and jitter. Distance-based filtering ignores location updates in which devices have moved less than 30 meters (98.4 feet).
  2. Amazon Location device position events arrive on the EventBridge default bus with source: ["aws.geo"] and detail-type: ["Location Device Position Event"]. One rule is created to forward these events to two downstream targets: a Lambda function, and a Firehose delivery stream.
  3. Two different patterns, based on each target, are described in this post to demonstrate different approaches to committing the data to an S3 bucket:
    1. Lambda function – The first approach uses a Lambda function to demonstrate how you can use code in the data pipeline to directly transform the incoming location data. You can modify the Lambda function to fetch additional vehicle information from a separate data store (for example, a DynamoDB table or a Customer Relationship Management system) to enrich the data, before storing the results in an S3 bucket. In this model, the Lambda function is invoked for each incoming event.
    2. Firehose delivery stream – The second approach uses a Firehose delivery stream to buffer and batch the incoming positional updates, before storing them in an S3 bucket without modification. This method uses GZIP compression to optimize storage consumption and query performance. You can also use the data transformation feature of Data Firehose to invoke a Lambda function to perform data transformation in batches.
  4. AWS Glue crawls both S3 bucket paths, populates the AWS Glue database tables based on the inferred schemas, and makes the data available to other analytics applications through the AWS Glue Data Catalog.
  5. Athena is used to run geospatial queries on the location data stored in the S3 buckets. The Data Catalog provides metadata that allows analytics applications using Athena to find, read, and process the location data stored in Amazon S3.
  6. This solution includes a Lambda function that continuously updates the Amazon Location tracker with simulated location data from fictitious journeys. The Lambda function is triggered at regular intervals using a scheduled EventBridge rule.
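
The event pattern described in step 2 can be sketched as follows. The matcher is a deliberately simplified stand-in for EventBridge pattern matching: it only handles exact matches on top-level string fields, which is all this rule needs. The sample events are made up for illustration.

```python
# Event pattern from step 2: match device position events published by Amazon Location.
RULE_PATTERN = {
    "source": ["aws.geo"],
    "detail-type": ["Location Device Position Event"],
}


def matches(pattern: dict, event: dict) -> bool:
    """True if, for every field in the pattern, the event's value is one of the listed values."""
    return all(event.get(field) in allowed for field, allowed in pattern.items())


position_event = {"source": "aws.geo", "detail-type": "Location Device Position Event"}
other_event = {"source": "aws.s3", "detail-type": "Object Created"}

print(matches(RULE_PATTERN, position_event))
print(matches(RULE_PATTERN, other_event))
```

Only events that satisfy every field of the pattern are forwarded to the Lambda function and the Firehose delivery stream.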

You can test this solution yourself using the AWS Samples GitHub repository. The repository contains the AWS Serverless Application Model (AWS SAM) template and Lambda code required to try out this solution. Refer to the instructions in the README file for steps on how to provision and decommission this solution.

Visual layouts in some screenshots in this post may look different than those on your AWS Management Console.

Data generation

In this section, we discuss the steps to manually or automatically generate journey data.

Manually generate journey data

You can manually update device positions using the AWS Command Line Interface (AWS CLI) command aws location batch-update-device-position. Replace the tracker-name, device-id, Position, and SampleTime values with your own, and make sure that successive updates are more than 30 meters apart so that an event is placed on the default EventBridge event bus:

aws location batch-update-device-position --tracker-name <tracker-name> --updates "[{\"DeviceId\": \"<device-id>\", \"Position\": [<longitude>, <latitude>], \"SampleTime\": \"<YYYY-MM-DDThh:mm:ssZ>\"}]"
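
The following Python sketch mirrors the preceding CLI command using the boto3 batch_update_device_position call, and adds a haversine helper to check that two positions are more than 30 meters apart before you send them. The helper names (haversine_m, is_far_enough, build_update, send_update) and the MIN_DISTANCE_M constant are illustrative assumptions and are not part of the sample repository.

```python
import math
from datetime import datetime, timezone

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters
MIN_DISTANCE_M = 30         # spacing between updates described in this post


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def is_far_enough(previous, current):
    """True if two (longitude, latitude) tuples are more than MIN_DISTANCE_M apart."""
    return haversine_m(*previous, *current) > MIN_DISTANCE_M


def build_update(device_id, lon, lat, sample_time=None):
    """Build one entry for the Updates list; Position is [longitude, latitude]."""
    sample_time = sample_time or datetime.now(timezone.utc)
    return {"DeviceId": device_id, "Position": [lon, lat], "SampleTime": sample_time}


def send_update(tracker_name, update):
    """Send a single update to the tracker (requires boto3 and AWS credentials)."""
    import boto3  # imported here so the helpers above work without boto3 installed

    client = boto3.client("location")
    return client.batch_update_device_position(TrackerName=tracker_name, Updates=[update])
```

For example, you could call is_far_enough((-0.1, 51.5), (-0.1, 51.501)) before building and sending the second update for a device.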

Automatically generate journey data using the simulator

The provided AWS CloudFormation template deploys an EventBridge scheduled rule and an accompanying Lambda function that simulates tracker updates from vehicles. This rule is enabled by default, and runs at a frequency specified by the SimulationIntervalMinutes CloudFormation parameter. The data generation Lambda function updates the Amazon Location tracker with a randomized position offset from the vehicles’ base locations.

Vehicle names and base locations are stored in the vehicles.json file. A vehicle’s starting position is reset each day, and base locations have been chosen to give them the ability to drift in and out of the ULEZ on a given day to provide a realistic journey simulation.

You can disable the rule temporarily by navigating to the scheduled rule details on the EventBridge console. Alternatively, change the parameter State: ENABLED to State: DISABLED for the scheduled rule resource GenerateDevicePositionsScheduleRule in the template.yml file. Rebuild and re-deploy the AWS SAM template for this change to take effect.

Location data pipeline approaches

The configurations outlined in this section are deployed automatically by the provided AWS SAM template. The information in this section is provided to describe the pertinent parts of the solution.

Amazon Location device position events

Amazon Location sends device position update events to EventBridge in the following format:

{
    "version":"0",
    "id":"<event-id>",
    "detail-type":"Location Device Position Event",
    "source":"aws.geo",
    "account":"<account-number>",
    "time":"<YYYY-MM-DDThh:mm:ssZ>",
    "region":"<region>",
    "resources":[
        "arn:aws:geo:<region>:<account-number>:tracker/<tracker-name>"
    ],
    "detail":{
        "EventType":"UPDATE",
        "TrackerName":"<tracker-name>",
        "DeviceId":"<device-id>",
        "SampleTime":"<YYYY-MM-DDThh:mm:ssZ>",
        "ReceivedTime":"<YYYY-MM-DDThh:mm:ss.sssZ>",
        "Position":[
            <longitude>, 
            <latitude>
        ]
    }
}

You can optionally specify an input transformation to modify the format and contents of the device position event data before it reaches the target.

Data enrichment using Lambda

Data enrichment in this pattern is facilitated through the invocation of a Lambda function. In this example, we call this function ProcessDevicePosition, and use a Python runtime. A custom transformation is applied in the EventBridge target definition to receive the event data in the following format:

{
    "EventType":<EventType>,
    "TrackerName":<TrackerName>,
    "DeviceId":<DeviceId>,
    "SampleTime":<SampleTime>,
    "ReceivedTime":<ReceivedTime>,
    "Position":[<Longitude>,<Latitude>]
}

You could apply additional transformations, such as the refactoring of Latitude and Longitude data into separate key-value pairs if this is required by the downstream business logic processing the events.
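
As a minimal sketch of the refactoring mentioned above, the following Python function splits the Position array into separate Longitude and Latitude keys. The function name split_position is hypothetical, and the sketch assumes the event already has the flattened format shown earlier.

```python
def split_position(event):
    """Return a copy of the event with Position replaced by Longitude and Latitude keys."""
    longitude, latitude = event["Position"]  # Amazon Location orders this as [longitude, latitude]
    flattened = {key: value for key, value in event.items() if key != "Position"}
    flattened["Longitude"] = longitude
    flattened["Latitude"] = latitude
    return flattened
```

The input event is left unmodified, so the original payload can still be logged or stored alongside the refactored one.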

The following code demonstrates the Python application logic that is run by the ProcessDevicePosition Lambda function. Error handling has been skipped in this code snippet for brevity. The full code is available in the GitHub repo.

import json
import os
import uuid
import boto3

# Import environment variables from Lambda function.
bucket_name = os.environ["S3_BUCKET_NAME"]
bucket_prefix = os.environ["S3_BUCKET_LAMBDA_PREFIX"]

s3 = boto3.client("s3")

def lambda_handler(event, context):
    key = "%s/%s/%s-%s.json" % (bucket_prefix,
                                event["DeviceId"],
                                event["SampleTime"],
                                str(uuid.uuid4()))
    body = json.dumps(event, separators=(",", ":"))
    body_encoded = body.encode("utf-8")
    s3.put_object(Bucket=bucket_name, Key=key, Body=body_encoded)
    return {
        "statusCode": 200,
        "body": "success"
    }

The preceding code creates an S3 object for each device position event received by EventBridge. The code uses the DeviceId as a prefix to write the objects to the bucket.

You can add additional logic to the preceding Lambda function code to enrich the event data using other sources. The example in the GitHub repo demonstrates enriching the event with data from a DynamoDB vehicle maintenance table.
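
The following sketch illustrates one way such enrichment logic could look. It is not the code from the GitHub repo: the function names are hypothetical, and the sketch assumes that the vehicle maintenance table uses DeviceId as its partition key.

```python
def enrich_event(event, lookup_vehicle):
    """Return a copy of the event merged with vehicle attributes.

    lookup_vehicle is any callable that maps a DeviceId to a dict of
    attributes, or to None if the vehicle is unknown. Fields already
    present in the event are never overwritten.
    """
    vehicle = lookup_vehicle(event["DeviceId"]) or {}
    extra = {key: value for key, value in vehicle.items() if key not in event}
    return {**event, **extra}


def dynamodb_lookup(table_name):
    """Build a lookup callable backed by a DynamoDB table (requires boto3)."""
    import boto3  # imported here so enrich_event can be used without boto3

    table = boto3.resource("dynamodb").Table(table_name)

    def lookup(device_id):
        # Assumption: DeviceId is the table's partition key.
        # Number attributes are returned as decimal.Decimal and need
        # converting before the event is passed to json.dumps.
        return table.get_item(Key={"DeviceId": device_id}).get("Item")

    return lookup
```

Passing the lookup in as a callable keeps the merge logic testable with a plain dictionary, for example enrich_event(event, {"vehicle1": {"Mileage": 10000}}.get).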

In addition to the prerequisite AWS Identity and Access Management (IAM) permissions provided by the managed policy AWSLambdaBasicExecutionRole, the ProcessDevicePosition function requires permissions to perform the s3:PutObject action and any other actions required by the data enrichment logic. IAM permissions required by the solution are documented in the template.yml file.

{
    "Version":"2012-10-17",
    "Statement":[
        {
            "Action":[
                "s3:ListBucket"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>"
            ],
            "Effect":"Allow"
        },
        {
            "Action":[
                "s3:PutObject"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>/<S3_BUCKET_LAMBDA_PREFIX>/*"
            ],
            "Effect":"Allow"
        }
    ]
}

Data pipeline using Amazon Data Firehose

Complete the following steps to create your Firehose delivery stream:

  1. On the Amazon Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Direct PUT.
  4. For Destination, choose Amazon S3.
  5. For Firehose stream name, enter a name (for this post, ProcessDevicePositionFirehose).
    Create Firehose stream
  6. Configure the destination settings with details about the S3 bucket in which the location data is stored, along with the partitioning strategy:
    1. Use <S3_BUCKET_NAME> and <S3_BUCKET_FIREHOSE_PREFIX> to determine the bucket and object prefixes.
    2. Use DeviceId as an additional prefix to write the objects to the bucket.
  7. Enable Dynamic partitioning and New line delimiter to make sure partitioning is automatic based on DeviceId, and that new line delimiters are added between records in objects that are delivered to Amazon S3.

These are required by AWS Glue to later crawl the data, and for Athena to recognize individual records.
Destination settings for Firehose stream
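
To illustrate why the new line delimiter matters, the following sketch compares a batch of JSON records written with and without a delimiter. The sample records and function names are made up for illustration. A line-oriented JSON reader can recover the individual records only when they are separated by new lines.

```python
import json

# Two made-up position records, as they might be buffered into one S3 object.
records = [
    {"DeviceId": "vehicle1", "Position": [-0.12, 51.5]},
    {"DeviceId": "vehicle1", "Position": [-0.13, 51.51]},
]


def batch(items, delimiter):
    """Serialize records into one payload, appending the delimiter to each record."""
    return "".join(json.dumps(item, separators=(",", ":")) + delimiter for item in items)


def parse_lines(payload):
    """Parse a payload the way a line-oriented JSON reader would: one record per line."""
    return [json.loads(line) for line in payload.splitlines() if line]


with_newlines = batch(records, "\n")    # one record per line
without_newlines = batch(records, "")   # records run together on a single line
```

Calling parse_lines(with_newlines) returns both records, whereas parse_lines(without_newlines) raises json.JSONDecodeError because the single line contains more than one JSON document.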

Create an EventBridge rule and attach targets

The EventBridge rule ProcessDevicePosition defines two targets: the ProcessDevicePosition Lambda function, and the ProcessDevicePositionFirehose delivery stream. Complete the following steps to create the rule and attach targets:

  1. On the EventBridge console, create a new rule.
  2. For Name, enter a name (for this post, ProcessDevicePosition).
  3. For Event bus, choose default.
  4. For Rule type, select Rule with an event pattern.
    EventBridge rule detail
  5. For Event source, select AWS events or EventBridge partner events.
    EventBridge event source
  6. For Method, select Use pattern form.
  7. In the Event pattern section, specify AWS services as the source, Amazon Location Service as the specific service, and Location Device Position Event as the event type.
    EventBridge creation method
  8. For Target 1, attach the ProcessDevicePosition Lambda function as a target.
    EventBridge target 1
  9. We use Input transformer to customize the event that is committed to the S3 bucket.
    EventBridge target 1 transformer
  10. Configure Input paths map and Input template to organize the payload into the desired format.
    1. The following code is the input paths map:
      {
          "EventType": "$.detail.EventType",
          "TrackerName": "$.detail.TrackerName",
          "DeviceId": "$.detail.DeviceId",
          "SampleTime": "$.detail.SampleTime",
          "ReceivedTime": "$.detail.ReceivedTime",
          "Longitude": "$.detail.Position[0]",
          "Latitude": "$.detail.Position[1]"
      }

    2. The following code is the input template:
      {
          "EventType":<EventType>,
          "TrackerName":<TrackerName>,
          "DeviceId":<DeviceId>,
          "SampleTime":<SampleTime>,
          "ReceivedTime":<ReceivedTime>,
          "Position":[<Longitude>, <Latitude>]
      }

  11. For Target 2, choose the ProcessDevicePositionFirehose delivery stream as a target.
    EventBridge target 2

This target requires an IAM role that allows one or multiple records to be written to the Firehose delivery stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:<region>:<account-id>:deliverystream/<delivery-stream-name>"
            ],
            "Effect": "Allow"
        }
    ]
}
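
To make the input transformer configured for Target 1 more concrete, the following Python sketch imitates its behavior locally: it resolves each entry of the input paths map against a device position event and substitutes the values into the input template. This is a simplified illustration and not how EventBridge is implemented. The resolve helper only understands dotted keys and integer indexes, and the function names are hypothetical.

```python
import json
import re

INPUT_PATHS = {
    "EventType": "$.detail.EventType",
    "TrackerName": "$.detail.TrackerName",
    "DeviceId": "$.detail.DeviceId",
    "SampleTime": "$.detail.SampleTime",
    "ReceivedTime": "$.detail.ReceivedTime",
    "Longitude": "$.detail.Position[0]",
    "Latitude": "$.detail.Position[1]",
}

INPUT_TEMPLATE = (
    '{"EventType":<EventType>,"TrackerName":<TrackerName>,"DeviceId":<DeviceId>,'
    '"SampleTime":<SampleTime>,"ReceivedTime":<ReceivedTime>,'
    '"Position":[<Longitude>, <Latitude>]}'
)


def resolve(event, path):
    """Resolve a simple path such as $.detail.Position[0] against the event."""
    value = event
    for key, index in re.findall(r"\.(\w+)|\[(\d+)\]", path):
        value = value[key] if key else value[int(index)]
    return value


def transform(event):
    """Fill each <Placeholder> in the template with its JSON-encoded value."""
    def substitute(match):
        return json.dumps(resolve(event, INPUT_PATHS[match.group(1)]))

    return json.loads(re.sub(r"<(\w+)>", substitute, INPUT_TEMPLATE))
```

Running transform on an event in the Amazon Location format shown earlier yields the flattened payload that the ProcessDevicePosition Lambda function receives.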

Crawl and catalog the data using AWS Glue

After sufficient data has been generated, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the crawlers that have been created, location-analytics-glue-crawler-lambda and location-analytics-glue-crawler-firehose.
  3. Choose Run.

The crawlers will automatically classify the data into JSON format, group the records into tables and partitions, and commit associated metadata to the AWS Glue Data Catalog.
Crawlers

  4. When the Last run statuses of both crawlers show as Succeeded, confirm that two tables (lambda and firehose) have been created on the Tables page.

The solution partitions the incoming location data based on the deviceid field. Therefore, as long as there are no new devices or schema changes, the crawlers don’t need to run again. However, if new devices are added, or a different field is used for partitioning, the crawlers need to run again.
Tables
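
If you prefer to run the crawlers programmatically, for example after adding new devices, the following sketch starts both crawlers and waits for them to finish. It uses the AWS Glue start_crawler and get_crawler API calls. The function name, the polling interval, and the assumption that a started crawler reports a non-READY state until it completes are ours. The glue argument is expected to be a boto3 Glue client, such as boto3.client("glue").

```python
import time

CRAWLERS = (
    "location-analytics-glue-crawler-lambda",
    "location-analytics-glue-crawler-firehose",
)


def run_crawlers(glue, names=CRAWLERS, poll_seconds=15):
    """Start each crawler, wait until it is READY again, and return the last crawl statuses."""
    for name in names:
        glue.start_crawler(Name=name)

    statuses = {}
    for name in names:
        while True:
            crawler = glue.get_crawler(Name=name)["Crawler"]
            if crawler["State"] == "READY":
                # LastCrawl is absent if the crawler has never completed a run.
                statuses[name] = crawler.get("LastCrawl", {}).get("Status")
                break
            time.sleep(poll_seconds)
    return statuses
```

A returned status of SUCCEEDED for both crawlers corresponds to the Succeeded value shown in the Last run column on the console.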

You’re now ready to query the tables using Athena.

Query the data using Athena

Athena is a serverless, interactive analytics service built to analyze unstructured, semi-structured, and structured data where it is hosted. If this is your first time using the Athena console, follow the instructions to set up a query result location in Amazon S3. To query the data with Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. For Data source, choose AwsDataCatalog.
  3. For Database, choose location-analytics-glue-database.
  4. On the options menu (three vertical dots), choose Preview Table to query the content of both tables.
    Preview table

The query displays 10 sample positional records currently stored in the table. The following screenshot is an example from previewing the firehose table. The firehose table stores raw, unmodified data from the Amazon Location tracker.
Query results
You can now experiment with geospatial queries. The GeoJSON file for the 2021 London ULEZ expansion is part of the repository, and has already been converted into a query compatible with both Athena tables.

  5. Copy and paste the content from the 1-firehose-athena-ulez-2021-create-view.sql file found in the examples/firehose folder into the query editor.

This query uses the ST_Within geospatial function to determine if a recorded position is inside or outside the ULEZ zone defined by the polygon. A new view called ulezvehicleanalysis_firehose is created with a new column, insidezone, which captures whether the recorded position exists within the zone.

A simple Python utility is provided, which converts the polygon features found in the downloaded GeoJSON file into ST_Polygon strings based on the well-known text format that can be used directly in an Athena query.

  6. Choose Preview View on the ulezvehicleanalysis_firehose view to explore its content.
    Preview view

You can now run queries against this view to gain overarching insights.

  7. Copy and paste the content from the 2-firehose-athena-ulez-2021-query-days-in-zone.sql file found in the examples/firehose folder into the query editor.

This query establishes the total number of days on which each vehicle has entered the ULEZ, and the expected total charges. The query has been parameterized using the ? placeholder character. Parameterized queries allow you to rerun the same query with different parameter values.

  8. Enter the daily fee amount for Parameter 1, then run the query.
    Query editor

The results display each vehicle, the total number of days spent in the proposed ULEZ, and the total charges based on the daily fee you entered.
Query results
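
The conversion from GeoJSON polygon features to ST_Polygon strings mentioned earlier can be sketched as follows. This is not the utility shipped in the repository. The function names are hypothetical, and the sketch handles only geometries of type Polygon, assuming coordinates in [longitude, latitude] order as GeoJSON defines them.

```python
def ring_to_wkt(ring):
    """Format one linear ring as '(lon lat, lon lat, ...)', ignoring any altitude values."""
    return "(" + ", ".join(f"{lon} {lat}" for lon, lat, *_ in ring) + ")"


def polygon_to_wkt(geometry):
    """Convert a GeoJSON Polygon geometry into a well-known text POLYGON string."""
    if geometry["type"] != "Polygon":
        raise ValueError("only Polygon geometries are handled in this sketch")
    rings = ", ".join(ring_to_wkt(ring) for ring in geometry["coordinates"])
    return f"POLYGON ({rings})"


def to_athena_expression(geometry):
    """Wrap the well-known text in an ST_Polygon call for use in an Athena query."""
    return f"ST_Polygon('{polygon_to_wkt(geometry)}')"
```

The resulting expression can be passed as the second argument to ST_Within, with the recorded position as the first argument.
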
You can repeat this exercise using the lambda table. Data in the lambda table is augmented with additional vehicle details present in the vehicle maintenance DynamoDB table at the time it is processed by the Lambda function. The solution supports the following fields:

  • MeetsEmissionStandards (Boolean)
  • Mileage (Number)
  • PurchaseDate (String, in YYYY-MM-DD format)

You can also enrich the new data as it arrives.

  1. On the DynamoDB console, find the vehicle maintenance table under Tables. The table name is provided as output VehicleMaintenanceDynamoTable in the deployed CloudFormation stack.
  2. Choose Explore table items to view the content of the table.
  3. Choose Create item to create a new record for a vehicle.
    Create item
  4. Enter DeviceId (such as vehicle1 as a String), PurchaseDate (such as 2005-10-01 as a String), Mileage (such as 10000 as a Number), and MeetsEmissionStandards (with a value such as False as Boolean).
  5. Choose Create item to create the record.
    Create item
  6. Duplicate the newly created record with additional entries for other vehicles (such as for vehicle2 or vehicle3), modifying the values of the attributes slightly each time.
  7. Rerun the location-analytics-glue-crawler-lambda AWS Glue crawler after new data has been generated to confirm that the update to the schema with new fields is registered.
  8. Copy and paste the content from the 1-lambda-athena-ulez-2021-create-view.sql file found in the examples/lambda folder into the query editor.
  9. Preview the ulezvehicleanalysis_lambda view to confirm that the new columns have been created.

If errors such as Column 'mileage' cannot be resolved are displayed, the data enrichment is not taking place, or the AWS Glue crawler has not yet detected updates to the schema.

If the Preview table option is only returning results from before you created records in the DynamoDB table, return the query results in descending order using sampletime (for example, order by sampletime desc limit 100;).
Query results
Now we focus on the vehicles that don’t currently meet emissions standards, and order the vehicles in descending order based on the mileage per year (calculated using the latest mileage / age of vehicle in years).

  10. Copy and paste the content from the 2-lambda-athena-ulez-2021-query-days-in-zone.sql file found in the examples/lambda folder into the query editor.
    Query results

In this example, we can see that out of our fleet of vehicles, five have been reported as not meeting emission standards. We can also see the vehicles that have accumulated high mileage per year, and the number of days spent in the proposed ULEZ. The fleet operator may now decide to prioritize these vehicles for replacement. Because location data is enriched with the most up-to-date vehicle maintenance data at the time it is ingested, you can further evolve these queries to run over a defined time window. For example, you could factor in mileage changes within the past year.
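
The mileage-per-year ranking described above (latest mileage divided by the age of the vehicle in years) can be sketched in Python as follows. This mirrors the idea behind the Athena query rather than reproducing it. The function names are hypothetical, and a year is approximated as 365.25 days.

```python
from datetime import date


def mileage_per_year(mileage, purchase_date, today):
    """Latest mileage divided by vehicle age in years (PurchaseDate is YYYY-MM-DD)."""
    age_years = (today - date.fromisoformat(purchase_date)).days / 365.25
    if age_years <= 0:
        raise ValueError("purchase date must be earlier than today")
    return mileage / age_years


def rank_non_compliant(vehicles, today):
    """Keep vehicles that don't meet emission standards, highest mileage per year first."""
    flagged = [v for v in vehicles if not v["MeetsEmissionStandards"]]
    return sorted(
        flagged,
        key=lambda v: mileage_per_year(v["Mileage"], v["PurchaseDate"], today),
        reverse=True,
    )
```

Each vehicle is a dictionary with the MeetsEmissionStandards, Mileage, and PurchaseDate fields listed earlier in this post.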

Due to the dynamic nature of the data enrichment, any new data being committed to Amazon S3, along with the query results, will be altered as and when records are updated in the DynamoDB vehicle maintenance table.

Clean up

Refer to the instructions in the README file to clean up the resources provisioned for this solution.

Conclusion

This post demonstrated how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use the collected device position data to drive analytical insights using AWS Glue and Athena. By tracking these assets in real time and storing the results, companies can derive valuable insights on how effectively their fleets are being utilized and better react to changes in the future. You can now explore extending this sample code with your own device tracking data and analytics requirements.


About the Authors

Alan Peaty is a Senior Partner Solutions Architect at AWS. Alan helps Global Systems Integrators (GSIs) and Global Independent Software Vendors (GISVs) solve complex customer challenges using AWS services. Prior to joining AWS, Alan worked as an architect at systems integrators to translate business requirements into technical solutions. Outside of work, Alan is an IoT enthusiast and a keen runner who loves to hit the muddy trails of the English countryside.

Parag Srivastava is a Solutions Architect at AWS, helping enterprise customers with successful cloud adoption and migration. During his professional career, he has been extensively involved in complex digital transformation projects. He is also passionate about building innovative solutions around geospatial aspects of addresses.

Bring your workforce identity to Amazon EMR Studio and Athena

Post Syndicated from Manjit Chakraborty original https://aws.amazon.com/blogs/big-data/bring-your-workforce-identity-to-amazon-emr-studio-and-athena/

Customers today may struggle to implement proper access controls and auditing at the user level when multiple applications are involved in data access workflows. The key challenge is to implement proper least-privilege access controls based on user identity when one application accesses data on behalf of the user in another application. It forces you to either give all users broad access through the application with no auditing, or try to implement complex bespoke solutions to map roles to users.

Using AWS IAM Identity Center, you can now propagate user identity to a set of AWS services and minimize the need to build and maintain complex custom systems to vend roles between applications. IAM Identity Center also provides a consolidated view of users and groups in one place that the interconnected applications can use for authorization and auditing.

IAM Identity Center enables centralized management of user access to AWS accounts and applications using identity providers (IdPs) like Okta. This allows users to log in one time with their existing corporate credentials and seamlessly access downstream AWS services supporting identity propagation. With IAM Identity Center, Okta user identities and groups can be automatically synced using SCIM 2.0 for accurate user information in AWS.

Amazon EMR Studio is a unified data analysis environment where you can develop data engineering and data science applications. You can now develop and run interactive queries on Amazon Athena from EMR Studio (for more details, refer to Amazon EMR Studio adds interactive query editor powered by Amazon Athena). Athena users can access EMR Studio without logging in to the AWS Management Console by enabling federated access from your IdP via IAM Identity Center. This removes the complexity of maintaining different identities and mapping user roles across your IdP, EMR Studio, and Athena.

You can govern Athena workgroups based on user attributes from Okta to control query access and costs. AWS Lake Formation can also use Okta identities to enforce fine-grained access controls through granting and revoking permissions.

IAM Identity Center and Okta single sign-on (SSO) integration streamlines access to EMR Studio and Athena with centralized authentication. Users can have a familiar sign-in experience with their workforce credentials to securely run queries in Athena. Access policies on Athena workgroups and Lake Formation permissions provide governance based on Okta user profiles.

This blog post explains how to enable single sign-on to EMR Studio using IAM Identity Center integration with Okta. It shows how to propagate Okta identities to Athena and Lake Formation to provide granular access controls on queries and data. The solution streamlines access to analytics tools with centralized authentication using workforce credentials. It leverages AWS IAM Identity Center, Amazon EMR Studio, Amazon Athena, and AWS Lake Formation.

Solution overview

IAM Identity Center allows users to connect to EMR Studio without needing administrators to manually configure AWS Identity and Access Management (IAM) roles and permissions. It enables mapping of IAM Identity Center groups to existing corporate identity roles and groups. Admins can then assign privileges to roles and groups and assign users to them, enabling granular control over user access. IAM Identity Center provides a central repository of all users in AWS. You can create users and groups directly in IAM Identity Center or connect existing users and groups from providers like Okta, Ping Identity, or Azure AD. It handles authentication through your chosen identity source and maintains a user and group directory for EMR Studio access. Known user identities and logged data access facilitate compliance through auditing user access in AWS CloudTrail.

The following diagram illustrates the solution architecture.

Solution Overview

The EMR Studio workflow consists of the following high-level steps:

  1. The end-user launches EMR Studio using the AWS access portal URL. This URL is provided by an IAM Identity Center administrator via the IAM Identity Center dashboard.
  2. The URL redirects the end-user to the workforce IdP Okta, where the user enters workforce identity credentials.
  3. After successful authentication, the user will be logged in to the AWS console as a federated user.
  4. The user opens EMR Studio and navigates to the Athena query editor using the link available on EMR Studio.
  5. The user selects the correct workgroup as per the user role to run Athena queries.
  6. The query results are stored in separate Amazon Simple Storage Service (Amazon S3) locations with a prefix that is based on user identity.

To implement the solution, we complete the following steps:

  1. Integrate Okta with IAM Identity Center to sync users and groups.
  2. Integrate IAM Identity Center with EMR Studio.
  3. Assign users or groups from IAM Identity Center to EMR Studio.
  4. Set up Lake Formation with IAM Identity Center.
  5. Configure granular role-based entitlements using Lake Formation on propagated corporate identities.
  6. Set up workgroups in Athena for governing access.
  7. Set up Amazon S3 access grants for fine-grained access to Amazon S3 resources like buckets, prefixes, or objects.
  8. Access EMR Studio through the AWS access portal using IAM Identity Center.
  9. Run queries on the Athena SQL editor in EMR Studio.
  10. Review the end-to-end audit trail of workforce identity.

Prerequisites

To follow along with this post, you should have the following:

  • An AWS account – If you don’t have one, you can sign up here.
  • An Okta account that has an active subscription – You need an administrator role to set up the application on Okta. If you’re new to Okta, you can sign up for a free trial or a developer account.

For instructions to configure Okta with IAM Identity Center, refer to Configure SAML and SCIM with Okta and IAM Identity Center.

Integrate Okta with IAM Identity Center to sync users and groups

After you have successfully synced users or groups from Okta to IAM Identity Center, you can see them on the IAM Identity Center console, as shown in the following screenshot. For this post, we created and synced two user groups:

  • Data Engineer
  • Data Scientists

Workforce Identity groups in IAM Identity Center

Next, create a trusted token issuer in IAM Identity Center:

  1. On the IAM Identity Center console, choose Settings in the navigation pane.
  2. Choose Create trusted token issuer.
  3. For Issuer URL, enter the URL of the trusted token issuer.
  4. For Trusted token issuer name, enter Okta.
  5. For Map attributes, map the IdP attribute Email to the IAM Identity Center attribute Email.
  6. Choose Create trusted token issuer.
    Create a Trusted Token Issuer in IAM Identity Center

The following screenshot shows your new trusted token issuer on the IAM Identity Center console.

Okta Trusted Token Issuer in Identity Center

Integrate IAM Identity Center with EMR Studio

We start by creating an EMR Studio with trusted identity propagation enabled.

An EMR Studio administrator must perform the steps to configure EMR Studio as an IAM Identity Center-enabled application. This enables EMR Studio to discover and connect to IAM Identity Center automatically to receive sign-in and user directory services.

Enabling EMR Studio as an IAM Identity Center-managed application lets you control user and group permissions from within IAM Identity Center or from a third-party IdP that’s integrated with it (Okta in this case). When a user (for example, data-engineer or data-scientist) signs in to EMR Studio, it checks the user’s groups in IAM Identity Center, and these are mapped to roles and entitlements in Lake Formation. In this manner, a group can map to a Lake Formation database role that allows read access to a set of tables or columns.

The following steps show how to create EMR Studio as an AWS-managed application with IAM Identity Center. Later sections show how downstream applications like Lake Formation and Athena propagate these roles and entitlements using existing corporate credentials.

  1. On the Amazon EMR console, navigate to EMR Studio.
  2. Choose Create a Studio.
  3. For Setup options, select Custom.
  4. For Studio name, enter a name.
  5. For S3 location for Workspace storage, select Select existing location and enter the Amazon S3 location.

Create EMR Studio with Custom Set up option

  6. Configure permission details for the EMR Studio.

Note that when you choose View permission details under Service role, a new pop-up window will open. You need to create an IAM role with the same policies as shown in the pop-up window. You can use the same IAM role for both the service role and the user role.

Permission details for EMR studio

  7. On the Create a Studio page, for Authentication, select AWS IAM Identity Center.
  8. For User role, choose your user role.
  9. Under Trusted identity propagation, select Enable trusted identity propagation.
  10. Under Application access, select Only assigned users and groups.
  11. For VPC, enter your VPC.
  12. For Subnets, enter your subnet.
  13. For Security and access, select Default security group.
  14. Choose Create Studio.

Enable Identity Center and Trusted Identity Propagation

You should now see an IAM Identity Center-enabled EMR Studio on the Amazon EMR console.

IAM Identity Center enabled EMR Studio

After the EMR Studio administrator finishes creating the trusted identity propagation-enabled EMR Studio and saves the configuration, the instance of the EMR Studio appears as an IAM Identity Center-enabled application on the IAM Identity Center console.

EMR Studio appears under AWS Managed app in IAM Identity Center

Assign users or groups from IAM Identity Center to EMR Studio

You can assign users and groups from your IAM Identity Center directory to the EMR Studio application after syncing with IAM Identity Center. The EMR Studio administrator decides which IAM Identity Center users or groups to include in the app. For example, if you have 10 total groups in IAM Identity Center but don’t want all of them accessing this instance of EMR Studio, you can select which groups to include in the EMR Studio-enabled IAM Identity Center application.

The following steps assign groups to the EMR Studio-enabled IAM Identity Center application:

  1. On the EMR Studio console, navigate to the new EMR Studio instance.
  2. On the Assigned groups tab, choose Assign groups.
  3. Choose which IAM Identity Center groups you want to include in the application. For example, you may choose the Data-Scientist and Data-Engineer groups.
  4. Choose Done.

This allows the EMR Studio administrator to choose specific IAM Identity Center groups to be assigned access to this specific instance integrated with IAM Identity Center. Only the selected groups will be synced and given access, not all groups from the IAM Identity Center directory.

Assign Trusted Identity Propagation enabled EMR studio to your user groups by selecting groups from Studio settings

Set up Lake Formation with IAM Identity Center

To set up Lake Formation with IAM Identity Center, make sure that you have configured Okta as the IdP for IAM Identity Center, and confirm that the users and groups from Okta are now available in IAM Identity Center. Then complete the following steps:

  1. On the Lake Formation console, choose IAM Identity Center Integration under Administration in the navigation pane.

You will see the message “IAM Identity Center enabled” along with the ARN for the IAM Identity Center application.

  2. Choose Create.

In a few minutes, you will see a message indicating that Lake Formation has been successfully integrated with your centralized identities from Okta in IAM Identity Center. Specifically, the message will state “Successfully created identity center integration with application ARN,” signifying the integration is now in place between Lake Formation and the identities managed in Okta.

IAM Identity Center enabled AWS Lake Formation

Configure granular role-based entitlements using Lake Formation on propagated corporate identities

We will now set up granular entitlements for our data access in Lake Formation. For this post, we summarize the steps needed to use the existing corporate identities on the Lake Formation console to provide relevant controls and governance on the data, which we will later query through the Athena query editor. To learn about setting up databases and tables in Lake Formation, refer to Getting started with AWS Lake Formation.

This post will not go into the full details about Lake Formation. Instead, we will focus on a new capability that has been introduced in Lake Formation—the ability to set up permissions based on your existing corporate identities that are synchronized with IAM Identity Center.

This integration allows Lake Formation to use your organization’s IdP and access management policies to control permissions to data lakes. Rather than defining permissions from scratch specifically for Lake Formation, you can now rely on your existing users, groups, and access controls to determine who can access data catalogs and underlying data sources. Overall, this new integration with IAM Identity Center makes it straightforward to manage permissions for your data lake workloads using your corporate identities. It reduces the administrative overhead of keeping permissions aligned across separate systems. As AWS continues enhancing Lake Formation, features like this will further improve its viability as a full-featured data lake management environment.

In this post, we created a database called zipcode-db-tip and granted full access to the user group Data-Engineer to query on the underlying table in the database. Complete the following steps:

  1. On the Lake Formation console, choose Grant data lake permissions.
  2. For Principals, select IAM Identity Center.
  3. For Users and groups, select Data-Engineer.
  4. For LF-Tags or catalog resources, select Named Data Catalog resources.
  5. For Databases, choose zipcode-db-tip.
  6. For Tables, choose tip-zipcode.
    Grant Data Lake permissions to users in IAM Identity Center

Similarly, we need to provide the relevant access on the underlying tables to the users and groups for them to be able to query on the data.

  7. Repeat the preceding steps to provide access to the Data-Engineer group to be able to query on the data.
  8. For Table permissions, select Select, Describe, and Super.
  9. For Data permissions, select All data access.

You can grant selective access on rows and columns as per your specific requirements.
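
The same table grant can also be scripted. The following is a minimal sketch, assuming the Boto3 Lake Formation `grant_permissions` API and assuming that IAM Identity Center groups are addressed with an `arn:aws:identitystore:::group/<group-id>` principal identifier. The group ID shown is a placeholder, and the helper names are hypothetical.

```python
def build_grant_request(group_id, database, table):
    """Build keyword arguments for the Lake Formation GrantPermissions API."""
    return {
        "Principal": {
            # Assumed ARN shape for an IAM Identity Center group principal
            "DataLakePrincipalIdentifier": f"arn:aws:identitystore:::group/{group_id}"
        },
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        # "Super" on the console corresponds to ALL in the API
        "Permissions": ["SELECT", "DESCRIBE", "ALL"],
    }


def grant(request):
    """Send the request; needs Boto3 and Lake Formation admin credentials."""
    import boto3  # imported lazily so the builder can be inspected offline

    return boto3.client("lakeformation").grant_permissions(**request)


# Placeholder group ID; look up the real one in IAM Identity Center
request = build_grant_request("EXAMPLE-GROUP-ID", "zipcode-db-tip", "tip-zipcode")
print(request)
```

Calling `grant(request)` applies the permissions; building the request separately makes it easier to review what will be granted first.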

Grant Table permissions in AWS Data Lake

Set up workgroups in Athena

Athena workgroups are an AWS feature that allows you to isolate data and queries within an AWS account. They provide a way to segregate data and control access so that each group can only access the data that is relevant to them. Athena workgroups are useful for organizations that want to restrict access to sensitive datasets or help prevent queries from impacting each other. When you create a workgroup, you can assign users and roles to it. Queries launched within a workgroup run with the access controls and settings configured for that workgroup, which enables governance, security, and resource controls at a granular level. Athena workgroups are an important feature for managing and optimizing Athena usage across large organizations.

In this post, we create a workgroup specifically for members of our Data Engineering team. Later, when logged in under Data Engineer user profiles, we run queries from within this workgroup to demonstrate how access to Athena workgroups can be restricted based on the user profile. This allows governance policies to be enforced, making sure users can only access permitted datasets and queries based on their role.

  1. On the Athena console, choose Workgroups under Administration in the navigation pane.
  2. Choose Create workgroup.
  3. For Authentication, select IAM Identity Center.
  4. For Service role to authorize Athena, select Create and use a new service role.
  5. For Service role name, enter a name for your role.
    Select IAM Identity Centre for Athena Authentication option
  6. For Location of query result, enter an Amazon S3 location for saving your Athena query results.

This is a mandatory field when you specify IAM Identity Center for authentication.
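
As an alternative to the console, the workgroup could be created programmatically. This is a sketch only, assuming the Boto3 Athena `create_work_group` API with its `IdentityCenterConfiguration` and `ExecutionRole` settings. All ARNs and the S3 location are placeholders, and the helper names are hypothetical.

```python
def build_workgroup_request(name, result_location, role_arn, idc_instance_arn):
    """Build keyword arguments for the Athena CreateWorkGroup API."""
    return {
        "Name": name,
        "Configuration": {
            # Query result location is mandatory with IAM Identity Center auth
            "ResultConfiguration": {"OutputLocation": result_location},
            # Service role that Athena assumes on behalf of the user
            "ExecutionRole": role_arn,
            "IdentityCenterConfiguration": {
                "EnableIdentityCenter": True,
                "IdentityCenterInstanceArn": idc_instance_arn,
            },
        },
    }


def create_workgroup(request):
    """Send the request; needs Boto3 and suitable credentials."""
    import boto3  # imported lazily so the builder can be inspected offline

    return boto3.client("athena").create_work_group(**request)


# Placeholder values for illustration only
request = build_workgroup_request(
    "data-engineer",
    "s3://EXAMPLE-BUCKET/athena-results/",
    "arn:aws:iam::111122223333:role/EXAMPLE-ATHENA-ROLE",
    "arn:aws:sso:::instance/EXAMPLE-INSTANCE",
)
print(request["Configuration"]["IdentityCenterConfiguration"])
```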

Configure location for query result and enable user identity based S3 prefix

After you create the workgroup, you need to assign users and groups to it. For this post, we create a workgroup named data-engineer and assign the group Data-Engineer (propagated through the trusted identity propagation from IAM Identity Center).

  1. On the Groups tab on the data-engineer details page, select the user group to assign and choose Assign groups.
    Assign groups option is available in the Groups tab of Workgroup settings

Set up Amazon S3 access grants to separate the query results for each workforce identity

Next, we set up Amazon S3 grants.

You can watch the following video to set up the grants or refer to Use Amazon EMR with S3 Access Grants to scale Spark access to Amazon S3 for instructions.

Initiate login through AWS federated access using the IAM Identity Center access portal

Now we’re ready to connect to EMR Studio and log in as a federated user using IAM Identity Center authentication:

  1. On the IAM Identity Center console, navigate to the dashboard and choose the AWS access portal URL.
  2. A browser pop-up directs you to the Okta login page, where you enter your Okta credentials.
  3. After successful authentication, you’ll be logged in to the AWS console as a federated user.
  4. Choose the EMR Studio application.
  5. After you federate to EMR Studio, choose Query Editor in the navigation pane to open a new tab with the Athena query editor.

The following video shows a federated user using the AWS access portal URL to access EMR Studio using IAM Identity Center authentication.

Run queries with granular access on the editor

On EMR Studio, the user can open the Athena query editor and then specify the correct workgroup in the query editor to run the queries.

Athena Query result in data-engineer workgroup

The data engineer can query only the tables on which the user has access. The query results will appear under the S3 prefix, which is separate for each workforce identity.

Review the end-to-end audit trail of workforce identity

The IAM Identity Center administrator can look into the downstream apps that are trusted for identity propagation, as shown in the following screenshot of the IAM Identity Center console.

AWS IAM Identity Center view of the trusted applications

On the CloudTrail console, the event history displays the event name and resource accessed by the specific workforce identity.

Auditors can see the workforce identity who executed the query on AWS Data Lake

When you choose an event in CloudTrail, the auditors can see the unique user ID that accessed the underlying AWS Analytics services.
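
Auditors who prefer scripts over the console could extract the same identity from the event record. The sketch below assumes that events come from the CloudTrail `lookup_events` API (each carrying a `CloudTrailEvent` JSON string) and that the workforce identity appears under `userIdentity.onBehalfOf`. The sample event is illustrative, not real audit data.

```python
import json


def workforce_identity(cloudtrail_event):
    """Extract the propagated workforce identity from one lookup_events entry."""
    record = json.loads(cloudtrail_event["CloudTrailEvent"])
    on_behalf_of = record.get("userIdentity", {}).get("onBehalfOf", {})
    return {
        "event_name": record.get("eventName"),
        "user_id": on_behalf_of.get("userId"),
        "identity_store_arn": on_behalf_of.get("identityStoreArn"),
    }


# Illustrative event with made-up identifiers
sample_event = {
    "CloudTrailEvent": json.dumps(
        {
            "eventName": "StartQueryExecution",
            "userIdentity": {
                "onBehalfOf": {
                    "userId": "example-user-id",
                    "identityStoreArn": "arn:aws:identitystore::111122223333:identitystore/d-example",
                }
            },
        }
    )
}

identity = workforce_identity(sample_event)
print(identity)
```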

Clean up

Complete the following steps to clean up your resources:

  1. Delete the Okta applications that you created to integrate with IAM Identity Center.
  2. Delete the IAM Identity Center configuration.
  3. Delete the EMR Studio that you created for testing.
  4. Delete the IAM role that you created for IAM Identity Center and EMR Studio integration.

Conclusion

In this post, we showed you a detailed walkthrough to bring your workforce identity to EMR Studio and propagate the identity to connected AWS applications like Athena and Lake Formation. This solution provides your workforce with a familiar sign-in experience, without the need to remember additional credentials or maintain complex role mapping across different analytics systems. In addition, it provides auditors with end-to-end visibility into workforce identities and their access to analytics services.

To learn more about trusted identity propagation and EMR Studio, refer to Integrate Amazon EMR with AWS IAM Identity Center.


About the authors

Manjit Chakraborty is a Senior Solutions Architect at AWS. He is a seasoned, results-driven professional with extensive experience in the financial domain, having worked with customers across the globe on advising, designing, leading, and implementing core-business enterprise solutions. In his spare time, Manjit enjoys fishing, practicing martial arts, and playing with his daughter.

Neeraj Roy is a Principal Solutions Architect at AWS based out of London. He works with Global Financial Services customers to accelerate their AWS journey. In his spare time, he enjoys reading and spending time with his family.

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

Post Syndicated from Satyanarayana Adimula original https://aws.amazon.com/blogs/big-data/use-aws-glue-etl-to-perform-merge-partition-evolution-and-schema-evolution-on-apache-iceberg/

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its innovative metadata table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate, and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets most efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack:
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).
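
If you prefer to launch the stack from code, the console steps map onto the CloudFormation `create_stack` API. The following is a sketch under assumptions: the template URL is a placeholder (the post only provides a Launch Stack button), the LF-Tag keys and values are hypothetical, and the helper names are not part of the solution.

```python
def build_stack_request(stack_name, template_url, parameter_values):
    """Build keyword arguments for the CloudFormation CreateStack API."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        "Parameters": [
            {"ParameterKey": key, "ParameterValue": value}
            for key, value in parameter_values.items()
        ],
        # Equivalent of acknowledging IAM resources with custom names
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def create_stack(request):
    """Send the request; needs Boto3 and administrator credentials."""
    import boto3  # imported lazily so the builder can be inspected offline

    return boto3.client("cloudformation").create_stack(**request)


request = build_stack_request(
    "icebergdemo1",
    "https://EXAMPLE-BUCKET.s3.amazonaws.com/EXAMPLE-TEMPLATE.yaml",  # placeholder
    {
        "DatalakeUserName": "iceberguser1",
        "DatalakeUserPassword": "<choose-a-password>",
        "DatabaseName": "icebergdb1",
        "TableName": "ecomorders",
        "DatabaseLFTagKey": "example-db-key",      # hypothetical
        "DatabaseLFTagValue": "example-db-value",  # hypothetical
        "TableLFTagKey": "example-table-key",      # hypothetical
        "TableLFTagValue": "example-table-value",  # hypothetical
    },
)
print(len(request["Parameters"]))
```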

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for logical ID named LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  1. On the Configuration tab, choose Environment variables in the left pane.
  2. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
# Create an Iceberg (format version 2) table in the AWS Glue Data Catalog
response_create_table = glue_client.create_table(
    DatabaseName='icebergdb1',
    OpenTableFormatInput={
        'IcebergInput': {
            'MetadataOperation': 'CREATE',
            'Version': '2'
        }
    },
    TableInput={
        'Name': 'ecomorders',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'ordernum', 'Type': 'int'},
                {'Name': 'sku', 'Type': 'string'},
                {'Name': 'quantity', 'Type': 'int'},
                {'Name': 'category', 'Type': 'string'},
                {'Name': 'status', 'Type': 'string'},
                {'Name': 'shipping_id', 'Type': 'string'}
            ],
            'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
        },
        'TableType': 'EXTERNAL_TABLE'
    }
)
  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags for the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  1. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.
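
To make the matching rule concrete, here is a deliberately simplified model of LF-TBAC evaluation in plain Python. It is not Lake Formation's implementation: it only assumes that a grant expression matches a resource when every tag key in the expression is present on the resource with one of the granted values. The tag keys and values are hypothetical.

```python
def tags_match(grant_expression, resource_tags):
    """Return True when every key in the grant expression is satisfied.

    grant_expression maps a tag key to the set of values the principal
    was granted; resource_tags maps a tag key to the resource's value.
    """
    return all(
        resource_tags.get(key) in allowed_values
        for key, allowed_values in grant_expression.items()
    )


# Hypothetical grant: the analyst may read anything tagged domain=orders
analyst_grant = {"domain": {"orders"}}

orders_table_tags = {"domain": "orders", "sensitivity": "low"}
finance_table_tags = {"domain": "finance", "sensitivity": "high"}

print(tags_match(analyst_grant, orders_table_tags))
print(tags_match(analyst_grant, finance_table_tags))
```

The benefit of this model is that new tables become accessible as soon as they carry a matching tag, without a per-table grant.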

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  1. Choose Tables in the navigation pane.
  2. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. When there is an explicit commit on the table, Iceberg creates data files and adds them to the table. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires only a small set of file system operations (in-place writes, seekable reads, and deletes), which makes it compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are tracked in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.
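
The relationship between metadata, snapshots, manifests, and data files can be sketched as a small in-memory model. This is an illustration of the hierarchy described above, not Iceberg's actual classes; all class and field names are simplifications chosen for this example.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DataFile:
    path: str
    record_count: int


@dataclass
class Manifest:
    data_files: List[DataFile]


@dataclass
class Snapshot:
    snapshot_id: int
    manifests: List[Manifest]  # plays the role of the manifest list

    def data_files(self):
        """All data files visible in this snapshot."""
        return [f for m in self.manifests for f in m.data_files]


@dataclass
class TableMetadata:
    snapshots: List[Snapshot] = field(default_factory=list)
    current_snapshot_id: Optional[int] = None

    def current(self):
        return next(s for s in self.snapshots if s.snapshot_id == self.current_snapshot_id)

    def commit(self, new_manifest):
        """Append-only commit: a new snapshot reuses the earlier manifests."""
        previous = self.current().manifests if self.snapshots else []
        snapshot = Snapshot(len(self.snapshots) + 1, previous + [new_manifest])
        self.snapshots.append(snapshot)
        self.current_snapshot_id = snapshot.snapshot_id


table = TableMetadata()
table.commit(Manifest([DataFile("data/file-1.parquet", 4)]))
table.commit(Manifest([DataFile("data/file-2.parquet", 2)]))
print(len(table.current().data_files()), len(table.snapshots[0].data_files()))
```

Because each snapshot keeps its own list of manifests, older snapshots remain readable after later commits.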

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version – Version of the Iceberg table format
  • location – Amazon S3 location of the table
  • schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-<snapshot-id>
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file has all the historical changes to the table’s data and schema. Reviewing the contents of the metadata file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.
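
Athena exposes Iceberg metadata through special table suffixes such as `$history`, `$snapshots`, `$manifests`, `$files`, `$partitions`, and `$refs`. The helper below is a small sketch that builds these queries for the table used in this post; the function name is hypothetical.

```python
# Metadata table suffixes that Athena supports for Iceberg tables
METADATA_TABLES = ("history", "snapshots", "manifests", "files", "partitions", "refs")


def build_metadata_query(database, table, metadata_table):
    """Return an Athena SQL statement for one Iceberg metadata table."""
    if metadata_table not in METADATA_TABLES:
        raise ValueError(f"Unsupported metadata table: {metadata_table}")
    return f'SELECT * FROM "{database}"."{table}${metadata_table}"'


for name in ("history", "snapshots"):
    print(build_metadata_query("icebergdb1", "ecomorders", name))
```

You can paste the generated statements into the Athena query editor.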

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

import sys

import boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Account ID that owns the AWS Glue Data Catalog
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'warehouse_path'])

# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

For read/write access to the underlying data, in addition to Lake Formation permissions, the IAM role that runs the AWS Glue ETL jobs was granted the lakeformation:GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.

The CloudFormation stack provisioned the four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You can see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
from pyspark.sql.types import IntegerType
# Read the Dropzone files and cast the numeric columns to match the Iceberg schema
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
df = df.withColumn("ordernum", df["ordernum"].cast(IntegerType())) \
    .withColumn("quantity", df["quantity"].cast(IntegerType()))
df.createOrReplaceTempView("input_table")
  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)
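
To see what the MERGE statement does without running Spark, the following plain-Python sketch mimics its two branches on dictionaries keyed by ordernum. It is a simplified model of the semantics, not how Iceberg executes the merge, and the sample rows are made up for illustration.

```python
def merge_orders(table, incoming):
    """Mimic the MERGE: update status/shipping_id on match, insert otherwise."""
    merged = {ordernum: dict(row) for ordernum, row in table.items()}
    for row in incoming:
        key = row["ordernum"]
        if key in merged:
            # WHEN MATCHED THEN UPDATE SET status, shipping_id
            merged[key]["status"] = row["status"]
            merged[key]["shipping_id"] = row["shipping_id"]
        else:
            # WHEN NOT MATCHED THEN INSERT *
            merged[key] = dict(row)
    return merged


# Illustrative rows only
existing = {
    1001: {"ordernum": 1001, "sku": "A1", "quantity": 1, "status": "Received", "shipping_id": ""},
}
batch = [
    {"ordernum": 1001, "sku": "A1", "quantity": 9, "status": "Shipped", "shipping_id": "XYZ123"},
    {"ordernum": 2001, "sku": "B2", "quantity": 2, "status": "Received", "shipping_id": ""},
]

result = merge_orders(existing, batch)
print(result[1001]["status"], result[1001]["quantity"], sorted(result))
```

Note that a matched row keeps its original quantity, because the statement updates only status and shipping_id.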

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.
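
The three runs can also be scripted. This is a sketch assuming the Boto3 AWS Glue `start_job_run` and `get_job_run` APIs and assuming that the job expects a full `s3://` URI for `--dropzone_path`; the helper names are hypothetical.

```python
JOB_NAME = "icebergdemo1-GlueETL1-merge"
DROPZONE = "s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data"


def build_job_runs(job_name, dropzone, folders=("merge1", "merge2", "merge3")):
    """Build one StartJobRun request per input folder, in order."""
    return [
        {"JobName": job_name, "Arguments": {"--dropzone_path": f"{dropzone}/{folder}"}}
        for folder in folders
    ]


def run_sequentially(runs, poll_seconds=30):
    """Start each run and wait for it to finish before starting the next."""
    import time

    import boto3  # imported lazily so the builder can be inspected offline

    glue = boto3.client("glue")
    for run in runs:
        run_id = glue.start_job_run(**run)["JobRunId"]
        while True:
            state = glue.get_job_run(JobName=run["JobName"], RunId=run_id)["JobRun"]["JobRunState"]
            if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
                break
            time.sleep(poll_seconds)
        if state != "SUCCEEDED":
            raise RuntimeError(f"Job run {run_id} ended in state {state}")


runs = build_job_runs(JOB_NAME, DROPZONE)
print([run["Arguments"]["--dropzone_path"] for run in runs])
```

Running the merges one after another matters here, because later files update orders that earlier files insert.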

Go to the data folder of the table to see the data files written by Iceberg when you merged the data into the table using the Glue ETL job icebergdemo1-GlueETL1-merge.

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup).
  4. Verify Athena engine version 3.

Athena engine version 3 supports Iceberg tables whose data files are stored in Parquet, ORC, or Avro format.

  1. Go to the Athena query editor.
  2. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  3. For Database, choose icebergdb1. You will see the table ecomorders.
  4. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  5. Run the following query to see the table’s current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how the table is partitioned. In this example, there are no partitioned fields because you didn’t define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure, for example, because the common query patterns in downstream analytics have shifted. For traditional tables, changing the partition structure is a significant operation that requires copying the entire dataset.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.
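
The behavior above can be modeled in a few lines of Python. This toy table is not Iceberg code; it only illustrates the idea that each data file remembers the partition spec it was written with, so adding a partition field affects new files only. Class and field names are invented for the example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTable:
    # Spec 0 is the original, unpartitioned layout
    partition_specs: List[List[str]] = field(default_factory=lambda: [[]])
    data_files: List[dict] = field(default_factory=list)

    def add_partition_field(self, column):
        """Metadata-only change: register a new spec, touch no data files."""
        self.partition_specs.append(self.partition_specs[-1] + [column])

    def write(self, path, row):
        """New data is written under the latest partition spec."""
        spec_id = len(self.partition_specs) - 1
        partition = {column: row[column] for column in self.partition_specs[spec_id]}
        self.data_files.append({"path": path, "spec_id": spec_id, "partition": partition})


table = ToyTable()
table.write("data/old.parquet", {"ordernum": 1001, "category": "Books"})
table.add_partition_field("category")
table.write("data/new.parquet", {"ordernum": 3001, "category": "Toys"})

for data_file in table.data_files:
    print(data_file)
```

The first file keeps spec 0 with no partition values, which mirrors the null partition values you see in Athena until new data arrives.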

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema with SQL ALTER TABLE statements. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;
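
Schema changes can stay metadata-only because Iceberg tracks columns by field ID rather than by name. The sketch below is a simplified model of that idea, not Iceberg's implementation: stored rows are keyed by field ID, and the schema maps each ID to a name and type. ID assignment via `max + 1` is a simplification.

```python
# Only the widening used in this post is modeled here
ALLOWED_PROMOTIONS = {("int", "bigint")}


def add_column(schema, name, type_):
    """Register a new field ID; existing rows simply have no value for it."""
    return {**schema, max(schema) + 1: (name, type_)}


def rename_column(schema, old, new):
    """Change the name only; the field ID (and stored data) stays the same."""
    return {fid: ((new, t) if n == old else (n, t)) for fid, (n, t) in schema.items()}


def promote_column(schema, name, new_type):
    """Widen a column type if the promotion is allowed."""
    evolved = {}
    for fid, (n, t) in schema.items():
        if n == name:
            if (t, new_type) not in ALLOWED_PROMOTIONS:
                raise ValueError(f"Cannot change {t} to {new_type}")
            t = new_type
        evolved[fid] = (n, t)
    return evolved


def read_row(schema, stored):
    """Project a stored row (keyed by field ID) through the current schema."""
    return {name: stored.get(fid) for fid, (name, _) in schema.items()}


schema = {1: ("ordernum", "int"), 2: ("status", "string"), 3: ("shipping_id", "string")}
schema = add_column(schema, "shipping_carrier", "string")
schema = rename_column(schema, "shipping_id", "tracking_number")
schema = promote_column(schema, "ordernum", "bigint")

old_row = {1: 1001, 2: "Shipped", 3: "XYZ123"}  # written before the changes
print(read_row(schema, old_row))
```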

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;
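
Spark SQL's `substring` is 1-based, so `substring(tracking_number,1,3)` takes the first three characters and `substring(tracking_number,4,50)` takes up to 50 characters starting at the fourth. The following Python sketch shows the equivalent slicing; the sample value and the three-character carrier prefix are assumptions for illustration.

```python
def split_tracking(tracking_number):
    """Return (shipping_carrier, tracking_number) as the UPDATE would set them.

    Rows with an empty or missing tracking number are left unchanged,
    mirroring the WHERE tracking_number != '' predicate.
    """
    if not tracking_number:
        return None, tracking_number
    carrier = tracking_number[0:3]    # substring(tracking_number, 1, 3)
    number = tracking_number[3:53]    # substring(tracking_number, 4, 50)
    return carrier, number


print(split_tracking("XYZ1234567"))
print(split_tracking(""))
```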

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over two decades of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.

Enhance container software supply chain visibility through SBOM export with Amazon Inspector and QuickSight

Post Syndicated from Jason Ng original https://aws.amazon.com/blogs/security/enhance-container-software-supply-chain-visibility-through-sbom-export-with-amazon-inspector-and-quicksight/

In this post, I’ll show how you can export software bills of materials (SBOMs) for your containers by using an AWS native service, Amazon Inspector, and visualize the SBOMs through Amazon QuickSight, providing a single-pane-of-glass view of your organization’s software supply chain.

The concept of a bill of materials (BOM) originated in the manufacturing industry in the early 1960s. It was used to keep track of the quantities of each material used to manufacture a completed product. If parts were found to be defective, engineers could then use the BOM to identify products that contained those parts. An SBOM extends this concept to software development, allowing engineers to keep track of vulnerable software packages and quickly remediate the vulnerabilities.

Today, most software includes open source components. A Synopsys study, Walking the Line: GitOps and Shift Left Security, shows that 8 in 10 organizations reported using open source software in their applications. Consider a scenario in which you specify an open source base image in your Dockerfile but don’t know what packages it contains. Although this practice can significantly improve developer productivity and efficiency, the decreased visibility makes it more difficult for your organization to manage risk effectively.

It’s important to track the software components and their versions that you use in your applications, because a single affected component used across multiple organizations could result in a major security impact. According to a Gartner report titled Gartner Report for SBOMs: Key Takeaways You Should know, by 2025, 60 percent of organizations building or procuring critical infrastructure software will mandate and standardize SBOMs in their software engineering practice, up from less than 20 percent in 2022. This will help provide much-needed visibility into software supply chain security.

Integrating SBOM workflows into the software development life cycle is just the first step—visualizing SBOMs and being able to search through them quickly is the next step. This post describes how to process the generated SBOMs and visualize them with Amazon QuickSight. AWS also recently added SBOM export capability in Amazon Inspector, which offers the ability to export SBOMs for Amazon Inspector monitored resources, including container images.

Why is vulnerability scanning not enough?

Scanning and monitoring vulnerable components that pose cybersecurity risks is known as vulnerability scanning, and is fundamental to organizations for ensuring a strong and solid security posture. Scanners usually rely on a database of known vulnerabilities, the most common being the Common Vulnerabilities and Exposures (CVE) database.

Identifying vulnerable components with a scanner can prevent an engineer from deploying affected applications into production. You can embed scanning into your continuous integration and continuous delivery (CI/CD) pipelines so that images with known vulnerabilities don’t get pushed into your image repository. However, what if a new vulnerability is discovered but has not been added to the CVE records yet? A good example of this is the Apache Log4j vulnerability, which was first disclosed on Nov 24, 2021 and only added as a CVE on Dec 1, 2021. This means that for 7 days, scanners that relied on the CVE system weren’t able to identify affected components within their organizations. This issue is known as a zero-day vulnerability. Being able to quickly identify vulnerable software components in your applications in such situations would allow you to assess the risk and come up with a mitigation plan without waiting for a vendor or supplier to provide a patch.
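
As a rough illustration of why an SBOM helps in this situation, the sketch below searches a CycloneDX-style JSON document for components by name. The `components`, `name`, `version`, and `purl` fields follow the CycloneDX JSON format; the sample SBOM is made up, and the function name is hypothetical.

```python
def find_components(sbom, name_fragment):
    """Return (name, version) for every component whose name contains the fragment."""
    fragment = name_fragment.lower()
    return [
        (component.get("name"), component.get("version"))
        for component in sbom.get("components", [])
        if fragment in component.get("name", "").lower()
    ]


# Made-up SBOM excerpt for illustration
sample_sbom = {
    "bomFormat": "CycloneDX",
    "specVersion": "1.4",
    "components": [
        {"name": "log4j-core", "version": "2.14.1",
         "purl": "pkg:maven/org.apache.logging.log4j/log4j-core@2.14.1"},
        {"name": "openssl", "version": "3.0.8", "purl": "pkg:generic/openssl@3.0.8"},
    ],
}

print(find_components(sample_sbom, "log4j"))
```

With SBOMs collected for every image, a search like this can answer "where do we use this package?" before a CVE entry exists.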

In addition, it’s also good hygiene for your organization to track usage of software packages, which provides visibility into your software supply chain. This can improve collaboration between developers, operations, and security teams, because they’ll have a common view of every software component and can collaborate effectively to address security threats.

In this post, I present a solution that uses the new Amazon Inspector feature to export SBOMs from container images, process them, and visualize the data in QuickSight. This gives you the ability to search through your software inventory on a dashboard and to use natural language queries through QuickSight Q, in order to look for vulnerabilities.

Solution overview

Figure 1 shows the architecture of the solution. It is fully serverless, meaning there is no underlying infrastructure you need to manage. This post uses a newly released feature within Amazon Inspector that provides the ability to export a consolidated SBOM for Amazon Inspector monitored resources across your organization in commonly used formats, including CycloneDX and SPDX.

Figure 1: Solution architecture diagram


The workflow in Figure 1 is as follows:

  1. The image is pushed into Amazon Elastic Container Registry (Amazon ECR), which sends an Amazon EventBridge event.
  2. This invokes an AWS Lambda function, which starts the SBOM generation job for the specific image.
  3. When the job completes, Amazon Inspector deposits the SBOM file in an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Another Lambda function is invoked whenever a new JSON file is deposited. The function performs the data transformation steps and uploads the new file into a new S3 bucket.
  5. Amazon Athena is then used to perform preliminary data exploration.
  6. A dashboard on Amazon QuickSight displays SBOM data.
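As a rough illustration of steps 1 and 2, the following Python sketch shows how a Lambda function might turn the ECR push event into an SBOM export request for Amazon Inspector. It is not the code shipped in the CloudFormation template. The event keys follow the ECR image action event format, the request shape follows the Amazon Inspector CreateSbomExport API, and the environment variable names RAW_BUCKET and KMS_KEY_ARN, the key prefix, and the function names are hypothetical.

```python
import os


def build_sbom_export_request(event, bucket_name, kms_key_arn, key_prefix="sbom"):
    """Build keyword arguments for inspector2.create_sbom_export from an ECR push event."""
    detail = event["detail"]
    return {
        # SPDX 2.3 is one of the report formats the export API accepts.
        "reportFormat": "SPDX_2_3",
        # Restrict the export to the image that was just pushed.
        "resourceFilterCriteria": {
            "ecrRepositoryName": [
                {"comparison": "EQUALS", "value": detail["repository-name"]}
            ],
            "ecrImageTags": [
                {"comparison": "EQUALS", "value": detail["image-tag"]}
            ],
        },
        "s3Destination": {
            "bucketName": bucket_name,
            "keyPrefix": key_prefix,
            "kmsKeyArn": kms_key_arn,
        },
    }


def lambda_handler(event, context):
    # boto3 is imported lazily so the builder above can be exercised offline.
    import boto3

    request = build_sbom_export_request(
        event,
        os.environ["RAW_BUCKET"],   # hypothetical variable name
        os.environ["KMS_KEY_ARN"],  # hypothetical variable name
    )
    response = boto3.client("inspector2").create_sbom_export(**request)
    return {"reportId": response["reportId"]}
```

Keeping the request builder separate from the handler makes the filter logic easy to check without calling AWS.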

Implement the solution

This section describes how to deploy the solution architecture.

In this post, you’ll perform the following tasks:

  • Create S3 buckets and AWS KMS keys to store the SBOMs
  • Create an Amazon Elastic Container Registry (Amazon ECR) repository
  • Deploy two AWS Lambda functions to initiate the SBOM generation and transformation
  • Set up Amazon EventBridge rules to invoke Lambda functions upon image push into Amazon ECR
  • Run AWS Glue crawlers to crawl the transformed SBOM S3 bucket
  • Run Amazon Athena queries to review SBOM data
  • Create QuickSight dashboards to identify libraries and packages
  • Use QuickSight Q to identify libraries and packages by using natural language queries

Deploy the CloudFormation stack

The AWS CloudFormation template we’ve provided provisions the S3 buckets that are required for the storage of raw SBOMs and transformed SBOMs, the Lambda functions necessary to initiate and process the SBOMs, and EventBridge rules to run the Lambda functions based on certain events. An empty repository is provisioned as part of the stack, but you can also use your own repository.

To deploy the CloudFormation stack

  1. Download the CloudFormation template.
  2. Browse to the CloudFormation service in your AWS account and choose Create Stack.
  3. Upload the CloudFormation template you downloaded earlier.
  4. For the next step, Specify stack details, enter a stack name.
  5. You can keep the default value of sbom-inspector for EnvironmentName.
  6. Specify the Amazon Resource Name (ARN) of the user or role to be the admin for the KMS key.
  7. Deploy the stack.

Set up Amazon Inspector

If this is the first time you’re using Amazon Inspector, you need to activate the service. In the Getting started with Amazon Inspector topic in the Amazon Inspector User Guide, follow Step 1 to activate the service. This will take some time to complete.

Figure 2: Activate Amazon Inspector


SBOM invocation and processing Lambda functions

This solution uses two Lambda functions written in Python to perform the invocation task and the transformation task.

  • Invocation task — This function is run whenever a new image is pushed into Amazon ECR. It takes in the repository name and image tag variables and passes those into the create_sbom_export function in the SPDX format. This prevents duplicated SBOMs, which helps to keep the S3 data size small.
  • Transformation task — This function is run whenever a new file with the suffix .json is added to the raw S3 bucket. It creates two files, as follows:
    1. It extracts information such as image ARN, account number, package, package version, operating system, and SHA from the SBOM and exports this data to the transformed S3 bucket under a folder named sbom/.
    2. Because each package can have more than one CVE, this function also extracts the CVE from each package and stores it in the same bucket in a directory named cve/. Both files are exported in Apache Parquet so that the file is in a format that is optimized for queries by Amazon Athena.
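The extraction performed by the transformation task can be sketched as follows. This is a simplified illustration, not the deployed function: it assumes the SBOM is an SPDX JSON document whose packages carry name and versionInfo fields, and that vulnerabilities appear as externalRefs entries with a referenceCategory of SECURITY. The image_sha and image_arn parameters and the function name are hypothetical, and writing the rows out as Parquet is omitted.

```python
def extract_rows(spdx_doc, image_sha, image_arn):
    """Split an SPDX document into package rows and per-CVE rows."""
    package_rows, cve_rows = [], []
    for pkg in spdx_doc.get("packages", []):
        key = {
            "sha": image_sha,
            "package": pkg.get("name"),
            "packageversion": pkg.get("versionInfo"),
        }
        # One row per package for the sbom/ data set.
        package_rows.append({**key, "imagearn": image_arn})
        # One row per vulnerability reference for the cve/ data set,
        # because a single package can carry several CVEs.
        for ref in pkg.get("externalRefs", []):
            if ref.get("referenceCategory") == "SECURITY":
                cve_rows.append({**key, "cve": ref.get("referenceLocator")})
    return package_rows, cve_rows
```

Both row sets share the sha, package, and packageversion columns, which is what allows the two tables to be joined later in Athena and QuickSight.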

Populate the AWS Glue Data Catalog

To populate the AWS Glue Data Catalog, you need to generate the SBOM files by using the Lambda functions that were created earlier.

To populate the AWS Glue Data Catalog

  1. You can use an existing image, or you can continue on to create a sample image.
  2. Open an AWS CloudShell terminal.
  3. Run the following commands:
    # Pull the nginx image from a public repo
    docker pull public.ecr.aws/nginx/nginx:1.19.10-alpine-perl
    
    docker tag public.ecr.aws/nginx/nginx:1.19.10-alpine-perl <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sbom-inspector:nginxperl
    
    # Authenticate to ECR, fill in your account id
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com
    
    # Push the image into ECR
    docker push <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sbom-inspector:nginxperl

  4. The image is now pushed into the Amazon ECR repository in your account. This invokes the Lambda functions that perform the SBOM export by using Amazon Inspector and convert the SBOM file to Parquet.
  5. Verify that the Parquet files are in the transformed S3 bucket:
    1. Browse to the S3 console and choose the bucket named sbom-inspector-<ACCOUNT-ID>-transformed. You can also track the invocation of each Lambda function in the Amazon CloudWatch log console.
    2. After the transformation step is complete, you will see two folders (cve/ and sbom/) in the transformed S3 bucket. Choose the sbom folder. You will see the transformed Parquet file in it. If there are CVEs present, a similar file will appear in the cve folder.

    The next step is to run an AWS Glue crawler to determine the format, schema, and associated properties of the raw data. You will need to crawl both folders in the transformed S3 bucket and store the schema in separate tables in the AWS Glue Data Catalog.

  6. On the AWS Glue Service console, on the left navigation menu, choose Crawlers.
  7. On the Crawlers page, choose Create crawler. This starts a series of pages that prompt you for the crawler details.
  8. In the Crawler name field, enter sbom-crawler, and then choose Next.
  9. Under Data sources, select Add a data source.
  10. Now you need to point the crawler to your data. On the Add data source page, choose the Amazon S3 data store. The solution in this post doesn’t use a connection, so leave the Connection field blank if it’s visible.
  11. For the option Location of S3 data, choose In this account. Then, for S3 path, enter the path where the crawler can find the sbom and cve data, which is s3://sbom-inspector-<ACCOUNT-ID>-transformed/sbom/ and s3://sbom-inspector-<ACCOUNT-ID>-transformed/cve/. Leave the rest as default and select Add an S3 data source.
     
    Figure 3: Data source for AWS Glue crawler


  12. The crawler needs permissions to access the data store and create objects in the Data Catalog. To configure these permissions, choose Create an IAM role. The AWS Identity and Access Management (IAM) role name starts with AWSGlueServiceRole-, and in the field, you enter the last part of the role name. Enter sbomcrawler, and then choose Next.
  13. Crawlers create tables in your Data Catalog. Tables are contained in a database in the Data Catalog. To create a database, choose Add database. In the pop-up window, enter sbom-db for the database name, and then choose Create.
  14. Verify the choices you made in the Add crawler wizard. If you see any mistakes, you can choose Back to return to previous pages and make changes. After you’ve reviewed the information, choose Finish to create the crawler.
    Figure 4: Creation of the AWS Glue crawler


  15. Select the newly created crawler and choose Run.
  16. After the crawler runs successfully, verify that the table is created and the data schema is populated.
     
    Figure 5: Table populated from the AWS Glue crawler
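If you prefer to script the crawler instead of using the console, the same settings can be expressed as a request to the AWS Glue CreateCrawler API. The following is a minimal sketch under a few assumptions: the IAM role AWSGlueServiceRole-sbomcrawler already exists (the console wizard creates it for you, the API does not), and the bucket, database, and crawler names match the ones used in the steps above. The function names are hypothetical.

```python
def build_crawler_request(account_id, name="sbom-crawler", database="sbom-db",
                          role="AWSGlueServiceRole-sbomcrawler"):
    """Build keyword arguments for glue.create_crawler covering both folders."""
    bucket = f"sbom-inspector-{account_id}-transformed"
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        # One S3 target per folder so that each gets its own table.
        "Targets": {
            "S3Targets": [
                {"Path": f"s3://{bucket}/sbom/"},
                {"Path": f"s3://{bucket}/cve/"},
            ]
        },
    }


def create_and_run_crawler(account_id):
    # boto3 is imported lazily so the builder above can be exercised offline.
    import boto3

    glue = boto3.client("glue")
    request = build_crawler_request(account_id)
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])
```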


Set up Amazon Athena

Amazon Athena performs the initial data exploration and validation. Athena is a serverless interactive analytics service built on open source frameworks that supports open-table and file formats. Athena provides a simplified, flexible way to analyze data in sources like Amazon S3 by using standard SQL queries. If you are SQL proficient, you can query the data source directly; however, not everyone is familiar with SQL. In this section, you run a sample query and initialize the service so that it can be used in QuickSight later on.

To start using Amazon Athena

  1. In the AWS Management Console, navigate to the Athena console.
  2. For Database, select sbom-db (or select the database you created earlier in the crawler).
  3. Navigate to the Settings tab located at the top right corner of the console. For Query result location, select the Athena S3 bucket created from the CloudFormation template, sbom-inspector-<ACCOUNT-ID>-athena.
  4. Keep the defaults for the rest of the settings. You can now return to the Query Editor and start writing and running your queries on the sbom-db database.

You can use the following sample query.

select package, packageversion, cve, sha, imagearn from sbom
left join cve
using (sha, package, packageversion)
where cve is not null;

Your Athena console should look similar to the screenshot in Figure 6.

Figure 6: Sample query with Amazon Athena


This query joins the two tables and selects only the packages with CVEs identified. Alternatively, you can choose to query for specific packages or identify the most common package used in your organization.

Sample output:

# package packageversion cve sha imagearn
<PACKAGE_NAME> <PACKAGE_VERSION> <CVE> <IMAGE_SHA> <ECR_IMAGE_ARN>
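To see what the join does on a small scale, the following sketch runs the same query against two tiny in-memory tables. It uses Python's built-in sqlite3 module purely as a local stand-in for Athena, and the rows, ARNs, and CVE identifiers are made-up placeholders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE sbom (sha TEXT, package TEXT, packageversion TEXT, imagearn TEXT);
    CREATE TABLE cve (sha TEXT, package TEXT, packageversion TEXT, cve TEXT);

    -- Two packages in one image; only openssl has CVE rows.
    INSERT INTO sbom VALUES
        ('sha256:aaa', 'openssl', '1.1.1k', 'arn:example:image/a'),
        ('sha256:aaa', 'zlib', '1.2.11', 'arn:example:image/a');
    INSERT INTO cve VALUES
        ('sha256:aaa', 'openssl', '1.1.1k', 'CVE-0000-0001'),
        ('sha256:aaa', 'openssl', '1.1.1k', 'CVE-0000-0002');
    """
)

# Same query as above, with an ORDER BY added for a stable result order.
rows = conn.execute(
    """
    select package, packageversion, cve, sha, imagearn from sbom
    left join cve
    using (sha, package, packageversion)
    where cve is not null
    order by cve
    """
).fetchall()

for row in rows:
    print(row)
```

Only the two openssl rows come back. The zlib package has no match in the cve table, so its cve column is NULL after the left join and the where clause drops it.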

Visualize data with Amazon QuickSight

Amazon QuickSight is a serverless business intelligence service that is designed for the cloud. In this post, it serves as a dashboard that allows business users who are unfamiliar with SQL to identify zero-day vulnerabilities. This can also reduce the operational effort and time of having to look through several JSON documents to identify a single package across your image repositories. You can then share the dashboard across teams without having to share the underlying data.

QuickSight SPICE (Super-fast, Parallel, In-memory Calculation Engine) is an in-memory engine that QuickSight uses to perform advanced calculations. In a large organization where you could have millions of SBOM records stored in S3, importing your data into SPICE helps to reduce the time to process and serve the data. You can also use the feature to perform a scheduled refresh to obtain the latest data from S3.

QuickSight also has a feature called QuickSight Q. With QuickSight Q, you can use natural language to interact with your data. If this is the first time you are initializing QuickSight, subscribe to QuickSight and select Enterprise + Q. It will take roughly 20–30 minutes to initialize for the first time. Otherwise, if you are already using QuickSight, you will need to enable QuickSight Q by subscribing to it in the QuickSight console.

Finally, in QuickSight you can select different data sources, such as Amazon S3 and Athena, to create custom visualizations. In this post, we will use the two Athena tables as the data source to create a dashboard to keep track of the packages used in your organization and the resulting CVEs that come with them.

Prerequisites for setting up the QuickSight dashboard

This process will be used to create the QuickSight dashboard from a template already pre-provisioned through the command line interface (CLI). It also grants the necessary permissions for QuickSight to access the data source. You will need the following:

  • AWS Command Line Interface (AWS CLI) programmatic access with read and write permissions to QuickSight.
  • A QuickSight + Q subscription (only if you want to use the Q feature).
  • QuickSight permissions to Amazon S3 and Athena (enable these through the QuickSight security and permissions interface).
  • Set the default AWS Region where you want to deploy the QuickSight dashboard. This post assumes that you’re using the us-east-1 Region.

Create datasets

In QuickSight, create two datasets, one for the sbom table and another for the cve table.

  1. In the QuickSight console, select the Dataset tab.
  2. Choose Create dataset, and then select the Athena data source.
  3. Name the data source sbom and choose Create data source.
  4. Select the sbom table.
  5. Choose Visualize to complete the dataset creation. (Delete the analyses automatically created for you because you will create your own analyses afterwards.)
  6. Navigate back to the main QuickSight page and repeat steps 1–4 for the cve dataset.

Merge datasets

Next, merge the two datasets to create the combined dataset that you will use for the dashboard.

  1. On the Datasets tab, edit the sbom dataset and add the cve dataset.
  2. Set three join clauses, as follows:
    1. Sha : Sha
    2. Package : Package
    3. Packageversion : Packageversion
  3. Perform a left merge, which will append the cve ID to the package and package version in the sbom dataset.
     
    Figure 7: Combining the sbom and cve datasets


Next, you will create a dashboard based on the combined sbom dataset.

Prepare configuration files

In your terminal, export the following variables. Substitute <QuickSight username> in the QS_USER_ARN variable with your own username, which can be found in the Amazon QuickSight console.

export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export TEMPLATE_ID="sbom_dashboard"
export QS_USER_ARN=$(aws quicksight describe-user --aws-account-id $ACCOUNT_ID --namespace default --user-name <QuickSight username> | jq '.User.Arn')
export QS_DATA_ARN=$(aws quicksight search-data-sets --aws-account-id $ACCOUNT_ID --filters Name="DATASET_NAME",Operator="StringLike",Value="sbom" | jq '.DataSetSummaries[0].Arn')

Validate that the variables are set properly. This is required for you to move on to the next step; otherwise you will run into errors.

[ -n "$ACCOUNT_ID" ] && echo "ACCOUNT_ID is $ACCOUNT_ID" || echo "ACCOUNT_ID is not set"
[ -n "$TEMPLATE_ID" ] && echo "TEMPLATE_ID is $TEMPLATE_ID" || echo "TEMPLATE_ID is not set"
[ -n "$QS_USER_ARN" ] && echo "QUICKSIGHT USER ARN is $QS_USER_ARN" || echo "QUICKSIGHT USER ARN is not set"
[ -n "$QS_DATA_ARN" ] && echo "QUICKSIGHT DATA ARN is $QS_DATA_ARN" || echo "QUICKSIGHT DATA ARN is not set"

Next, use the following commands to create the dashboard from a predefined template and create the IAM permissions needed for the user to view the QuickSight dashboard.

cat << EOF > ./dashboard.json
{
    "SourceTemplate": {
      "DataSetReferences": [
        {
          "DataSetPlaceholder": "sbom",
          "DataSetArn": $QS_DATA_ARN
        }
      ],
      "Arn": "arn:aws:quicksight:us-east-1:293424211206:template/sbom_qs_template"
    }
}
EOF

cat << EOF > ./dashboardpermissions.json
[
    {
      "Principal": $QS_USER_ARN,
      "Actions": [
        "quicksight:DescribeDashboard",
        "quicksight:ListDashboardVersions",
        "quicksight:UpdateDashboardPermissions",
        "quicksight:QueryDashboard",
        "quicksight:UpdateDashboard",
        "quicksight:DeleteDashboard",
        "quicksight:DescribeDashboardPermissions",
        "quicksight:UpdateDashboardPublishedVersion"
      ]
    }
]
EOF
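The two files above rely on the shell variables already containing the double quotes that jq emits around each ARN. If you would rather not depend on that, one alternative is to generate the same JSON with Python, which handles the quoting for you. This is a sketch only: the function name is hypothetical, and it expects plain ARN strings without surrounding quotes.

```python
import json

DASHBOARD_ACTIONS = [
    "quicksight:DescribeDashboard",
    "quicksight:ListDashboardVersions",
    "quicksight:UpdateDashboardPermissions",
    "quicksight:QueryDashboard",
    "quicksight:UpdateDashboard",
    "quicksight:DeleteDashboard",
    "quicksight:DescribeDashboardPermissions",
    "quicksight:UpdateDashboardPublishedVersion",
]


def build_dashboard_files(data_set_arn, user_arn, template_arn):
    """Return the contents of dashboard.json and dashboardpermissions.json."""
    source_entity = {
        "SourceTemplate": {
            "DataSetReferences": [
                {"DataSetPlaceholder": "sbom", "DataSetArn": data_set_arn}
            ],
            "Arn": template_arn,
        }
    }
    permissions = [{"Principal": user_arn, "Actions": DASHBOARD_ACTIONS}]
    return json.dumps(source_entity, indent=4), json.dumps(permissions, indent=4)
```

Write the two returned strings to dashboard.json and dashboardpermissions.json, passing the template ARN shown in the file above as template_arn.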

Run the following commands to create the dashboard in your QuickSight console.

aws quicksight create-dashboard --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID --name sbom-dashboard --source-entity file://dashboard.json

Note: Run the following describe-dashboard command, and confirm that the response contains a status code of 200. A status code of 200 means that the dashboard exists.

aws quicksight describe-dashboard --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID
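A status code of 200 confirms that the dashboard resource exists, but dashboard creation runs asynchronously, so it can also help to look at the version status in the same response. The following is a minimal sketch, assuming the response has been loaded into a Python dictionary (for example, from the CLI's JSON output or from boto3's describe_dashboard call). The helper name is hypothetical.

```python
def dashboard_is_ready(response):
    """Return True when a describe-dashboard response reports a usable dashboard."""
    # The top-level Status field is the HTTP status of the request.
    if response.get("Status") != 200:
        return False
    # The version status reports whether creation has finished successfully.
    version = response.get("Dashboard", {}).get("Version", {})
    return version.get("Status") == "CREATION_SUCCESSFUL"
```

If the helper returns False while the version status is still CREATION_IN_PROGRESS, wait a short time and run the describe-dashboard command again.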

Use the following update-dashboard-permissions AWS CLI command to grant the appropriate permissions to QuickSight users.

aws quicksight update-dashboard-permissions --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID --grant-permissions file://dashboardpermissions.json

You should now be able to see the dashboard in your QuickSight console, similar to the one in Figure 8. It’s an interactive dashboard that shows you the number of vulnerable packages you have in your repositories and the specific CVEs that come with them. You can navigate to the specific image by selecting the CVE (middle right bar chart) or list images with a specific vulnerable package (bottom right bar chart).

Note: You won’t see the exact same graph as in Figure 8. It will change according to the image you pushed in.

Figure 8: QuickSight dashboard containing SBOM information


Alternatively, you can use QuickSight Q to extract the same information from your dataset through natural language. You will need to create a topic and add the dataset you added earlier. For detailed information on how to create a topic, see the Amazon QuickSight User Guide. After QuickSight Q has completed indexing the dataset, you can start to ask questions about your data.

Figure 9: Natural language query with QuickSight Q


Conclusion

This post discussed how you can use Amazon Inspector to export SBOMs to improve software supply chain transparency. Container SBOM export should be part of your supply chain mitigation strategy and monitored in an automated manner at scale.

Although it is a good practice to generate SBOMs, it would provide little value if there was no further analysis being done on them. This solution enables you to visualize your SBOM data through a dashboard and natural language, providing better visibility into your security posture. Additionally, this solution is also entirely serverless, meaning there are no agents or sidecars to set up.

To learn more about exporting SBOMs with Amazon Inspector, see the Amazon Inspector User Guide.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Jason Ng


Jason is a Cloud Sales Center Solutions Architect at AWS. He works with enterprise and independent software vendor (ISV) greenfield customers in ASEAN countries and is part of the Containers Technical Field Community (TFC). He enjoys helping customers modernize their applications, drive growth, and reduce total cost of ownership.

Automate AWS Clean Rooms querying and dashboard publishing using AWS Step Functions and Amazon QuickSight – Part 2

Post Syndicated from Venkata Kampana original https://aws.amazon.com/blogs/big-data/automate-aws-clean-rooms-querying-and-dashboard-publishing-using-aws-step-functions-and-amazon-quicksight-part-2/

Public health organizations need access to data insights that they can quickly act upon, especially in times of health emergencies, when data needs to be updated multiple times daily. For example, during the COVID-19 pandemic, access to timely data insights was critically important for public health agencies worldwide as they coordinated emergency response efforts. Up-to-date information and analysis empowered organizations to monitor the rapidly changing situation and direct resources accordingly.

This is the second post in this series; we recommend that you read the first post before diving deep into this solution. In our first post, Enable data collaboration among public health agencies with AWS Clean Rooms – Part 1, we showed how public health agencies can create AWS Clean Rooms collaborations, invite other stakeholders to join the collaboration, and run queries on their collective data without either party having to share or copy underlying data with each other. As mentioned in the previous post, AWS Clean Rooms enables multiple organizations to analyze their data and unlock insights they can act upon, without having to share sensitive, restricted, or proprietary records.

However, public health organization leaders and decision-making officials don’t directly access data collaboration outputs from their Amazon Simple Storage Service (Amazon S3) buckets. Instead, they rely on up-to-date dashboards that help them visualize data insights to make informed decisions quickly.

To ensure these dashboards showcase the most updated insights, the organization builders and data architects need to catalog and update AWS Clean Rooms collaboration outputs on an ongoing basis, which often involves repetitive and manual processes that, if not done well, could delay your organization’s access to the latest data insights.

Manually handling repetitive daily tasks at scale poses risks like delayed insights, miscataloged outputs, or broken dashboards. At a large volume, it would require around-the-clock staffing, straining budgets. This manual approach could expose decision-makers to inaccurate or outdated information.

Automating repetitive workflows, validation checks, and programmatic dashboard refreshes removes human bottlenecks and helps decrease inaccuracies. Automation helps ensure continuous, reliable processes that deliver the most current data insights to leaders without delays, all while streamlining resources.

In this post, we explain an automated workflow using AWS Step Functions and Amazon QuickSight to help organizations access the most current results and analyses, without delays from manual data handling steps. This workflow implementation will empower decision-makers with real-time visibility into the evolving collaborative analysis outputs, ensuring they have up-to-date, relevant insights that they can act upon quickly.

Solution overview

The following reference architecture illustrates some of the foundational components of clean rooms query automation and publishing dashboards using AWS services. We automate running queries using Step Functions with Amazon EventBridge schedules, build an AWS Glue Data Catalog on query outputs, and publish dashboards using QuickSight so they automatically refresh with new data. This allows public health teams to monitor the most recent insights without manual updates.

The architecture consists of the following components, as numbered in the preceding figure:

  1. A scheduled event rule on EventBridge triggers a Step Functions workflow.
  2. The Step Functions workflow initiates the run of a query using the StartProtectedQuery AWS Clean Rooms API. The submitted query runs securely within the AWS Clean Rooms environment, ensuring data privacy and compliance. The results of the query are then stored in a designated S3 bucket, with a unique protected query ID serving as the prefix for the stored data. This unique identifier is generated by AWS Clean Rooms for each query run, maintaining clear segregation of results.
  3. When the AWS Clean Rooms query is successfully complete, the Step Functions workflow calls the AWS Glue API to update the location of the table in the AWS Glue Data Catalog with the Amazon S3 location where the query results were uploaded in Step 2.
  4. Amazon Athena uses the catalog from the Data Catalog to query the information using standard SQL.
  5. QuickSight is used to query, build visualizations, and publish dashboards using the data from the query results.

Prerequisites

For this walkthrough, you need the following:

Launch the CloudFormation stack

In this post, we provide a CloudFormation template to create the following resources:

  • An EventBridge rule that triggers the Step Functions state machine on a schedule
  • An AWS Glue database and a catalog table
  • An Athena workgroup
  • Three S3 buckets:
    • For AWS Clean Rooms to upload the results of query runs
    • For Athena to upload the results for the queries
    • For storing access logs of other buckets
  • A Step Functions workflow designed to run the AWS Clean Rooms query, upload the results to an S3 bucket, and update the table location with the S3 path in the AWS Glue Data Catalog
  • An AWS Key Management Service (AWS KMS) customer-managed key to encrypt the data in S3 buckets
  • AWS Identity and Access Management (IAM) roles and policies with the necessary permissions

To create the necessary resources, complete the following steps:

  1. Choose Launch Stack.
  2. Enter cleanrooms-query-automation-blog for Stack name.
  3. Enter the membership ID from the AWS Clean Rooms collaboration you created in Part 1 of this series.
  4. Choose Next.
  5. Choose Next again.
  6. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Create stack.

After you run the CloudFormation template and create the resources, you can find the following information on the stack Outputs tab on the AWS CloudFormation console:

  • AthenaWorkGroup – The Athena workgroup
  • EventBridgeRule – The EventBridge rule triggering the Step Functions state machine
  • GlueDatabase – The AWS Glue database
  • GlueTable – The AWS Glue table storing metadata for AWS Clean Rooms query results
  • S3Bucket – The S3 bucket where AWS Clean Rooms uploads query results
  • StepFunctionsStateMachine – The Step Functions state machine

Test the solution

The EventBridge rule named cleanrooms_query_execution_Stepfunctions_trigger is scheduled to trigger every hour. When this rule is triggered, it initiates the run of the CleanRoomsBlogStateMachine-XXXXXXX Step Functions state machine. Complete the following steps to test the end-to-end flow of this solution:

  1. On the Step Functions console, navigate to the state machine you created.
  2. On the state machine details page, locate the latest query run.

The details page lists the completed steps:

  • The state machine submits a query to AWS Clean Rooms using the startProtectedQuery API. The output of the API includes the query run ID and its status.
  • The state machine waits for 30 seconds before checking the status of the query run.
  • After 30 seconds, the state machine checks the query status using the getProtectedQuery API. When the status changes to SUCCESS, it proceeds to the next step to retrieve the AWS Glue table metadata information. The output of this step contains the S3 location to which the query run results are uploaded.
  • The state machine retrieves the metadata of the AWS Glue table named patientimmunization, which was created via the CloudFormation stack.
  • The state machine updates the S3 location (the location to which AWS Clean Rooms uploaded the results) in the metadata of the AWS Glue table.
  • After a successful update of the AWS Glue table metadata, the state machine is complete.
  3. On the Athena console, switch the workgroup to CustomWorkgroup.
  4. Run the following query:
    SELECT * FROM "cleanrooms_patientdb"."patientimmunization" limit 10;
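The state machine steps listed above can be approximated in Python to make the control flow easier to follow. This is a sketch of the same logic rather than the Step Functions definition itself. It assumes boto3-style cleanrooms and glue clients are passed in, that results are written as CSV, and that the S3 location returned by AWS Clean Rooms can be used directly as the table location. The function name and its parameters are hypothetical.

```python
import time

FAILED_STATES = {"FAILED", "CANCELLED", "TIMED_OUT"}
# Only fields accepted by the Glue TableInput structure are copied over.
TABLE_INPUT_KEYS = ("Name", "Description", "Owner", "Retention",
                    "StorageDescriptor", "PartitionKeys", "TableType", "Parameters")


def run_query_and_update_table(cleanrooms, glue, membership_id, query, bucket,
                               database, table, wait_seconds=30, sleep=time.sleep):
    """Start a protected query, poll until it finishes, then repoint the Glue table."""
    started = cleanrooms.start_protected_query(
        type="SQL",
        membershipIdentifier=membership_id,
        sqlParameters={"queryString": query},
        resultConfiguration={
            "outputConfiguration": {"s3": {"resultFormat": "CSV", "bucket": bucket}}
        },
    )
    query_id = started["protectedQuery"]["id"]

    # Wait, then check the status, and repeat until the query reaches a final state.
    while True:
        sleep(wait_seconds)
        current = cleanrooms.get_protected_query(
            membershipIdentifier=membership_id,
            protectedQueryIdentifier=query_id,
        )["protectedQuery"]
        if current["status"] == "SUCCESS":
            break
        if current["status"] in FAILED_STATES:
            raise RuntimeError(f"Query {query_id} ended with status {current['status']}")

    location = current["result"]["output"]["s3"]["location"]

    # Fetch the table metadata and write it back with the new S3 location.
    existing = glue.get_table(DatabaseName=database, Name=table)["Table"]
    table_input = {k: existing[k] for k in TABLE_INPUT_KEYS if k in existing}
    table_input["StorageDescriptor"] = dict(existing["StorageDescriptor"], Location=location)
    glue.update_table(DatabaseName=database, TableInput=table_input)
    return location
```

Passing the clients and the sleep function as arguments keeps the sketch testable with simple stand-ins, which is also a convenient way to reason about the failure paths before changing the state machine.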

Visualize the data with QuickSight

Now that you can query your data in Athena, you can use QuickSight to visualize the results. Let’s start by granting QuickSight access to the S3 bucket where your AWS Clean Rooms query results are stored.

Grant QuickSight access to Athena and your S3 bucket

First, grant QuickSight access to the S3 bucket:

  1. Sign in to the QuickSight console.
  2. Choose your user name, then choose Manage QuickSight.
  3. Choose Security and permissions.
  4. For QuickSight access to AWS services, choose Manage.
  5. For Amazon S3, choose Select S3 buckets, and choose the S3 bucket named cleanrooms-query-execution-results-XX-XXXX-XXXXXXXXXXXX (the X characters represent the AWS Region and account number where the solution is deployed).
  6. Choose Save.

Create your datasets and publish visuals

Before you can analyze and visualize the data in QuickSight, you must create datasets for your Athena tables.

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Select Athena.
  4. Enter a name for your dataset.
  5. Choose Create data source.
  6. Choose the AWS Glue database cleanrooms_patientdb and select the table patientimmunization.
  7. Select Directly query your data.
  8. Choose Visualize.

  9. On the Analysis tab, choose the visual type of your choice and add visuals.

Clean up

Complete the following steps to clean up your resources when you no longer need this solution:

  1. Manually delete the S3 buckets and the data stored in them.
  2. Delete the CloudFormation stack.
  3. Delete the QuickSight analysis.
  4. Delete the data source.

Conclusion

In this post, we demonstrated how to automate running AWS Clean Rooms queries using an API call from Step Functions. We also showed how to update the query results information on the existing AWS Glue table, query the information using Athena, and create visuals using QuickSight.

The automated workflow solution delivers real-time insights from AWS Clean Rooms collaborations to decision makers through automated checks for new outputs, processing, and Amazon QuickSight dashboard refreshes. This eliminates manual handling tasks, enabling faster data-driven decisions based on the latest analyses. Additionally, automation frees up staff resources to focus on more strategic initiatives rather than repetitive updates.

Contact the public sector team directly to learn more about how to set up this solution, or reach out to your AWS account team to engage on a proof of concept of this solution for your organization.

About AWS Clean Rooms

AWS Clean Rooms helps companies and their partners more easily and securely analyze and collaborate on their collective datasets—without sharing or copying one another’s underlying data. With AWS Clean Rooms, you can create a secure data clean room in minutes, and collaborate with any other company on the AWS Cloud to generate unique insights about advertising campaigns, investment decisions, and research and development.

The AWS Clean Rooms team is continually building new features to help you collaborate. Watch this video to learn more about privacy-enhanced collaboration with AWS Clean Rooms.

Check out more AWS Partners or contact an AWS Representative to know how we can help accelerate your business.



About the Authors

Venkata Kampana is a Senior Solutions Architect in the AWS Health and Human Services team and is based in Sacramento, CA. In that role, he helps public sector customers achieve their mission objectives with well-architected solutions on AWS.

Jim Daniel is the Public Health lead at Amazon Web Services. Previously, he held positions with the United States Department of Health and Human Services for nearly a decade, including Director of Public Health Innovation and Public Health Coordinator. Before his government service, Jim served as the Chief Information Officer for the Massachusetts Department of Public Health.

Mastering market dynamics: Transforming transaction cost analytics with ultra-precise Tick History – PCAP and Amazon Athena for Apache Spark

Post Syndicated from Pramod Nayak original https://aws.amazon.com/blogs/big-data/mastering-market-dynamics-transforming-transaction-cost-analytics-with-ultra-precise-tick-history-pcap-and-amazon-athena-for-apache-spark/

This post is cowritten with Pramod Nayak, LakshmiKanth Mannem and Vivek Aggarwal from the Low Latency Group of LSEG.

Transaction cost analysis (TCA) is widely used by traders, portfolio managers, and brokers for pre-trade and post-trade analysis, and helps them measure and optimize transaction costs and the effectiveness of their trading strategies. In this post, we analyze options bid-ask spreads from the LSEG Tick History – PCAP dataset using Amazon Athena for Apache Spark. We show you how to access data, define custom functions to apply on data, query and filter the dataset, and visualize the results of the analysis, all without having to worry about setting up infrastructure or configuring Spark, even for large datasets.

Background

Options Price Reporting Authority (OPRA) serves as a crucial securities information processor, collecting, consolidating, and disseminating last sale reports, quotes, and pertinent information for US Options. With 18 active US Options exchanges and over 1.5 million eligible contracts, OPRA plays a pivotal role in providing comprehensive market data.

On February 5, 2024, the Securities Industry Automation Corporation (SIAC) is set to upgrade the OPRA feed from 48 to 96 multicast channels. This enhancement aims to optimize symbol distribution and line capacity utilization in response to escalating trading activity and volatility in the US options market. SIAC has recommended that firms prepare for peak data rates of up to 37.3 GBits per second.

Although the upgrade does not immediately change the total volume of published data, it enables OPRA to disseminate data at a significantly faster rate. This transition is crucial for meeting the demands of the dynamic options market.

OPRA stands out as one of the most voluminous feeds, with a peak of 150.4 billion messages in a single day in Q3 2023 and a capacity headroom requirement of 400 billion messages over a single day. Capturing every single message is critical for transaction cost analytics, market liquidity monitoring, trading strategy evaluation, and market research.

About the data

LSEG Tick History – PCAP is a cloud-based repository, exceeding 30 PB, housing ultra-high-quality global market data. This data is meticulously captured directly within the exchange data centers, employing redundant capture processes strategically positioned in major primary and backup exchange data centers worldwide. LSEG’s capture technology ensures lossless data capture and uses a GPS time-source for nanosecond timestamp precision. Additionally, sophisticated data arbitrage techniques are employed to seamlessly fill any data gaps. Subsequent to capture, the data undergoes meticulous processing and arbitration, and is then normalized into Parquet format using LSEG’s Real Time Ultra Direct (RTUD) feed handlers.

The normalization process, which is integral to preparing the data for analysis, generates up to 6 TB of compressed Parquet files per day. The massive volume of data is attributed to the encompassing nature of OPRA, spanning multiple exchanges, and featuring numerous options contracts characterized by diverse attributes. Increased market volatility and market making activity on the options exchanges further contribute to the volume of data published on OPRA.

The attributes of Tick History – PCAP enable firms to conduct various analyses, including the following:

  • Pre-trade analysis – Evaluate potential trade impact and explore different execution strategies based on historical data
  • Post-trade evaluation – Measure actual execution costs against benchmarks to assess the performance of execution strategies
  • Optimized execution – Fine-tune execution strategies based on historical market patterns to minimize market impact and reduce overall trading costs
  • Risk management – Identify slippage patterns, identify outliers, and proactively manage risks associated with trading activities
  • Performance attribution – Separate the impact of trading decisions from investment decisions when analyzing portfolio performance

The LSEG Tick History – PCAP dataset is available in AWS Data Exchange and can be accessed on AWS Marketplace. With AWS Data Exchange for Amazon S3, you can access PCAP data directly from LSEG’s Amazon Simple Storage Service (Amazon S3) buckets, eliminating the need for firms to store their own copy of the data. This approach streamlines data management and storage, providing clients immediate access to high-quality PCAP or normalized data with ease of use, integration, and substantial data storage savings.

Athena for Apache Spark

For analytical endeavors, Athena for Apache Spark offers a simplified notebook experience accessible through the Athena console or Athena APIs, allowing you to build interactive Apache Spark applications. With an optimized Spark runtime, Athena helps you analyze petabytes of data by dynamically scaling the number of Spark engines in less than a second. Moreover, common Python libraries such as pandas and NumPy are seamlessly integrated, allowing for the creation of intricate application logic. You can also import custom libraries for use in notebooks. Athena for Spark accommodates most open-data formats and is seamlessly integrated with the AWS Glue Data Catalog.

Dataset

For this analysis, we used the LSEG Tick History – PCAP OPRA dataset from May 17, 2023. This dataset comprises the following components:

  • Best bid and offer (BBO) – Reports the highest bid and lowest ask for a security at a given exchange
  • National best bid and offer (NBBO) – Reports the highest bid and lowest ask for a security across all exchanges
  • Trades – Records completed trades across all exchanges

The dataset involves the following data volumes:

  • Trades – 160 MB distributed across approximately 60 compressed Parquet files
  • BBO – 2.4 TB distributed across approximately 300 compressed Parquet files
  • NBBO – 2.8 TB distributed across approximately 200 compressed Parquet files

Analysis overview

Analyzing OPRA Tick History data for Transaction Cost Analysis (TCA) involves scrutinizing market quotes and trades around a specific trade event. We use the following metrics as part of this study:

  • Quoted spread (QS) – Calculated as the difference between the BBO ask and the BBO bid
  • Effective spread (ES) – Calculated as the difference between the trade price and the midpoint of the BBO (BBO bid + (BBO ask – BBO bid)/2)
  • Effective/quoted spread (EQF) – Calculated as (ES / QS) * 100
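
As a worked illustration of these three definitions, the following minimal Python sketch computes them for a single quote. The function name spread_metrics and the bid, ask, and trade prices are hypothetical values chosen for easy arithmetic; they are not taken from the dataset.

```python
def spread_metrics(bid, ask, trade_price):
    """Return (QS, ES, EQF) for one quote and one trade price."""
    quoted_spread = ask - bid                       # QS = ask - bid
    midpoint = bid + (ask - bid) / 2                # midpoint of the quote
    effective_spread = trade_price - midpoint       # ES = trade price - midpoint
    if quoted_spread == 0:
        # A locked market has no spread, so the ratio is undefined
        return quoted_spread, effective_spread, None
    eqf = (effective_spread / quoted_spread) * 100  # EQF as a percentage
    return quoted_spread, effective_spread, eqf


# Hypothetical quote: bid 1.25, ask 1.75, trade executed at 1.625
qs, es, eqf = spread_metrics(bid=1.25, ask=1.75, trade_price=1.625)
print(qs, es, eqf)
```

In this hypothetical case the trade executed a quarter of the quoted spread above the midpoint, so QS is 0.5, ES is 0.125, and EQF is 25.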

We calculate these spreads before the trade and additionally at four intervals after the trade (just after, 1 second, 10 seconds, and 60 seconds after the trade).
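
Selecting the quote just before the trade and the first quote after each interval can be sketched in plain Python. This is an illustration under stated assumptions, not the notebook's code: the function names, the tuple layout (timestamp_ns, bid, ask), and the sample quotes are hypothetical, and the quotes are assumed to be sorted by timestamp.

```python
from bisect import bisect_left, bisect_right

NANOS_PER_SECOND = 1_000_000_000


def last_quote_before(quotes, trade_time_ns):
    """Latest quote with a timestamp strictly before the trade, or None."""
    timestamps = [q[0] for q in quotes]
    idx = bisect_left(timestamps, trade_time_ns)  # first index with ts >= trade time
    return quotes[idx - 1] if idx > 0 else None


def first_quote_after(quotes, trade_time_ns, seconds=0):
    """Earliest quote strictly after trade time plus an offset, or None."""
    cutoff = trade_time_ns + seconds * NANOS_PER_SECOND
    timestamps = [q[0] for q in quotes]
    idx = bisect_right(timestamps, cutoff)        # first index with ts > cutoff
    return quotes[idx] if idx < len(quotes) else None


# Hypothetical quotes as (timestamp_ns, bid, ask), sorted by timestamp
quotes = [
    (1_000_000_000, 1.10, 1.20),
    (2_000_000_000, 1.12, 1.18),
    (2_500_000_000, 1.13, 1.17),
    (4_000_000_000, 1.11, 1.19),
]
trade_time = 2_000_000_000

before = last_quote_before(quotes, trade_time)
snapshots = {s: first_quote_after(quotes, trade_time, s) for s in (0, 1, 10, 60)}
```

A None result means no quote exists in that window, which a real analysis would need to handle before computing spreads.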

Configure Athena for Apache Spark

To configure Athena for Apache Spark, complete the following steps:

  1. On the Athena console, under Get started, select Analyze your data using PySpark and Spark SQL.
  2. If this is your first time using Athena Spark, choose Create workgroup.
  3. For Workgroup name, enter a name for the workgroup, such as tca-analysis.
  4. In the Analytics engine section, select Apache Spark.
  5. In the Additional configurations section, you can choose Use defaults or provide a custom AWS Identity and Access Management (IAM) role and Amazon S3 location for calculation results.
  6. Choose Create workgroup.
  7. After you create the workgroup, navigate to the Notebooks tab and choose Create notebook.
  8. Enter a name for your notebook, such as tca-analysis-with-tick-history.
  9. Choose Create to create your notebook.

Launch your notebook

If you have already created a Spark workgroup, select Launch notebook editor under Get started.


After your notebook is created, you will be redirected to the interactive notebook editor.


Now we can add and run the following code to our notebook.

Create an analysis

Complete the following steps to create an analysis:

  • Import common libraries:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
  • Create our data frames for BBO, NBBO, and trades:
bbo_quote = spark.read.parquet(f"s3://<bucket>/mt=bbo_quote/f=opra/dt=2023-05-17/*")
bbo_quote.createOrReplaceTempView("bbo_quote")
nbbo_quote = spark.read.parquet(f"s3://<bucket>/mt=nbbo_quote/f=opra/dt=2023-05-17/*")
nbbo_quote.createOrReplaceTempView("nbbo_quote")
trades = spark.read.parquet(f"s3://<bucket>/mt=trade/f=opra/dt=2023-05-17/29_1.parquet")
trades.createOrReplaceTempView("trades")
  • Now we can identify a trade to use for transaction cost analysis:
filtered_trades = spark.sql("select Product, Price, Quantity, ReceiptTimestamp, MarketParticipant from trades")
filtered_trades.show(truncate=False)

We get the following output:

+-------------------+---------------------+----------------------+-------------------+-------------------+
|Product            |Price                |Quantity              |ReceiptTimestamp   |MarketParticipant  |
+-------------------+---------------------+----------------------+-------------------+-------------------+
|QQQ 230518C00329000|1.1700000000000000000|10.0000000000000000000|1684338565538021907|NYSEArca           |
|QQQ 230518C00329000|1.1700000000000000000|20.0000000000000000000|1684338576071397557|NASDAQOMXPHLX      |
|QQQ 230518C00329000|1.1600000000000000000|1.0000000000000000000 |1684338579104713924|ISE                |
|QQQ 230518C00329000|1.1400000000000000000|1.0000000000000000000 |1684338580263307057|NASDAQOMXBX_Options|
|QQQ 230518C00329000|1.1200000000000000000|1.0000000000000000000 |1684338581025332599|ISE                |
+-------------------+---------------------+----------------------+-------------------+-------------------+

We use the third trade in this output (price 1.16 on ISE, with ReceiptTimestamp 1684338579104713924) going forward for the trade product (tp), trade price (tpr), and trade time (tt).

  • Next, we create a number of helper functions for our analysis:
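def calculate_es_qs_eqf(df, trade_price):
    # Cast prices to floats so the arithmetic below is numeric
    df['BidPrice'] = df['BidPrice'].astype('double')
    df['AskPrice'] = df['AskPrice'].astype('double')
    # Quoted spread (QS): ask minus bid
    df["QS"] = df["AskPrice"] - df["BidPrice"]
    # Effective spread (ES): trade price minus the quote midpoint
    midpoint = df["BidPrice"] + df["QS"] / 2
    df["ES"] = trade_price - midpoint
    # Effective/quoted spread (EQF), as a percentage
    df["EQF"] = (df["ES"] / df["QS"]) * 100
    return df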

def get_trade_before_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] < nseconds].groupby(groupby_col).last()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_trade_after_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] > nseconds].groupby(groupby_col).first()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_nbbo_trade_before_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] < nseconds].iloc[-1:]
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df

def get_nbbo_trade_after_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] > nseconds].iloc[:1]
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df
  • In the following function, we create the dataset that contains all the quotes before and after the trade. Athena Spark automatically determines how many DPUs to launch for processing our dataset.
def get_tca_analysis_via_df_single_query(trade_product, trade_price, trade_time):
    # BBO quotes
    bbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, MarketParticipant FROM bbo_quote where Product = '{trade_product}';")
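    # Sort by time: the helpers' first()/last() calls assume chronological order
    bbos = bbos.toPandas().sort_values('ReceiptTimestamp')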

    bbo_just_before = get_trade_before_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_just_after = get_trade_after_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_1s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=1, groupby_col='MarketParticipant')
    bbo_10s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=10, groupby_col='MarketParticipant')
    bbo_60s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=60, groupby_col='MarketParticipant')
    
    all_bbos = pd.concat([bbo_just_before, bbo_just_after, bbo_1s_after, bbo_10s_after, bbo_60s_after], ignore_index=True, sort=False)
    bbos_calculated = calculate_es_qs_eqf(all_bbos, trade_price)

    #NBBO quotes
    nbbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, BestBidParticipant, BestAskParticipant FROM nbbo_quote where Product = '{trade_product}';")
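    # Sort by time: the helpers' iloc slices assume chronological order
    nbbos = nbbos.toPandas().sort_values('ReceiptTimestamp')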

    nbbo_just_before = get_nbbo_trade_before_n_seconds(trade_time,nbbos, seconds=0)
    nbbo_just_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=0)
    nbbo_1s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=1)
    nbbo_10s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=10)
    nbbo_60s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=60)

    all_nbbos = pd.concat([nbbo_just_before, nbbo_just_after, nbbo_1s_after, nbbo_10s_after, nbbo_60s_after], ignore_index=True, sort=False)
    nbbos_calculated = calculate_es_qs_eqf(all_nbbos, trade_price)

    calc = pd.concat([bbos_calculated, nbbos_calculated], ignore_index=True, sort=False)
    
    return calc
  • Now let’s call the TCA analysis function with the information from our selected trade:
tp = "QQQ 230518C00329000"
tpr = 1.16
tt = 1684338579104713924
c = get_tca_analysis_via_df_single_query(tp, tpr, tt)
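
The ReceiptTimestamp values appear to be nanoseconds since the Unix epoch, so it can help to convert the trade time into a readable form and to precompute the interval cutoffs used in the analysis. The following is a small sketch; the helper names ns_to_utc_string and interval_cutoffs are hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000


def ns_to_utc_string(timestamp_ns):
    """Format an epoch-nanosecond timestamp as an ISO-like UTC string."""
    seconds, nanos = divmod(timestamp_ns, NANOS_PER_SECOND)
    whole = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime only keeps microseconds, so append the nanoseconds manually
    return f"{whole:%Y-%m-%dT%H:%M:%S}.{nanos:09d}Z"


def interval_cutoffs(trade_time_ns, offsets_s=(0, 1, 10, 60)):
    """Map each offset in seconds to its epoch-nanosecond cutoff."""
    return {s: trade_time_ns + s * NANOS_PER_SECOND for s in offsets_s}


tt = 1684338579104713924  # trade time used in this post
print(ns_to_utc_string(tt))
cutoffs = interval_cutoffs(tt)
```

For the selected trade, this prints 2023-05-17T15:49:39.104713924Z, which falls on the dataset date.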

Visualize the analysis results

Now let’s create the data frames we use for our visualization. Each data frame contains quotes for one of the five time intervals for each data feed (BBO, NBBO):

# BBO rows carry an exchange name in MarketParticipant; NBBO rows come from
# nbbo_quote, which has no such column, so pd.concat fills it with NaN
bbo = c[c['MarketParticipant'].notna()]
bbo_bef = bbo[bbo['ReceiptTimestamp'] < tt]
bbo_aft_0 = bbo[bbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
bbo_aft_1 = bbo[bbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
bbo_aft_10 = bbo[bbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
bbo_aft_60 = bbo[bbo['ReceiptTimestamp'] > (tt+60000000000)]

nbbo = c[c['MarketParticipant'].isna()]
nbbo_bef = nbbo[nbbo['ReceiptTimestamp'] < tt]
nbbo_aft_0 = nbbo[nbbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
nbbo_aft_1 = nbbo[nbbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
nbbo_aft_10 = nbbo[nbbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
nbbo_aft_60 = nbbo[nbbo['ReceiptTimestamp'] > (tt+60000000000)]
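
The same bucketing can be expressed as a single labeling function. The sketch below is an alternative illustration rather than the notebook's code: the function name interval_label is hypothetical, and it uses half-open intervals so that a quote exactly on a boundary lands in only one bucket, whereas the pandas between method is inclusive on both ends by default.

```python
NANOS_PER_SECOND = 1_000_000_000


def interval_label(receipt_ts_ns, trade_time_ns):
    """Label a quote by where its timestamp falls relative to the trade."""
    delta = receipt_ts_ns - trade_time_ns
    if delta < 0:
        return "before trade"
    if delta < 1 * NANOS_PER_SECOND:
        return "0s after trade"
    if delta < 10 * NANOS_PER_SECOND:
        return "1s after trade"
    if delta < 60 * NANOS_PER_SECOND:
        return "10s after trade"
    return "60s after trade"


tt = 1684338579104713924  # trade time used in this post
labels = [interval_label(tt + offset, tt)
          for offset in (-5, 0, 1_000_000_000, 15_000_000_000, 61_000_000_000)]
```

Applied to the ReceiptTimestamp column of a pandas DataFrame, a function like this would yield one label column that can be used for grouping, instead of ten separate data frames.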

In the following sections, we provide example code to create different visualizations.

Plot QS and NBBO before the trade

Use the following code to plot the quoted spread and NBBO before the trade:

fig = px.bar(title="Quoted Spread Before The Trade",
    x=bbo_bef.MarketParticipant,
    y=bbo_bef['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(y=nbbo_bef.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

Plot QS for each market and NBBO after the trade

Use the following code to plot the quoted spread for each market and NBBO immediately after the trade:

fig = px.bar(title="Quoted Spread After The Trade",
    x=bbo_aft_0.MarketParticipant,
    y=bbo_aft_0['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(
    y=nbbo_aft_0.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

Plot QS for each time interval and each market for BBO

Use the following code to plot the quoted spread for each time interval and each market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['QS']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['QS']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['QS']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['QS']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['QS'])])
fig.update_layout(barmode='group',title="BBO Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'},
    yaxis={'title':'Quoted Spread'})
%plotly fig

Plot ES for each time interval and market for BBO

Use the following code to plot the effective spread for each time interval and market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['ES']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['ES']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['ES']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['ES']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['ES'])])
fig.update_layout(barmode='group',title="BBO Effective Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective Spread'})
%plotly fig

Plot EQF for each time interval and market for BBO

Use the following code to plot the effective/quoted spread for each time interval and market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['EQF']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['EQF']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['EQF']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['EQF']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['EQF'])])
fig.update_layout(barmode='group',title="BBO Effective/Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective/Quoted Spread'})
%plotly fig

Athena Spark calculation performance

When you run a code block, Athena Spark automatically determines how many DPUs it requires to complete the calculation. In the code block where we call the get_tca_analysis_via_df_single_query function, we instruct Spark to process the data, and we then convert the resulting Spark DataFrames into pandas DataFrames. This is the most processing-intensive part of the analysis, and when Athena Spark runs this block, it shows the progress bar, elapsed time, and how many DPUs are currently processing data. For example, in the following calculation, Athena Spark is utilizing 18 DPUs.

When you configure your Athena Spark notebook, you have the option of setting the maximum number of DPUs that it can use. The default is 20 DPUs, but we tested this calculation with 10, 20, and 40 DPUs to demonstrate how Athena Spark automatically scales to run our analysis. We observed that Athena Spark scales close to linearly: the calculation took 15 minutes and 21 seconds with a maximum of 10 DPUs, 8 minutes and 23 seconds with 20 DPUs, and 4 minutes and 44 seconds with 40 DPUs. Because Athena Spark charges based on DPU usage at per-second granularity, the cost of these calculations is similar, but with a higher maximum DPU value, Athena Spark can return the result of the analysis much faster. For more details, refer to the Amazon Athena pricing page.
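
As a back-of-the-envelope check of these timings, the following sketch converts each run into elapsed seconds, a speedup relative to the 10-DPU run, and an upper bound on DPU-hours. It assumes that every run used its maximum DPU allocation for the whole duration; actual DPU usage varies during a calculation, so the DPU-hour figures are illustrative only.

```python
# (max DPUs, minutes, seconds) for the three runs reported above
runs = [(10, 15, 21), (20, 8, 23), (40, 4, 44)]

baseline_dpus, baseline_min, baseline_sec = runs[0]
baseline_elapsed = baseline_min * 60 + baseline_sec

results = []
for max_dpus, minutes, seconds in runs:
    elapsed = minutes * 60 + seconds       # wall-clock seconds
    dpu_hours = max_dpus * elapsed / 3600  # upper bound on DPU-hours
    speedup = baseline_elapsed / elapsed   # relative to the 10-DPU run
    results.append((max_dpus, elapsed, round(dpu_hours, 2), round(speedup, 2)))
    print(f"{max_dpus} DPUs: {elapsed} s, <= {dpu_hours:.2f} DPU-hours, {speedup:.2f}x speedup")
```

Under this assumption, each doubling of the DPU cap yields roughly a 1.8x speedup, while the upper bound on DPU-hours rises only modestly, from about 2.56 to about 3.16.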

Conclusion

In this post, we demonstrated how you can use high-fidelity OPRA data from LSEG’s Tick History-PCAP to perform transaction cost analytics using Athena Spark. The availability of OPRA data in a timely manner, complemented with accessibility innovations of AWS Data Exchange for Amazon S3, strategically reduces the time to analytics for firms looking to create actionable insights for critical trading decisions. OPRA generates about 7 TB of normalized Parquet data each day, and managing the infrastructure to provide analytics based on OPRA data is challenging.

Athena’s scalability in handling large-scale data processing for Tick History – PCAP for OPRA data makes it a compelling choice for organizations seeking swift and scalable analytics solutions in AWS. This post shows the seamless interaction between the AWS ecosystem and Tick History-PCAP data and how financial institutions can take advantage of this synergy to drive data-driven decision-making for critical trading and investment strategies.


About the Authors

Pramod Nayak is the Director of Product Management of the Low Latency Group at LSEG. Pramod has over 10 years of experience in the financial technology industry, focusing on software development, analytics, and data management. Pramod is a former software engineer and passionate about market data and quantitative trading.

LakshmiKanth Mannem is a Product Manager in the Low Latency Group of LSEG. He focuses on data and platform products for the low-latency market data industry. LakshmiKanth helps customers build the most optimal solutions for their market data needs.

Vivek Aggarwal is a Senior Data Engineer in the Low Latency Group of LSEG. Vivek works on developing and maintaining data pipelines for processing and delivery of captured market data feeds and reference data feeds.

Alket Memushaj is a Principal Architect in the Financial Services Market Development team at AWS. Alket is responsible for technical strategy, working with partners and customers to deploy even the most demanding capital markets workloads to the AWS Cloud.

Use Amazon Athena with Spark SQL for your open-source transactional table formats

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/use-amazon-athena-with-spark-sql-for-your-open-source-transactional-table-formats/

AWS-powered data lakes, supported by the unmatched availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, and advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.

In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.

In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.

Prerequisites

Complete the following prerequisites:

Download and import example notebooks from Amazon S3

To follow along, download the notebooks discussed in this post from the following locations:

After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.

Navigate to specific Open Table Format section

If you are interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.

If you are interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.

If you are interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.

Working with Apache Iceberg tables

When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell’s behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena.

In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.

Set up a notebook session

In order to use Apache Iceberg in Athena, while creating or editing a session, select the Apache Iceberg option by expanding the Apache Spark properties section. It will pre-populate the properties as shown in the following screenshot.

This image shows the Apache Iceberg properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.

Create a database and Iceberg table

First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:

%%sql
CREATE DATABASE icebergdb

Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING iceberg
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'
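
Because every column in this table is a string, the DDL can also be generated rather than typed by hand. The following Python sketch builds the same kind of statement from a column list. The helper name build_create_table and the abbreviated column list are hypothetical, and the bucket placeholder is left exactly as in the statement above. In a notebook, the resulting string could be passed to spark.sql.

```python
def build_create_table(table, columns, partition_col, location, fmt="iceberg"):
    """Build a CREATE TABLE statement with all-string columns."""
    column_lines = ",\n".join(f"{name} string" for name in columns)
    return (
        f"CREATE TABLE {table}(\n"
        f"{column_lines})\n"
        f"USING {fmt}\n"
        f"PARTITIONED BY ({partition_col} string)\n"
        f"LOCATION '{location}'"
    )


# Abbreviated, illustrative column list; the full table has 28 columns
columns = ["station", "date", "latitude", "longitude", "elevation", "name"]
ddl = build_create_table(
    table="icebergdb.noaa_iceberg",
    columns=columns,
    partition_col="year",
    location="s3://<your-S3-bucket>/<prefix>/noaaiceberg/",
)
print(ddl)
```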

Insert data into the table

To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Query the Iceberg table

Now that the data is inserted in the Iceberg table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

We get the following output.

Image shows output of first select query
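
One caveat worth keeping in mind: the table stores temperatures as strings, and a minimum over strings is a lexicographic comparison rather than a numeric one. The following plain-Python sketch shows the difference using hypothetical values; in Spark SQL, casting the column first (for example, min(CAST(MIN AS double))) is one way to get a numeric comparison.

```python
# Hypothetical temperature readings stored as strings
temps = ["9.9", "10.4", "28.0"]

lexicographic_min = min(temps)       # compares character by character
numeric_min = min(temps, key=float)  # compares the numeric values

print(lexicographic_min, numeric_min)
```

Here the string comparison picks "10.4" because the character '1' sorts before '9', while the numeric comparison picks "9.9".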

Update data in the Iceberg table

Let’s look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

We get the following output.

Image shows output of second select query

Compact data files

Open table formats like Iceberg work by creating delta changes in file storage, and tracking the versions of rows through manifest files. More data files lead to more metadata stored in manifest files, and small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg’s rewrite_data_files procedure in Spark for Athena compacts data files, combining many small delta change files into a smaller set of read-optimized Parquet files, which speeds up reads. To run compaction on our table, run the following Spark SQL:

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy=>'sort', sort_order => 'zorder(name)')

rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
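
To illustrate why compaction reduces the number of files, the following toy model greedily groups small files until a target size is reached. This is a simplified illustration and not Iceberg's implementation; the function name plan_compaction, the 512 MB target, and the file sizes are assumptions made for the example.

```python
def plan_compaction(file_sizes_mb, target_mb=512):
    """Greedily group file sizes so each group stays within target_mb."""
    groups, current, current_size = [], [], 0
    for size in file_sizes_mb:
        # Start a new output file when adding this one would exceed the target
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical table: 20 delta files of 64 MB each
small_files = [64] * 20
groups = plan_compaction(small_files, target_mb=512)
print(f"{len(small_files)} files -> {len(groups)} files")
```

In this hypothetical case, 20 small files collapse into 3 larger ones, so a reader opens far fewer objects and the manifests track fewer entries.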

List table snapshots

Each write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of a table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots

Expire old snapshots

Regularly expiring snapshots is recommended to delete data files that are no longer needed, and to keep the size of table metadata small. It will never remove files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots for the table icebergdb.noaa_iceberg that are older than a specific timestamp:

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

Note that the timestamp value is specified as a string in format yyyy-MM-dd HH:mm:ss.fff. The output will give a count of the number of data and metadata files deleted.
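
If you expire snapshots on a schedule, the cutoff timestamp can be computed rather than hard-coded. The following sketch builds the statement text for a retention window; the helper name expire_snapshots_sql and the 7-day retention are assumptions for illustration, and the resulting string would still need to be run through Spark SQL.

```python
from datetime import datetime, timedelta


def expire_snapshots_sql(table, now, keep_days):
    """Build a CALL statement that expires snapshots older than keep_days."""
    cutoff = now - timedelta(days=keep_days)
    # %f yields microseconds (6 digits); trim to milliseconds (3 digits)
    cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return (
        "CALL spark_catalog.system.expire_snapshots\n"
        f"('{table}', TIMESTAMP '{cutoff_str}')"
    )


# Fixed reference time so the example is reproducible
stmt = expire_snapshots_sql("icebergdb.noaa_iceberg", datetime(2023, 12, 7), keep_days=7)
print(stmt)
```

With a reference time of December 7, 2023 and a 7-day window, this reproduces the statement shown above.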

Drop the table and database

You can run the following Spark SQL to clean up the Iceberg tables and associated data in Amazon S3 from this exercise:

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

Run the following Spark SQL to remove the database icebergdb:

%%sql
DROP DATABASE icebergdb

To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.

Working with Apache Hudi tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.

Set up a notebook session

In order to use Apache Hudi in Athena, while creating or editing a session, select the Apache Hudi option by expanding the Apache Spark properties section.

This image shows the Apache Hudi properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.

Create a database and Hudi table

First, we create a database called hudidb that will be stored in the AWS Glue Data Catalog followed by Hudi table creation:

%%sql
CREATE DATABASE hudidb

We create a Hudi table pointing to a location in Amazon S3 where we will load the data. Note that the table is of copy-on-write type, which is defined by type = 'cow' in the table DDL. We have defined station and date as a composite primary key and year as the preCombineField. Also, the table is partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'

Insert data into the table

Like with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Query the Hudi table

Now that the table is created, let’s run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Hudi table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

Run time travel queries

We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

This query checks the Seattle Airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in format yyyy-MM-dd HH:mm:ss.fff.

Optimize query speed with clustering

To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

Compact tables

Compaction is a table service that Hudi uses specifically in Merge On Read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar base file, producing a new version of the base file. It does not apply to Copy On Write (COW) tables. You can run the following query in Spark for Athena to perform compaction on MOR tables:

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');

Drop the table and database

Run the following Spark SQL to remove the Hudi table you created and associated data from the Amazon S3 location:

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

Run the following Spark SQL to remove the database hudidb:

%%sql
DROP DATABASE hudidb

To learn about all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.

Working with Linux Foundation Delta Lake tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.

Set up a notebook session

In order to use Delta Lake in Spark for Athena, while creating or editing a session, select Linux Foundation Delta Lake by expanding the Apache Spark properties section.

This image shows the Delta Lake properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

If you want to follow along, the code used in this section should be available in the SparkSQL_delta.ipynb file.

Create a database and Delta Lake table

In this section, we create a database in the AWS Glue Data Catalog. Using the following SQL, we can create a database called deltalakedb:

%%sql
CREATE DATABASE deltalakedb

Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING delta
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

Insert data into the table

We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.
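As a rough illustration of the CREATE TABLE AS SELECT route, the following sketch assembles such a statement as a string that you could pass to spark.sql() in a PySpark cell. The helper name delta_ctas_sql, the target table name noaa_delta_ctas, and the noaadelta_ctas/ location are hypothetical. The sketch also assumes that the source table exposes a year column to partition on.

```python
def delta_ctas_sql(target: str, source: str, location: str,
                   partition_col: str = "year") -> str:
    """Build a CTAS statement that creates and populates a Delta Lake table.

    Hypothetical helper; partition_col is assumed to exist in the source table.
    """
    return (
        f"CREATE TABLE {target}\n"
        f"USING delta\n"
        f"PARTITIONED BY ({partition_col})\n"
        f"LOCATION '{location}'\n"
        f"AS SELECT * FROM {source}"
    )


# Replace the placeholders with your own S3 bucket and prefix.
print(delta_ctas_sql(
    "deltalakedb.noaa_delta_ctas",
    "sparkblogdb.noaa_pq",
    "s3://<your-S3-bucket>/<prefix>/noaadelta_ctas/",
))
```

With CTAS, the column list is taken from the SELECT, so the schema does not need to be repeated in the DDL.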

Query the Delta Lake table

Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let’s run a Spark SQL to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Delta Lake table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as earlier:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

Compact data files

In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which will compact the small files into larger files, so the queries are not burdened by the small file overhead. To perform the compaction operation, run the following query:

%%sql
OPTIMIZE deltalakedb.noaa_delta

Refer to Optimizations in the Delta Lake documentation for different options available while running OPTIMIZE.

Remove files no longer referenced by a Delta Lake table

You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:

%%sql
VACUUM deltalakedb.noaa_delta

Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for options available with VACUUM.
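As one example of those options, Delta Lake's VACUUM accepts an explicit RETAIN <n> HOURS clause and a DRY RUN clause that lists candidate files without deleting them. The sketch below builds such a statement. The helper name vacuum_sql is hypothetical, and the 168-hour default is chosen to match what we understand to be Delta Lake's default seven-day retention threshold.

```python
def vacuum_sql(table: str, retain_hours: int = 168, dry_run: bool = True) -> str:
    """Build a Delta Lake VACUUM statement (hypothetical helper).

    dry_run defaults to True so that nothing is deleted by accident.
    """
    if retain_hours < 0:
        raise ValueError("retain_hours must be non-negative")
    statement = f"VACUUM {table} RETAIN {retain_hours} HOURS"
    if dry_run:
        # DRY RUN only reports the files that would be removed.
        statement += " DRY RUN"
    return statement


print(vacuum_sql("deltalakedb.noaa_delta"))
```

Reviewing the dry-run output first is a cautious habit, because files removed by VACUUM can no longer be used for reading older table versions.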

Drop the table and database

Run the following Spark SQL to remove the Delta Lake table you created:

%%sql
DROP TABLE deltalakedb.noaa_delta

Run the following Spark SQL to remove the database deltalakedb:

%%sql
DROP DATABASE deltalakedb

Running DROP TABLE DDL on the Delta Lake table and database deletes the metadata for these objects, but doesn’t automatically delete the data files in Amazon S3. You can run the following Python code in the notebook’s cell to delete the data from the S3 location:

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('<your-S3-bucket>')
bucket.objects.filter(Prefix="<prefix>/noaadelta/").delete()
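Because a mistyped prefix can delete more than intended, you might wrap the same boto3 calls in a small helper that validates the location and lists the objects before deleting them. This is a sketch rather than part of the original notebook: the function names split_s3_uri and delete_prefix, and the dry_run flag, are assumptions.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/prefix/ into (bucket, prefix). Hypothetical helper."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    prefix = parsed.path.lstrip("/")
    if not prefix:
        # An empty prefix would match every object in the bucket.
        raise ValueError("refusing to operate on an empty prefix")
    return parsed.netloc, prefix


def delete_prefix(uri: str, dry_run: bool = True) -> None:
    """List (dry_run=True) or delete (dry_run=False) all objects under uri."""
    import boto3  # imported lazily so split_s3_uri works without AWS access

    bucket_name, prefix = split_s3_uri(uri)
    objects = boto3.resource("s3").Bucket(bucket_name).objects.filter(Prefix=prefix)
    if dry_run:
        for obj in objects:
            print(obj.key)
    else:
        objects.delete()
```

For example, delete_prefix("s3://<your-S3-bucket>/<prefix>/noaadelta/") prints the keys that would be removed, and passing dry_run=False performs the deletion.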

To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.

Conclusion

This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena’s built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Design a data mesh on AWS that reflects the envisioned organization

Post Syndicated from Claudia Chitu original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-on-aws-that-reflects-the-envisioned-organization/

This post is written in collaboration with Claudia Chitu and Spyridon Dosis from Acast.

Founded in 2014, Acast is the world’s leading independent podcast company, elevating podcast creators and podcast advertisers for the ultimate listening experience. By championing an independent and open ecosystem for podcasting, Acast aims to fuel podcasting with the tools and monetization needed to thrive.

The company uses AWS Cloud services to build data-driven products and scale engineering best practices. To ensure a sustainable data platform amid growth and profitability phases, their tech teams adopted a decentralized data mesh architecture.

In this post, we discuss how Acast overcame the challenge of coupled dependencies between teams working with data at scale by employing the concept of a data mesh.

The problem

With accelerated growth and expansion, Acast encountered a challenge that resonates globally. Acast found itself with diverse business units and a vast amount of data generated across the organization. The existing monolithic, centralized architecture was struggling to meet the growing demands of data consumers. Data engineers were finding it increasingly challenging to maintain and scale the data infrastructure, resulting in data access issues, data silos, and inefficiencies in data management. A key objective was to enhance the end-to-end user experience, starting from the business needs.

Acast needed to address these challenges in order to reach operational scale, meaning a global maximum of the number of people that can independently operate and deliver value. In this case, Acast set out to tackle the monolithic structure and the high time to value for product teams, tech teams, and end consumers. It’s worth mentioning that they also have other product and tech teams, including operational or business teams, without AWS accounts.

Acast has a variable number of product teams, continuously evolving by merging existing ones, splitting them, adding new people, or simply creating new teams. In the last 2 years, they have had between 10 and 20 teams, consisting of 4–10 people each. Each team owns between two and 10 AWS accounts, depending on the ownership. The majority of data produced by these accounts is used downstream for business intelligence (BI) purposes and in Amazon Athena, by hundreds of business users every day.

The solution Acast implemented is a data mesh, architected on AWS. The solution mirrors the organizational structure rather than resulting from an explicit architectural decision. As per the Inverse Conway Maneuver, Acast’s technology architecture displays isomorphism with the business architecture. In this case, the data mesh architecture gives business users faster time to insights and lets them know directly who the domain-specific owners are, speeding up collaboration. This will be further detailed when we discuss the AWS Identity and Access Management (IAM) roles used, because one of the roles is dedicated to the business group.

Parameters of success

Acast succeeded in bootstrapping and scaling a new team- and domain-oriented data product and its corresponding infrastructure and setup, resulting in less friction in gathering insights and happier users and consumers.

The success of the implementation meant assessing various aspects of the data infrastructure, data management, and business outcomes. They classified the metrics and indicators in the following categories:

  • Data usage – A clear understanding of who is consuming what data source, materialized with a mapping of consumers and producers. Discussions with users showed they were happier to have faster access to data in a simpler way, a more structured data organization, and a clear mapping of who the producer is. A lot of progress has been made to advance their data-driven culture (data literacy, data sharing, and collaboration across business units).
  • Data governance – With their service-level objective stating when the data sources are available (among other details), teams know whom to notify and can do so faster when data arrives late or there are other issues with the data. With a data steward role in place, ownership has been strengthened.
  • Data team productivity – Through engineering retrospectives, Acast found that their teams appreciate autonomy to make decisions regarding their data domains.
  • Cost and resource efficiency – Acast observed a reduction in data duplication, and therefore in cost (in some accounts, duplicate copies of data were eliminated entirely), by reading data across accounts while still enabling scaling.

Data mesh overview

A data mesh is a sociotechnical approach to building a decentralized data architecture using a domain-oriented, self-serve design (from a software development perspective). It borrows from Eric Evans’ theory of domain-driven design and Manuel Pais’ and Matthew Skelton’s theory of team topologies. It’s important to establish this context because it sets the stage for the technical details that follow and can help you understand how the concepts discussed in this post fit into the broader framework of a data mesh.

To recap before diving deeper into Acast’s implementation, the data mesh concept is based on the following principles:

  • It’s domain driven, as opposed to pipelines as a first-class concern
  • It serves data as a product
  • It’s a good product that delights users (data is trustworthy, documentation is available, and it’s easily consumable)
  • It offers federated computational governance and decentralized ownership—a self-serve data platform

Domain-driven architecture

In Acast’s approach of owning the operational and analytical datasets, teams are structured with ownership based on domain, reading directly from the producer of the data, via an API or programmatically from Amazon S3 storage or using Athena as a SQL query engine. Some examples of Acast’s domains are presented in the following figure.

As illustrated in the preceding figure, some domains are loosely coupled to other domains’ operational or analytical endpoints, with a different ownership. Others might have stronger dependency, which is expected, for business (some podcasters can be also advertisers, creating sponsorship creatives and running campaigns for their own shows, or transacting ads using Acast’s software as a service).

Data as a product

Treating data as a product entails three key components: the data itself, the metadata, and the associated code and infrastructure. In this approach, teams responsible for generating data are referred to as producers. These producer teams possess in-depth knowledge about their consumers, understanding how their data product is utilized. Any changes planned by the data producers are communicated in advance to all consumers. This proactive notification ensures that downstream processes are not disrupted. By providing consumers with advance notice, they have sufficient time to prepare for and adapt to the upcoming changes, maintaining a smooth and uninterrupted workflow. The producers run a new version of the initial dataset in parallel, notify the consumers individually, and discuss with them their necessary timeframe to start consuming the new version. When all consumers are using the new version, the producers make the initial version unavailable.
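The parallel-version handover described above can be pictured with a toy model. The following sketch is our own illustration, not Acast's tooling: the class name VersionRollout and the consumer names are hypothetical. It captures only the rule that the initial version is retired once every consumer has moved to the new one.

```python
from dataclasses import dataclass, field


@dataclass
class VersionRollout:
    """Track which consumers have moved to the new dataset version."""

    consumers: set
    migrated: set = field(default_factory=set)

    def mark_migrated(self, consumer: str) -> None:
        if consumer not in self.consumers:
            raise KeyError(f"unknown consumer: {consumer}")
        self.migrated.add(consumer)

    def pending(self) -> list:
        # Consumers the producer still needs to follow up with.
        return sorted(self.consumers - self.migrated)

    def can_retire_old_version(self) -> bool:
        # The initial version stays available until nobody depends on it.
        return not self.pending()


rollout = VersionRollout(consumers={"bi-dashboards", "ads-reporting"})
rollout.mark_migrated("bi-dashboards")
print(rollout.pending(), rollout.can_retire_old_version())
```

Keeping an explicit list of pending consumers is what makes the individual notifications and agreed timeframes tractable for the producer team.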

Data schemas are inferred from the common agreed-upon format to share files between teams, which is Parquet in the case of Acast. Data can be shared in files, batched or stream events, and more. Each team has its own AWS account acting as an independent and autonomous entity with its own infrastructure. For orchestration, they use the AWS Cloud Development Kit (AWS CDK) for infrastructure as code (IaC) and AWS Glue Data Catalogs for metadata management. Users can also raise requests to producers to improve the way the data is presented or to enrich the data with new data points for generating a higher business value.

With each team owning an AWS account and a data catalog ID from Athena, it’s straightforward to see this through the lens of a distributed data lake on top of Amazon S3, with a common catalog mapping all the catalogs from all the accounts.

At the same time, each team can also map other catalogs to their own account and use their own data, which they produce along with the data from other accounts. Unless it is sensitive data, the data can be accessed programmatically or from the AWS Management Console in a self-service manner without being dependent on the data infrastructure engineers. This is a domain-agnostic, shared way to self-serve data. The product discovery happens through the catalog registration. Using only a few standards commonly agreed upon and adopted across the company, for the purpose of interoperability, Acast addressed the fragmented silos and friction to exchange data or consume domain-agnostic data.

With this principle, teams get assurance that the data is secure, trustworthy, and accurate, and appropriate access controls are managed at each domain level. Moreover, on the central account, roles are defined for different types of permissions and access, using AWS IAM Identity Center permissions. All datasets are discoverable from a single central account. The following figure illustrates how it’s instrumented, where two IAM roles are assumed by two types of user (consumer) groups: one that has access to a limited dataset, which is restricted data, and one that has access to non-restricted data. There is also a way to assume any of these roles, for service accounts, such as those used by data processing jobs in Amazon Managed Workflows for Apache Airflow (Amazon MWAA), for example.

How Acast solved for high alignment and a loosely coupled architecture

The following diagram shows a conceptual architecture of how Acast’s teams are organizing data and collaborating with each other.

Acast used the Well-Architected Framework for the central account to improve its practice of running analytical workloads in the cloud. Through the lens of the tool, Acast was able to improve monitoring, cost optimization, performance, and security. It helped them understand where they could improve their workloads, how to address common issues with automated solutions, and how to measure success by defining KPIs. It also surfaced learnings that would otherwise have taken longer to find. Spyridon Dosis, Acast’s Information Security Officer, shares, “We are happy AWS is always ahead with releasing tools that enable the configuration, assessment, and review of multi-account setup. This is a big plus for us, working in a decentralized organization.” Spyridon also adds, “A very important concept we value is the AWS security defaults (e.g. default encryption for S3 buckets).”

In the architecture diagram, we can see that each team can be a data producer, except the team owning the central account, which serves as the central data platform, modeling the logic from multiple domains to paint the full business picture. All other teams can be data producers or data consumers. They can connect to the central account and discover datasets via the cross-account AWS Glue Data Catalog, analyze them in the Athena query editor or with Athena notebooks, or map the catalog to their own AWS account. Access to the central Athena catalog is implemented with IAM Identity Center, with roles for open data and restricted data access.

For non-sensitive data (open data), Acast uses a template where the datasets are by default open to the entire organization to read from, using a condition to provide the organization-assigned ID parameter, as shown in the following code snippet:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalOrgID": "ORG-ID-NUMBER"
                }
            }
        }
    ]
}
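The post describes this policy as a template. One way to picture such a template is a small function that fills in the bucket name and organization ID and emits the same JSON document. The sketch below is an assumption about how that could look: the function name org_read_policy is hypothetical, and it is not Acast's actual template.

```python
import json


def org_read_policy(bucket: str, org_id: str) -> dict:
    """Return a bucket policy granting read access to one AWS organization."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": "*",
                "Action": ["s3:GetObject*", "s3:GetBucket*", "s3:List*"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
                # Only principals that belong to this organization match.
                "Condition": {"StringEquals": {"aws:PrincipalOrgID": org_id}},
            }
        ],
    }


print(json.dumps(org_read_policy("DOC-EXAMPLE-BUCKET", "ORG-ID-NUMBER"), indent=4))
```

Generating the document from two parameters keeps the open-data policy identical across buckets, with only the bucket name and organization ID changing.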

When handling sensitive data like financials, the teams use a collaborative data steward model. The data steward works with the requester to evaluate access justification for the intended use case. Together, they determine appropriate access methods to meet the need while maintaining security. This could include IAM roles, service accounts, or specific AWS services. This approach enables business users outside the tech organization (which means they don’t have an AWS account) to independently access and analyze the information they need. By granting access through IAM policies on AWS Glue resources and S3 buckets, Acast provides self-serve capabilities while still governing delicate data through human review. The data steward role has been valuable for understanding use cases, assessing security risks, and ultimately facilitating access that accelerates the business through analytical insights.

For Acast’s use case, granular row- or column-level access controls weren’t needed, so the approach sufficed. However, other organizations may require more fine-grained governance over sensitive data fields. In those cases, solutions like AWS Lake Formation could implement permissions needed, while still providing a self-serve data access model. For more information, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue.

At the same time, teams can read from other producers directly, from Amazon S3 or via an API, keeping the dependency at minimum, which enhances the velocity of development and delivery. Therefore, an account can be a producer and a consumer in parallel. Each team is autonomous, and is accountable for their own tech stack.

Additional learnings

What did Acast learn? So far, we’ve discussed that the architectural design is an effect of the organizational structure. Although the tech organization consists of multiple cross-functional teams and it’s straightforward to bootstrap a new team following the common principles of data mesh, Acast learned that this doesn’t go seamlessly every time. To set up a completely new account in AWS, each team goes through the same journey, with slight differences that reflect its own particularities.

This can create friction, and it’s difficult to get all data-producing teams to reach a high level of maturity as data producers. This can be explained by the varying data competencies in those cross-functional teams, which are not dedicated data teams.

By implementing the decentralized solution, Acast effectively tackled the scalability challenge by adapting their teams to align with evolving business needs. This approach ensures high decoupling and alignment. Furthermore, they strengthened ownership, significantly reducing the time needed to identify and resolve issues because the upstream source is readily known and easily accessible with specified SLAs. The volume of data support inquiries has seen a reduction of over 50%, because business users are empowered to gain faster insights. Notably, they successfully eliminated tens of terabytes of redundant storage that were previously copied solely to fulfill downstream requests. This achievement was made possible through the implementation of cross-account reading, leading to the removal of associated development and maintenance costs for these pipelines.

Conclusion

Acast applied the Inverse Conway Maneuver and used AWS services, with each cross-functional product team owning its own AWS account, to build a data mesh architecture that allows scalability, high ownership, and self-service data consumption. This approach to data ownership and operations has worked well for the company and meets their engineering principles, with the data mesh emerging as an effect rather than a deliberate intent. For other organizations, the desired data mesh might look different and the approach might yield other learnings.

To conclude, a modern data architecture on AWS allows you to efficiently construct data products and data mesh infrastructure at a low cost without compromising on performance.

The following are some examples of AWS services you can use to design your desired data mesh on AWS:


About the Authors

Claudia Chitu is a Data strategist and an influential leader in the Analytics space. Focused on aligning data initiatives with the overall strategic goals of the organization, she employs data as a guiding force for long-term planning and sustainable growth.

Spyridon Dosis is an Information Security Professional in Acast. Spyridon supports the organization in designing, implementing and operating its services in a secure manner protecting the company and users’ data.

Srikant Das is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 13 years of experience in Big Data analytics and Data Engineering, where he enjoys building reliable, scalable, and efficient solutions. Outside of work, he enjoys traveling and blogging his experiences in social media.

Orchestrate Amazon EMR Serverless Spark jobs with Amazon MWAA, and data validation using Amazon Athena

Post Syndicated from Gaurav Parekh original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-spark-jobs-with-amazon-mwaa-and-data-validation-using-amazon-athena/

As data engineering becomes increasingly complex, organizations are looking for new ways to streamline their data processing workflows. Many data engineers today use Apache Airflow to build, schedule, and monitor their data pipelines.

However, as the volume of data grows, managing and scaling these pipelines can become a daunting task. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can help simplify the process of building, running, and managing data pipelines. By providing Apache Airflow as a fully managed platform, Amazon MWAA allows data engineers to focus on building data workflows instead of worrying about infrastructure.

Today, businesses and organizations require cost-effective and efficient ways to process large amounts of data. Amazon EMR Serverless is a cost-effective and scalable solution for big data processing that can handle large volumes of data. The Amazon Provider in Apache Airflow comes with EMR Serverless operators and is already included in Amazon MWAA, making it easy for data engineers to build scalable and reliable data processing pipelines. You can use EMR Serverless to run Spark jobs on the data, and use Amazon MWAA to manage the workflows and dependencies between these jobs. This integration can also help reduce costs by automatically scaling the resources needed to process data.

Amazon Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. You can use standard SQL to interact with your data without the need to manage complex infrastructure.

In this post, we use Amazon MWAA, EMR Serverless, and Athena to build a complete end-to-end data processing pipeline.

Solution overview

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  1. Create an Amazon MWAA workflow that retrieves data from your input Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use EMR Serverless to process the data stored in Amazon S3. EMR Serverless automatically scales up or down based on the workload, so you don’t need to worry about provisioning or managing any infrastructure.
  3. Use EMR Serverless to transform the data using PySpark code and then store the transformed data back in your S3 bucket.
  4. Use Athena to create an external table based on the S3 dataset and run queries to analyze the transformed data. Athena uses the AWS Glue Data Catalog to store the table metadata.

Prerequisites

You should have the following prerequisites:

Data preparation

To illustrate using EMR Serverless jobs with Apache Spark via Amazon MWAA and data validation using Athena, we use the publicly available NYC taxi dataset. Download the following datasets to your local machine:

  • Green taxi and Yellow taxi trip records – Trip records for yellow and green taxis, which include information such as pick-up and drop-off dates and times, locations, trip distances, and payment types. In our example, we use the latest Parquet files for 2022.
  • Dataset for Taxi zone lookup – A dataset that provides location IDs and corresponding zone details for taxis.

In later steps, we upload these datasets to Amazon S3.

Create solution resources

This section outlines the steps for setting up data processing and transformation.

Create an EMR Serverless application

You can create one or more EMR Serverless applications that use open source analytics frameworks like Apache Spark or Apache Hive. Unlike EMR on EC2, you do not need to delete or terminate EMR Serverless applications. An EMR Serverless application is only a definition and, once created, can be reused as long as needed. This makes the Amazon MWAA pipeline simpler because you only have to submit jobs to a pre-created EMR Serverless application.

By default, an EMR Serverless application auto-starts on job submission and auto-stops when idle for 15 minutes to ensure cost efficiency. You can modify the amount of idle time or choose to turn the feature off.

To create an application using the EMR Serverless console, follow the instructions in “Create an EMR Serverless application”. Note down the application ID because we use it in the following steps.
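If you prefer to script this step instead of using the console, the following sketch shows how the auto-start and auto-stop behavior could be expressed with the boto3 emr-serverless client. The function names, the application name taxi-pipeline, and the release label emr-6.15.0 are illustrative assumptions; use a release label that is available in your Region.

```python
def application_request(name: str, release_label: str, idle_minutes: int = 15) -> dict:
    """Build keyword arguments for create_application (hypothetical helper)."""
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
        # Start automatically when a job is submitted.
        "autoStartConfiguration": {"enabled": True},
        # Stop automatically after the application has been idle this long.
        "autoStopConfiguration": {"enabled": True, "idleTimeoutMinutes": idle_minutes},
    }


def create_application(name: str, release_label: str, idle_minutes: int = 15) -> str:
    """Create the application and return its ID for use in later steps."""
    import boto3  # imported lazily so the request can be built without AWS access

    client = boto3.client("emr-serverless")
    response = client.create_application(
        **application_request(name, release_label, idle_minutes)
    )
    return response["applicationId"]


print(application_request("taxi-pipeline", "emr-6.15.0"))
```

Calling create_application requires AWS credentials with permission to create EMR Serverless applications. The returned ID is the value the post asks you to note down.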

Create an S3 bucket and folders

Complete the following steps to set up your S3 bucket and folders:

  1. On the Amazon S3 console, create an S3 bucket to store the dataset.
  2. Note the name of the S3 bucket to use in later steps.
  3. Create an input_data folder for storing input data.
  4. Within that folder, create three separate folders, one for each dataset: green, yellow, and zone_lookup.

You can download and work with the latest datasets available. For our testing, we use the following files:

  • The green/ folder has the file green_tripdata_2022-06.parquet
  • The yellow/ folder has the file yellow_tripdata_2022-06.parquet
  • The zone_lookup/ folder has the file taxi_zone_lookup.csv
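The folder layout above maps directly to S3 object keys. As a small sketch (the helper name input_keys is our own, and the upload call in the comments assumes the files sit in your current directory), you could derive the keys like this:

```python
from pathlib import PurePosixPath

# Dataset folder -> file name, as listed above.
DATASET_FILES = {
    "green": "green_tripdata_2022-06.parquet",
    "yellow": "yellow_tripdata_2022-06.parquet",
    "zone_lookup": "taxi_zone_lookup.csv",
}


def input_keys(prefix="input_data"):
    """Return the S3 object key for each dataset file under the input prefix."""
    return {
        folder: str(PurePosixPath(prefix) / folder / filename)
        for folder, filename in DATASET_FILES.items()
    }


keys = input_keys()
for folder, key in keys.items():
    print(f"{folder}: {key}")

# Upload sketch (needs boto3 and AWS credentials; the bucket name is a placeholder):
#   import boto3
#   s3 = boto3.client("s3")
#   for folder, key in keys.items():
#       s3.upload_file(DATASET_FILES[folder], "<<bucket_name_here>>", key)
```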

Set up the Amazon MWAA DAG scripts

Complete the following steps to set up your DAG scripts:

  1. Download the following scripts to your local machine:
    1. requirements.txt – A Python dependency is any package or distribution that is not included in the Apache Airflow base install for your Apache Airflow version on your Amazon MWAA environment. For this post, we use Boto3 version >=1.23.9.
    2. blog_dag_mwaa_emrs_ny_taxi.py – This script is a part of the Amazon MWAA DAG and consists of the following tasks: yellow_taxi_zone_lookup, green_taxi_zone_lookup, and ny_taxi_summary. These tasks run Spark jobs to look up taxi zones and generate a data summary.
    3. green_zone.py – This PySpark script reads data files for green taxi rides and zone lookup, performs a join operation to combine them, and generates an output file containing green taxi rides with zone information. It utilizes temporary views for the df_green and df_zone data frames, performs column-based joins, and aggregates data like passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_green_zone, as Parquet files.
    4. yellow_zone.py – This PySpark script processes yellow taxi ride and zone lookup data files by joining them to generate an output file containing yellow taxi rides with zone information. The script accepts a user-provided S3 bucket name and initiates a Spark session with the application name yellow_zone. It reads the yellow taxi files and zone lookup file from the specified S3 bucket, creates temporary views, performs a join based on location ID, and calculates statistics such as passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_yellow_zone, as Parquet files.
    5. ny_taxi_summary.py – This PySpark script processes the green_zone and yellow_zone files to aggregate statistics on taxi rides, grouping data by service zones and location IDs. It requires an S3 bucket name as a command line argument, creates a SparkSession named ny_taxi_summary, reads the files from S3, performs a join, and generates a new data frame named ny_taxi_summary. It creates an output_data folder in the specified S3 bucket to write the resulting data frame to new Parquet files.
  2. On your local machine, update the blog_dag_mwaa_emrs_ny_taxi.py script with the following information:
    • Update your S3 bucket name in the following two lines:
      S3_LOGS_BUCKET = "<<bucket_name_here>>"
      S3_BASE_BUCKET = "<<bucket_name_here>>"

    • Update your role name ARN:
      JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"
      e.g. arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>

    • Update the EMR Serverless application ID. Use the application ID created earlier.
      EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"

  3. Upload the requirements.txt file to the S3 bucket created earlier.
  4. In the S3 bucket, create a folder named dags and upload the updated blog_dag_mwaa_emrs_ny_taxi.py file from your local machine.
  5. On the Amazon S3 console, create a new folder named scripts inside the S3 bucket and upload the scripts to this folder from your local machine.
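To make the role of these settings concrete, here is a hedged sketch of the kind of request a DAG task could send to the EMR Serverless start_job_run API. It is not the code from blog_dag_mwaa_emrs_ny_taxi.py: the function name, the logs/ prefix, and the sample values are assumptions, while the scripts/ folder and the bucket name passed as a script argument follow the descriptions above.

```python
def build_job_run_request(application_id, job_role_arn, base_bucket, logs_bucket, script_name):
    """Build keyword arguments for the EMR Serverless start_job_run call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": job_role_arn,
        # Use the script name without its extension as the job name.
        "name": script_name.rsplit(".", 1)[0],
        "jobDriver": {
            "sparkSubmit": {
                # The PySpark scripts were uploaded to the scripts folder.
                "entryPoint": f"s3://{base_bucket}/scripts/{script_name}",
                # The scripts take the S3 bucket name as a command line argument.
                "entryPointArguments": [base_bucket],
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                # Assumed log prefix; adjust to your own layout.
                "s3MonitoringConfiguration": {"logUri": f"s3://{logs_bucket}/logs/"}
            }
        },
    }


# All values below are placeholders for illustration.
job_request = build_job_run_request(
    application_id="<<emr serverless application ID here>>",
    job_role_arn="arn:aws:iam::111122223333:role/emr_serverless_execution_role",
    base_bucket="my-example-bucket",
    logs_bucket="my-example-bucket",
    script_name="green_zone.py",
)
print(job_request["jobDriver"]["sparkSubmit"]["entryPoint"])

# To submit the job (needs boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("emr-serverless")
#   job_run_id = client.start_job_run(**job_request)["jobRunId"]
```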

Create an Amazon MWAA environment

To create an Airflow environment, complete the following steps:

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter mwaa_emrs_athena_pipeline.
  3. For Airflow version, choose the latest version (for this post, 2.5.1).
  4. For S3 Bucket, enter the path to your S3 bucket.
  5. For DAGs folder, enter the path to your dags folder.
  6. For Requirements file, enter the path to the requirements.txt file.
  7. Choose Next.
  8. For Virtual private cloud (VPC), choose a VPC that has a minimum of two private subnets.

This will populate two of the private subnets in your VPC.

  9. Under Web server access, select Public network.

This allows the Apache Airflow UI to be accessed over the internet by users granted access to the IAM policy for your environment.

  10. For Security group(s), select Create new security group.
  11. For Environment class, select mw1.small.
  12. For Execution role, choose Create a new role.
  13. For Role name, enter a name.
  14. Leave the other configurations as default and choose Next.
  15. On the next page, choose Create environment.

It may take about 20–30 minutes to create your Amazon MWAA environment.

  16. When the Amazon MWAA environment status changes to Available, navigate to the IAM console and update the environment's execution role to grant it pass role (iam:PassRole) privileges for emr_serverless_execution_role.
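One way to express the pass role privilege is an IAM policy statement that allows iam:PassRole on the EMR Serverless job role. The following sketch builds such a policy document. The account ID is a placeholder, and the condition restricting the role to the EMR Serverless service is our own suggestion rather than something this post prescribes.

```python
import json


def build_pass_role_policy(job_role_arn):
    """Return an IAM policy document that allows passing the job role to EMR Serverless."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": job_role_arn,
                # Only allow the role to be passed to the EMR Serverless service.
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": "emr-serverless.amazonaws.com"
                    }
                },
            }
        ],
    }


# 111122223333 is a placeholder account ID.
policy = build_pass_role_policy(
    "arn:aws:iam::111122223333:role/emr_serverless_execution_role"
)
print(json.dumps(policy, indent=2))
```

You can paste the printed JSON into an inline policy on the execution role in the IAM console.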

Trigger the Amazon MWAA DAG

To trigger the DAG, complete the following steps:

  1. On the Amazon MWAA console, choose Environments in the navigation pane.
  2. Open your environment and choose Open Airflow UI.
  3. Select blog_dag_mwaa_emrs_ny_taxi, choose the play icon, and choose Trigger DAG.
  4. When the DAG is running, choose the DAG blog_dag_mwaa_emrs_ny_taxi and choose Graph to locate your DAG run workflow.

The DAG takes approximately 4–6 minutes to run all the scripts. When it finishes, all tasks show as complete and the overall status of the DAG shows as success.

To rerun the DAG, remove s3://<<your_s3_bucket here >>/output_data/.

Optionally, to understand how Amazon MWAA runs these tasks, choose the task you want to inspect.

Choose Run to view the task run details.

The following screenshot shows an example of the task logs.

To dive deeper into the execution logs, navigate to “Applications” on the EMR Serverless console. The Apache Spark driver logs show the initiation of your job along with details of the executors, stages, and tasks that EMR Serverless created. These logs can help you monitor job progress and troubleshoot failures.

By default, EMR Serverless will store application logs securely in Amazon EMR managed storage for a period of 30 days. However, you can also specify Amazon S3 or Amazon CloudWatch as your log delivery options during job submission.

Validate the final result set with Athena

Let’s validate the data loaded by the process using Athena SQL queries.

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (<S3_BUCKET_NAME>/athena), then choose Save.
  3. In the query editor, enter the following query to create an external table:
CREATE EXTERNAL TABLE default.ny_taxi_summary(
  pu_service_zone string, 
  pulocationid bigint, 
  do_service_zone string, 
  dolocationid bigint, 
  passenger_count bigint, 
  trip_distance double, 
  fare_amount double, 
  extra double, 
  mta_tax double, 
  tip_amount double, 
  tolls_amount double, 
  improvement_surcharge double, 
  total_amount double, 
  congestion_surcharge double, 
  airport_fee double)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<<YOUR-S3-BUCKET Here>>/output_data/ny_taxi_summary/' -- *** Change bucket name to your bucket***
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none');


Run the following query on the recently created ny_taxi_summary table to retrieve the first 10 rows to validate the data:

select * from default.ny_taxi_summary limit 10;
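If you would rather run the validation query programmatically, the Athena start_query_execution API accepts the same SQL. The sketch below only builds the request. The helper name is our own, and the athena/ result prefix mirrors the query result location configured earlier.

```python
def build_validation_query_request(bucket, database="default", table="ny_taxi_summary", limit=10):
    """Build keyword arguments for the Athena start_query_execution call."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return {
        "QueryString": f"SELECT * FROM {database}.{table} LIMIT {limit}",
        "QueryExecutionContext": {"Database": database},
        # Athena writes query results under this S3 location.
        "ResultConfiguration": {"OutputLocation": f"s3://{bucket}/athena/"},
    }


# "my-example-bucket" is a placeholder bucket name.
query_request = build_validation_query_request("my-example-bucket")
print(query_request["QueryString"])

# To run the query (needs boto3 and AWS credentials):
#   import boto3
#   athena = boto3.client("athena")
#   query_id = athena.start_query_execution(**query_request)["QueryExecutionId"]
#   rows = athena.get_query_results(QueryExecutionId=query_id)  # after the query succeeds
```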

Clean up

To prevent future charges, complete the following steps:

  1. On the Amazon S3 console, delete the S3 bucket you created to store the Amazon MWAA DAG, scripts, and logs.
  2. On the Athena console, drop the table you created:
    drop table default.ny_taxi_summary;

  3. On the Amazon MWAA console, navigate to the environment that you created and choose Delete.
  4. On the EMR Studio console, delete the application.

To delete the application, navigate to the List applications page. Select the application that you created and choose Actions → Stop to stop the application. After the application is in the STOPPED state, select the same application and choose Actions → Delete.
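The stop-then-delete sequence can also be scripted. This is a minimal sketch that assumes an EMR Serverless client object is passed in. The function name, polling interval, and retry limit are our own choices, and the sketch relies on an application being deletable only in the CREATED or STOPPED state.

```python
import time

# States in which EMR Serverless allows an application to be deleted.
DELETABLE_STATES = ("CREATED", "STOPPED")


def stop_and_delete_application(client, application_id, poll_seconds=5, max_polls=60):
    """Stop the application if needed, wait until it is deletable, then delete it.

    Returns True if the application was deleted, False if polling timed out.
    """

    def current_state():
        response = client.get_application(applicationId=application_id)
        return response["application"]["state"]

    if current_state() not in DELETABLE_STATES:
        client.stop_application(applicationId=application_id)

    for _ in range(max_polls):
        if current_state() in DELETABLE_STATES:
            client.delete_application(applicationId=application_id)
            return True
        time.sleep(poll_seconds)
    return False


# Usage (needs boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("emr-serverless")
#   stop_and_delete_application(client, "<<emr serverless application ID here>>")
```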

Conclusion

Data engineering is a critical component of many organizations, and as data volumes continue to grow, it’s essential to find ways to streamline data processing workflows. The combination of Amazon MWAA, EMR Serverless, and Athena provides a powerful solution to build, run, and manage data pipelines efficiently. With this end-to-end data processing pipeline, data engineers can easily process and analyze large amounts of data quickly and cost-effectively without the need to manage complex infrastructure. The integration of these AWS services provides a robust and scalable solution for data processing, helping organizations make informed decisions based on their data insights.

Now that you’ve seen how to submit Spark jobs on EMR Serverless via Amazon MWAA, we encourage you to use Amazon MWAA to create a workflow that will run PySpark jobs via EMR Serverless.

We welcome your feedback and inquiries. Please feel free to reach out to us if you have any questions or comments.


About the authors

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Gaurav Parekh is a Solutions Architect helping AWS customers build large scale modern architecture. He specializes in data analytics and networking. Outside of work, Gaurav enjoys playing cricket, soccer and volleyball.


Audit History

December 2023: This post was reviewed for technical accuracy by Santosh Gantaram, Sr. Technical Account Manager.

Enhance query performance using AWS Glue Data Catalog column-level statistics

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/enhance-query-performance-using-aws-glue-data-catalog-column-level-statistics/

Today, we’re making available a new capability of AWS Glue Data Catalog that allows generating column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers (CBO) of Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings.

Data lakes are designed for storing vast amounts of raw, unstructured, or semi-structured data at a low cost, and organizations share those datasets across multiple departments and teams. The queries on these large datasets read vast amounts of data and can perform complex join operations on multiple datasets. When talking with our customers, we learned that one of the challenging aspects of data lake performance is how to optimize these analytics queries to run faster.

Data lake performance optimization is especially important for queries with multiple joins, and that is where cost-based optimizers help the most. For a CBO to work, column statistics need to be collected and updated as the data changes. We’re launching the capability to generate column-level statistics, such as the number of distinct values, number of nulls, max, and min, on AWS Glue tables backed by file formats such as Parquet, ORC, JSON, Amazon ION, CSV, and XML. With this launch, customers now have an integrated end-to-end experience in which statistics on AWS Glue tables are collected and stored in the AWS Glue Data Catalog and made available to analytics services for improved query planning and execution.

Using these statistics, cost-based optimizers improve query run plans and boost the performance of queries run in Amazon Athena and Amazon Redshift Spectrum. For example, a CBO can use column statistics such as the number of distinct values and the number of nulls to improve row prediction. Row prediction is the estimate, made during the query planning stage, of the number of rows that a given step of the query will return from a table. The more accurate the row predictions are, the more efficient the query execution steps are. This leads to faster query execution and potentially reduced cost. Some of the specific optimizations that a CBO can employ include join reordering and push-down of aggregations based on the statistics available for each table and column.
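To give a feel for how distinct-value and null counts feed row prediction, here is a small worked sketch using textbook estimation formulas. These are not necessarily the formulas that Athena or Amazon Redshift Spectrum use internally, and the table sizes are made-up numbers.

```python
def estimate_equality_rows(row_count, distinct_values, null_count=0):
    """Estimate rows matching `column = constant`, assuming values are evenly spread."""
    if distinct_values <= 0:
        return 0.0
    non_null_rows = row_count - null_count  # nulls never match an equality predicate
    return non_null_rows / distinct_values


def estimate_join_rows(left_rows, left_distinct, right_rows, right_distinct):
    """Estimate rows produced by an equi-join on a single key column."""
    return left_rows * right_rows / max(left_distinct, right_distinct)


# Hypothetical fact table: 1,000,000 rows, 100,000 null keys, 1,000 distinct keys.
filtered = estimate_equality_rows(1_000_000, 1_000, null_count=100_000)
print(f"Estimated rows after the filter: {filtered:.0f}")

# Hypothetical join with a 1,000-row dimension table that has 1,000 distinct keys.
joined = estimate_join_rows(1_000_000, 1_000, 1_000, 1_000)
print(f"Estimated rows after the join: {joined:.0f}")
```

With estimates like these for every candidate step, an optimizer can compare plans and, for example, apply the most selective join first. Without statistics it has to fall back on guesses.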

For customers using a data mesh with AWS Lake Formation permissions, tables from different data producers are cataloged in the centralized governance accounts. As they generate statistics on tables in the centralized catalog and share those tables with consumers, queries on those tables in consumer accounts see query performance improvements automatically. In this post, we demonstrate the capability of AWS Glue Data Catalog to generate column statistics for our sample tables.

Solution overview

To demonstrate the effectiveness of this capability, we employ the industry-standard TPC-DS 3 TB dataset stored in an Amazon Simple Storage Service (Amazon S3) public bucket. We compare the query performance before and after generating column statistics for the tables by running queries in Amazon Athena and Amazon Redshift Spectrum. We provide the queries that we used in this post, and we encourage you to try out your own queries by following the workflow described below.

The workflow consists of the following high level steps:

  1. Cataloging the Amazon S3 Bucket: Utilize AWS Glue Crawler to crawl the designated Amazon S3 bucket, extracting metadata, and seamlessly storing it in the AWS Glue data catalog. We’ll query these tables using Amazon Athena and Amazon Redshift Spectrum.
  2. Generating column statistics: Employ the enhanced capabilities of AWS Glue Data Catalog to generate comprehensive column statistics for the crawled data, thereby providing valuable insights into the dataset.
  3. Querying with Amazon Athena and Amazon Redshift Spectrum: Evaluate the impact of column statistics on query performance by utilizing Amazon Athena and Amazon Redshift Spectrum to execute queries on the dataset.

The following diagram illustrates the solution architecture.

Walkthrough

To implement the solution, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Run the AWS Glue crawlers on the public Amazon S3 bucket to catalog the 3 TB TPC-DS dataset.
  3. Run queries on Amazon Athena and Amazon Redshift and note the query duration.
  4. Generate statistics for AWS Glue Data Catalog tables.
  5. Run queries on Amazon Athena and Amazon Redshift and compare the query duration with the previous run.
  6. Optional: Schedule AWS Glue column statistics jobs using AWS Lambda and the Amazon EventBridge Scheduler.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An Amazon Virtual Private Cloud (Amazon VPC), public subnet, private subnets, and route tables.
  • An Amazon Redshift Serverless workgroup and namespace.
  • An AWS Glue crawler to crawl the public Amazon S3 bucket and create tables in the AWS Glue Data Catalog for the TPC-DS dataset.
  • AWS Glue catalog databases and tables.
  • An Amazon S3 bucket to store Athena results.
  • AWS Identity and Access Management (AWS IAM) users and policies.
  • An AWS Lambda function and an Amazon EventBridge scheduler to schedule the AWS Glue column statistics runs.

To launch the AWS CloudFormation stack, complete the following steps:

Note: The AWS Glue data catalog tables are generated using the public bucket s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/, hosted in the us-east-1 region. If you intend to deploy this AWS CloudFormation template in a different region, it is necessary to either copy the data to the corresponding region or share the data within your deployed region for it to be accessible from Amazon Redshift.

  1. Log in to the AWS Management Console as AWS Identity and Access Management (AWS IAM) administrator.
  2. Choose Launch Stack to deploy an AWS CloudFormation template.
  3. Choose Next.
  4. On the next page, keep all the options as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Run the AWS Glue Crawlers created by the AWS CloudFormation stack

To run your crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers under Data Catalog in the navigation pane.
  2. Locate and run the two crawlers, tpcdsdb-without-stats and tpcdsdb-with-stats. They may take a few minutes to complete.

After the crawlers complete successfully, they create two identical databases, tpcdsdbnostats and tpcdsdbwithstats. The tables in tpcdsdbnostats have no statistics, and we use them as a reference. We generate statistics on the tables in tpcdsdbwithstats. Verify on the AWS Glue console that you have these two databases and their underlying tables. The tpcdsdbnostats database will look like below. At this time, there are no statistics generated on these tables.

Run provided query using Amazon Athena on no-stats tables

To run your query in Amazon Athena on tables without statistics, complete the following steps:

  1. Download the Athena queries from here.
  2. On the Amazon Athena console, choose the provided queries one at a time for tables in the tpcdsdbnostats database.
  3. Run each query and note its run time.

Run provided query using Amazon Redshift Spectrum on no-stats tables

To run your query in Amazon Redshift, complete the following steps:

  1. Download the Amazon Redshift queries from here.
  2. In the Amazon Redshift query editor v2, open the Redshift Query for tables without stats section of the downloaded queries.
  3. Run each query and note its execution time.

Generate statistics on AWS Glue Catalog tables

To generate statistics on AWS Glue Catalog tables, complete the following steps:

  1. Navigate to the AWS Glue console and choose Databases under Data Catalog.
  2. Choose the tpcdsdbwithstats database to list all the available tables.
  3. Select any of these tables (for example, call_center).
  4. Go to the Column statistics – new tab and choose Generate statistics.
  5. Keep the default options: under Choose columns, keep Table (All columns); under Row sampling options, keep All rows; and under IAM role, choose AWSGluestats-blog. Then select Generate statistics.

You’ll be able to see the status of the statistics generation run as shown in the following illustration:

After generating statistics on AWS Glue Catalog tables, you should be able to see detailed column statistics for that table:

Repeat steps 2–5 to generate statistics for all necessary tables, such as catalog_sales, catalog_returns, warehouse, item, date_dim, store_sales, customer, customer_address, web_sales, time_dim, ship_mode, web_site, and web_returns. Alternatively, you can follow the “Schedule AWS Glue Statistics Runs” section near the end of this post to generate statistics for all tables. Once done, assess query performance for each query.
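Repeating the console steps for every table gets tedious, so here is a hedged sketch of doing the same through the AWS Glue start_column_statistics_task_run API. The helper name is our own and it expects a Glue client to be passed in. Leaving out the optional column list and sample size is intended to match the All columns and All rows choices made on the console.

```python
# Tables named in this walkthrough.
TABLES = [
    "call_center", "catalog_sales", "catalog_returns", "warehouse", "item",
    "date_dim", "store_sales", "customer", "customer_address", "web_sales",
    "time_dim", "ship_mode", "web_site", "web_returns",
]


def start_statistics_runs(glue_client, database, tables, role):
    """Start one column statistics task run per table and return the run IDs."""
    run_ids = {}
    for table in tables:
        response = glue_client.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role,  # IAM role that AWS Glue assumes to compute the statistics
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids


# Usage (needs boto3 and AWS credentials):
#   import boto3
#   glue = boto3.client("glue")
#   run_ids = start_statistics_runs(glue, "tpcdsdbwithstats", TABLES, "AWSGluestats-blog")
```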

Run provided query using Athena Console on stats tables

  1. On the Amazon Athena console, open the Athena Query for tables with stats section of the downloaded queries.
  2. Run each query and note its execution time.

In our sample run of the queries on the tables, we observed the query execution time as per the below table. We saw clear improvement in the query performance, ranging from 13 to 55%.

Athena query time improvement

TPC-DS 3T Queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 2           | 33.62                    | 15.17                 | 55%
Query 4           | 132.11                   | 72.94                 | 45%
Query 14          | 134.77                   | 91.48                 | 32%
Query 28          | 55.99                    | 39.36                 | 30%
Query 38          | 29.32                    | 25.58                 | 13%
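The improvement column can be reproduced from the two timing columns as the relative reduction in run time. The short sketch below recomputes it for the Athena runs above (the function name is our own):

```python
def improvement_percent(before_sec, after_sec):
    """Relative reduction in run time, as a percentage of the original run time."""
    return (before_sec - after_sec) / before_sec * 100


# (without stats, with stats) in seconds, taken from the table above.
ATHENA_RUNS = {
    "Query 2": (33.62, 15.17),
    "Query 4": (132.11, 72.94),
    "Query 14": (134.77, 91.48),
    "Query 28": (55.99, 39.36),
    "Query 38": (29.32, 25.58),
}

for query, (before, after) in ATHENA_RUNS.items():
    print(f"{query}: {improvement_percent(before, after):.0f}%")
```

You can use the same function to evaluate the timings from your own runs.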

Run the provided query using Amazon Redshift Spectrum on statistics tables

  1. In the Amazon Redshift query editor v2, open the Redshift Query for tables with stats section of the downloaded queries.
  2. Run each query and note its execution time.

In our sample run of the queries on the tables, we observed the query execution time as per the below table. We saw clear improvement in the query performance, ranging from 13 to 89%.

Amazon Redshift Spectrum query time improvement

TPC-DS 3T Queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 40          | 124.156                  | 13.12                 | 89%
Query 60          | 29.52                    | 16.97                 | 42%
Query 66          | 18.914                   | 16.39                 | 13%
Query 95          | 308.806                  | 200                   | 35%
Query 99          | 20.064                   | 16                    | 20%

Schedule AWS Glue statistics Runs

In this segment of the post, we guide you through the steps of scheduling AWS Glue column statistics runs using AWS Lambda and the Amazon EventBridge Scheduler. To streamline this process, an AWS Lambda function and an Amazon EventBridge scheduler were created as part of the CloudFormation stack deployment.

  1. AWS Lambda function setup:

To begin, we utilize an AWS Lambda function to trigger the execution of the AWS Glue column statistics job. The AWS Lambda function invokes the start_column_statistics_task_run API through the boto3 (AWS SDK for Python) library. This sets the groundwork for automating the column statistics update.

Let’s explore the AWS Lambda function:

    • Go to the AWS Lambda console.
    • Select Functions and locate the GlueTableStatisticsFunctionv1.
    • For a clearer understanding of the AWS Lambda function, we recommend reviewing the code in the Code section and examining the environment variables under Configuration.
  2. Amazon EventBridge scheduler configuration

The next step involves scheduling the AWS Lambda function invocation using the Amazon EventBridge Scheduler. The scheduler is configured to trigger the AWS Lambda function daily at a specific time – in this case, 08:00 PM. This ensures that the AWS Glue column statistics job runs on a regular and predictable basis.
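For reference, a daily 08:00 PM trigger corresponds to the cron expression cron(0 20 * * ? *) in EventBridge Scheduler. The sketch below builds a create_schedule request with that expression. The schedule name, ARNs, and UTC time zone are placeholder assumptions, and the schedule created by the CloudFormation stack may be configured differently.

```python
import json


def build_daily_schedule_request(name, lambda_arn, scheduler_role_arn,
                                 hour=20, minute=0, timezone="UTC", payload=None):
    """Build keyword arguments for the EventBridge Scheduler create_schedule call."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    return {
        "Name": name,
        # Fields: minutes hours day-of-month month day-of-week year.
        "ScheduleExpression": f"cron({minute} {hour} * * ? *)",
        "ScheduleExpressionTimezone": timezone,  # assumed time zone
        "FlexibleTimeWindow": {"Mode": "OFF"},   # run at the exact time
        "Target": {
            "Arn": lambda_arn,                   # Lambda function to invoke
            "RoleArn": scheduler_role_arn,       # role the scheduler assumes to invoke it
            "Input": json.dumps(payload or {}),
        },
    }


# All names and ARNs below are placeholders.
schedule_request = build_daily_schedule_request(
    name="glue-column-stats-daily",
    lambda_arn="arn:aws:lambda:us-east-1:111122223333:function:GlueTableStatisticsFunctionv1",
    scheduler_role_arn="arn:aws:iam::111122223333:role/example-scheduler-role",
)
print(schedule_request["ScheduleExpression"])

# To create the schedule (needs boto3 and AWS credentials):
#   import boto3
#   scheduler = boto3.client("scheduler")
#   scheduler.create_schedule(**schedule_request)
```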

Now, let’s explore how you can update the schedule:

Cleaning up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the AWS CloudFormation console as the AWS IAM administrator used for creating the AWS CloudFormation stack.
  2. Delete the AWS CloudFormation stack you created.

Conclusion

In this post, we showed you how you can use AWS Glue Data Catalog to generate column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers of Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings. Refer to the documentation for details on support for AWS Glue Data Catalog statistics across AWS analytics services.

If you have questions or suggestions, submit them in the comments section.


About the Authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Navnit Shukla serves as an AWS Specialist Solution Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled Data Wrangling on AWS. He can be reached via LinkedIn.

AWS Weekly Roundup – EC2 DL2q instances, PartyRock, Amplify’s 6th birthday, and more – November 20, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-ec2-dl2q-instances-partyrock-amplifys-6th-birthday-and-more-november-20-2023/

Last week I saw an astonishing 160+ new service launches. There were so many updates that we decided to publish a weekly roundup again. This continues the same innovative pace of the previous week as we are getting closer to AWS re:Invent 2023.

Our News Blog team is also finalizing new blog posts for re:Invent to introduce awesome launches with service teams for your reading pleasure. Jeff Barr shared The Road to AWS re:Invent 2023 to explain our blogging journey and process. Stay tuned next week!

Last week’s launches
Here are some of the launches that caught my attention last week:

Amazon EC2 DL2q instances – New DL2q instances are powered by Qualcomm AI 100 Standard accelerators and are the first to feature Qualcomm’s AI technology in the public cloud. With eight Qualcomm AI 100 Standard accelerators and 128 GiB of total accelerator memory, you can run popular generative artificial intelligence (AI) applications and extend to edge devices across smartphones, autonomous driving, personal compute, and extended reality headsets to develop and validate these AI workloads before deploying.

PartyRock for Amazon Bedrock – We introduced PartyRock, a fun and intuitive hands-on, generative AI app-building playground powered by Amazon Bedrock. You can experiment, learn all about prompt engineering, build mini-apps, and share them with your friends—all without writing any code or creating an AWS account.

You also can now access the Meta Llama 2 Chat 13B foundation model and Cohere Command Light, Embed English, and multilingual models for Amazon Bedrock.

AWS Amplify celebrates its sixth birthday – We announced six new launches: a new documentation site, support for Next.js 14 with our hosting and JavaScript library, custom token providers and an automatic React Native social sign-in update for Amplify Auth, new ChangePassword and DeleteUser account settings components, and an update of all Amplify UI packages to use the new Amplify JavaScript v6. You can also use wildcard subdomains when using a custom domain with your Amplify application deployed to AWS Amplify Hosting.


Also check out other News Blog posts about major launches published in the past week:

Other AWS service launches
Here are some other bundled feature launches per AWS service:

Amazon Athena – You can use a new cost-based optimizer (CBO) to enhance query performance based on table and column statistics collected by AWS Glue Data Catalog, and the Athena JDBC 3.x driver, a new alternative that supports almost all authentication plugins. You can also use Amazon EMR Studio to develop and run interactive queries on Amazon Athena.

Amazon CloudWatch – You can use a new CloudWatch metric called EBS Stalled I/O Check to monitor the health of your Amazon EBS volumes, the regular expression for Amazon CloudWatch Logs Live Tail filter pattern syntax to search and match relevant log events, observability of SAP Sybase ASE database in CloudWatch Application Insights, and up to two stats commands in a Log Insights query to perform aggregations on the results.

Amazon CodeCatalyst – You can connect to an Amazon Virtual Private Cloud (Amazon VPC) from CodeCatalyst Workflows, provision infrastructure using Terraform within CodeCatalyst Workflows, access CodeCatalyst with your workforce identities configured in IAM Identity Center, and create teams made up of members of the CodeCatalyst space.

Amazon Connect – You can use a pre-built queue performance dashboard and Contact Lens conversational analytics dashboard to view and compare real-time and historical aggregated queue performance. You can use quick responses for chats, previously written formats such as typing in ‘/#greet’ to insert a personalized response, and scanning attachments to detect malware or other unwanted content.

AWS Glue – AWS Glue for Apache Spark added six new database connectors: Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB, as well as native connectivity to Amazon OpenSearch Service.

AWS Lambda – You can see a single pane view of metrics, logs, and traces in the AWS Lambda console and use advanced logging controls to natively capture logs in JSON structured format. You can view the SAM template on the Lambda console and export the function’s configuration to AWS Application Composer. AWS Lambda also supports Java 21 and Node.js 20 versions built on the new Amazon Linux 2023 runtime.

AWS Local Zones in Dallas – You can enable the new Local Zone in Dallas, Texas, us-east-1-dfw-2a, with Amazon EC2 C6i, M6i, R6i, C6gn, and M6g instances and Amazon EBS volume types gp2, gp3, io1, sc1, and st1. You can also access Amazon ECS, Amazon EKS, Application Load Balancer, and AWS Direct Connect in this new Local Zone to support a broad set of workloads at the edge.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) – You can standardize access control to Kafka resources using AWS Identity and Access Management (IAM) and build Kafka clients for Amazon MSK Serverless written in all programming languages. These are open source client helper libraries and code samples for popular languages, including Java, Python, Go, and JavaScript. Also, Amazon MSK now supports an enhanced version of Apache Kafka 3.6.0 that offers generally available Tiered Storage and automatically sends you storage capacity alerts when you are at risk of exhausting your storage.

Amazon OpenSearch Service Ingestion – You can migrate your data from Elasticsearch version 7.x clusters to the latest versions of Amazon OpenSearch Service and use persistent buffering to protect the durability of incoming data.

Amazon RDS – Amazon RDS for MySQL now supports creating active-active clusters using the Group Replication plugin, upgrading MySQL 5.7 snapshots to MySQL 8.0, and the Innovation Release version of MySQL 8.1.

Amazon RDS Custom for SQL Server extends point-in-time recovery support for up to 1,000 databases, supports Service Master Key Retention to use transparent data encryption (TDE), table- and column-level encryption, DBMail and linked servers, and use SQL Server Developer edition with the bring your own media (BYOM).

Additionally, Amazon RDS Multi-AZ deployments with two readable standbys now support minor version upgrades and system maintenance updates with typically less than one second of downtime when using Amazon RDS Proxy.

AWS Partner Central – You can use an improved user experience in AWS Partner Central to build and promote your offerings and the new Investments tab in the Partner Analytics Dashboard to gain actionable insights. You can now link accounts and associated users between Partner Central and AWS Marketplace and use an enhanced co-sell experience with APN Customer Engagements (ACE) manager.

Amazon QuickSight – You can programmatically manage user access and custom permissions support for roles to restrict QuickSight functionality to the QuickSight account for IAM Identity Center and Active Directory using APIs. You can also use shared restricted folders, a Contributor role and support for data source asset types in folders and the Custom Week Start feature, an addition designed to enhance the data analysis experience for customers across diverse industries and social contexts.

AWS Trusted Advisor – You can use new APIs to programmatically access Trusted Advisor best practices checks, recommendations, and prioritized recommendations and 37 new Amazon RDS checks that provide best practices guidance by analyzing DB instance configuration, usage, and performance data.

There’s a lot more launch news that I haven’t covered. See AWS What’s New for more details.

See you virtually in AWS re:Invent
Next week we’ll hear the latest from AWS, learn from experts, and connect with the global cloud community in Las Vegas. If you come, check out the agenda, session catalog, and attendee guides before your departure.

If you’re not able to attend re:Invent in person this year, we’re offering the option to livestream our Keynotes and Innovation Talks. With the registration for online pass, you will have access to on-demand keynote, Innovation Talks, and selected breakout sessions after the event.

Channy