Tag Archives: Amazon Athena

Accelerate data science feature engineering on transactional data lakes using Amazon Athena with Apache Iceberg

Post Syndicated from Vivek Gautam original https://aws.amazon.com/blogs/big-data/accelerate-data-science-feature-engineering-on-transactional-data-lakes-using-amazon-athena-with-apache-iceberg/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) and data sources residing in AWS, on-premises, or other cloud systems using SQL or Python. Athena is built on open-source Trino and Presto engines, and Apache Spark frameworks, with no provisioning or configuration effort required. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Apache Iceberg is an open table format for very large analytic datasets. It manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

Feature engineering is a process of identifying and transforming raw data (images, text files, videos, and so on), backfilling missing data, and adding one or more meaningful data elements to provide context so a machine learning (ML) model can learn from it. Data labeling is required for various use cases, including forecasting, computer vision, natural language processing, and speech recognition.

Combined with the capabilities of Athena, Apache Iceberg delivers a simplified workflow for data scientists to create new data features without needing to copy or recreate the entire dataset. You can create features using standard SQL on Athena without using any other service for feature engineering. Data scientists can reduce the time spent preparing and copying datasets, and instead focus on data feature engineering, experimentation, and analyzing data at scale.

In this post, we review the benefits of using Athena with the Apache Iceberg open table format and how it simplifies common feature engineering tasks for data scientists. We demonstrate how Athena can convert an existing table in Apache Iceberg format, then add columns, delete columns, and modify the data in the table without recreating or copying the dataset, and use these capabilities to create new features on Apache Iceberg tables.

Solution overview

Data scientists are generally accustomed to working with large datasets. Datasets are usually stored in either JSON, CSV, ORC, or Apache Parquet format, or similar read-optimized formats for fast read performance. Data scientists often create new data features, and backfill such data features with aggregated and ancillary data. Historically, this task was accomplished by creating a view on top of the table with the underlying data in Apache Parquet format, where such columns and data were added at runtime or by creating a new table with additional columns. Although this workflow is well-suited for many use cases, it’s inefficient for large datasets, because data would need to be generated at runtime or datasets would need to be copied and transformed.

Athena has introduced ACID (Atomicity, Consistency, Isolation, Durability) transaction capabilities that add INSERT, UPDATE, DELETE, MERGE, and time travel operations built on Apache Iceberg tables. These capabilities enable data scientists to create new data features and drop existing data features on existing datasets without worrying about copying or transforming the dataset or abstracting it with a view. Data scientists can focus on feature engineering work and avoid copying and transforming the datasets.

The Athena Iceberg UPDATE operation writes Apache Iceberg position delete files and newly updated rows as data files in the same transaction. You can make record corrections via a single UPDATE statement.

With the release of Athena engine version 3, the capabilities for Apache Iceberg tables are enhanced with support for operations such as CREATE TABLE AS SELECT (CTAS) and MERGE commands that streamline the lifecycle management of your Iceberg data. CTAS makes it fast and efficient to create tables from other formats such as Apache Parquet, and MERGE INTO conditionally updates, deletes, or inserts rows into an Iceberg table. A single statement can combine update, delete, and insert actions.

Prerequisites

Set up an Athena workgroup with Athena engine version 3 to use CTAS and MERGE commands with an Apache Iceberg table. To upgrade your existing Athena engine to version 3 in your Athena workgroup, follow the instructions in Upgrade to Athena engine version 3 to increase query performance and access more analytics features or refer to Changing the engine version in the Athena console.
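
If you prefer to script this step, the following is a minimal boto3 sketch (not from the original post) that creates a workgroup pinned to Athena engine version 3. The workgroup name and query result location are placeholders.

import boto3

athena = boto3.client("athena")

# Create a workgroup that explicitly selects Athena engine version 3,
# which is required for CTAS and MERGE on Apache Iceberg tables.
athena.create_work_group(
    Name="iceberg-feature-engineering",  # placeholder workgroup name
    Configuration={
        "ResultConfiguration": {
            "OutputLocation": "s3://your-athena-query-results-bucket/"  # placeholder bucket
        },
        "EngineVersion": {"SelectedEngineVersion": "Athena engine version 3"},
    },
    Description="Workgroup for Iceberg feature engineering examples",
)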

Dataset

For demonstration, we use an Apache Parquet table that contains several million records of randomly distributed fictitious sales data from the last several years stored in an S3 bucket. Download the dataset, unzip it to your local computer, and upload it to your S3 bucket. In this post, we uploaded our dataset to s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/.

The following table shows the layout for the table customer_orders.

Column Name Data Type Description
orderkey string Order number for the order
custkey string Customer identification number
orderstatus string Status of the order
totalprice string Total price of the order
orderdate string Date of the order
orderpriority string Priority of the order
clerk string Name of the clerk who processed the order
shippriority string Priority on the shipping
name string Customer name
address string Customer address
nationkey string Customer nation key
phone string Customer phone number
acctbal string Customer account balance
mktsegment string Customer market segment

Perform feature engineering

As data scientists, we want to perform feature engineering on the customer orders data by adding calculated one-year total purchases and one-year average purchases for each customer to the existing dataset. For demonstration purposes, we created the customer_orders table in the sampledb database using Athena, as shown in the following DDL command. (You can use any of your existing datasets and follow the steps mentioned in this post.) The customer_orders dataset was generated and stored in the S3 bucket location s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/ in Parquet format. This table is not an Apache Iceberg table.

CREATE EXTERNAL TABLE sampledb.customer_orders(
  `orderkey` string, 
  `custkey` string, 
  `orderstatus` string, 
  `totalprice` string, 
  `orderdate` string, 
  `orderpriority` string, 
  `clerk` string, 
  `shippriority` string, 
  `name` string, 
  `address` string, 
  `nationkey` string, 
  `phone` string, 
  `acctbal` string, 
  `mktsegment` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/'
TBLPROPERTIES (
  'classification'='parquet');

Validate the data in the table by running a query:

SELECT * 
from sampledb.customer_orders 
limit 10;

We want to add new features to this table to get a deeper understanding of customer sales, which can result in faster model training and more valuable insights. To add new features to the dataset, convert the customer_orders Athena table to an Apache Iceberg table on Athena. Issue a CTAS query statement to create a new table in Apache Iceberg format from the customer_orders table. While doing so, add a new feature that captures the total purchase amount in the past year (the max year of the dataset) for each customer.

In the following CTAS query, a new column named one_year_sales_aggregate with the default value as 0.0 of data type double is added and table_type is set to ICEBERG:

CREATE TABLE  sampledb.customers_orders_aggregate
WITH (table_type = 'ICEBERG',
   format = 'PARQUET', 
   location = 's3://sample-iceberg-datasets-xxxxxxxxxxxx/sampledb/customer_orders_aggregate', 
   is_external = false
   ) 
AS 
SELECT 
orderkey,
custkey,
orderstatus,
totalprice,
orderdate, 
orderpriority, 
clerk, 
shippriority, 
name, 
address, 
nationkey, 
phone, 
acctbal, 
mktsegment,
0.0 as one_year_sales_aggregate
from sampledb.customer_orders;

Issue the following query to verify the data in the Apache Iceberg table with the new column one_year_sales_aggregate values as 0.0:

SELECT custkey, totalprice, one_year_sales_aggregate 
from sampledb.customers_orders_aggregate 
limit 10;

We want to populate the values for the new feature one_year_sales_aggregate in the dataset to get the total purchase amount for each customer based on their purchases in the past year (max year of the dataset). Issue a MERGE query statement to the Apache Iceberg table using Athena to populate values for the one_year_sales_aggregate feature:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            sum(CAST(totalprice as double)) as one_year_sales_aggregate
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_agg
    ON (coa.custkey = sales_one_year_agg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_aggregate = sales_one_year_agg.one_year_sales_aggregate;

Issue the following query to validate the updated value for total spend by each customer in the past year:

SELECT custkey, totalprice, one_year_sales_aggregate
from sampledb.customers_orders_aggregate limit 10;

We decide to add another feature to the existing Apache Iceberg table to compute and store the average purchase amount in the past year for each customer. Issue an ALTER TABLE statement to add a new column to the existing table for the feature one_year_sales_average:

ALTER TABLE sampledb.customers_orders_aggregate
ADD COLUMNS (one_year_sales_average double);

Before populating the values to this new feature, you can set the default value for the feature one_year_sales_average to 0.0. Using the same Apache Iceberg table on Athena, issue an UPDATE query statement to populate the value for the new feature as 0.0:

UPDATE sampledb.customers_orders_aggregate
SET one_year_sales_average = 0.0;

Issue the following query to verify the updated value for average spend by each customer in the past year is set to 0.0:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Now we want to populate the values for the new feature one_year_sales_average in the dataset to get the average purchase amount for each customer based on their purchases in the past year (the max year of the dataset). Issue a MERGE query statement against the Apache Iceberg table in Athena to populate values for the feature one_year_sales_average:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            avg(CAST(totalprice as double)) as one_year_sales_average
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_avg
    ON (coa.custkey = sales_one_year_avg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_average = sales_one_year_avg.one_year_sales_average;

Issue the following query to verify the updated values for average spend by each customer:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Once additional data features have been added to the dataset, data scientists generally proceed to train ML models and make inferences using Amazon SageMaker or an equivalent toolset.

Conclusion

In this post, we demonstrated how to perform feature engineering using Athena with Apache Iceberg. We also demonstrated using the CTAS query to create an Apache Iceberg table on Athena from an existing dataset in Apache Parquet format, adding new features in an existing Apache Iceberg table on Athena using the ALTER query, and using UPDATE and MERGE query statements to update the feature values of existing columns.

We encourage you to use CTAS queries to create tables quickly and efficiently, and use the MERGE query statement to synchronize tables in one step to simplify data preparations and update tasks when transforming the features using Athena with Apache Iceberg. If you have comments or feedback, please leave them in the comments section.


About the Authors

Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing modern data platforms, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.

Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Harsha Tadiparthi is a specialist Principal Solutions Architect, Analytics at AWS. He enjoys solving complex customer problems in databases and analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

How Cargotec uses metadata replication to enable cross-account data sharing

Post Syndicated from Sumesh M R original https://aws.amazon.com/blogs/big-data/how-cargotec-uses-metadata-replication-to-enable-cross-account-data-sharing/

This is a guest blog post co-written with Sumesh M R from Cargotec and Tero Karttunen from Knowit Finland.

Cargotec (Nasdaq Helsinki: CGCBV) is a Finnish company that specializes in cargo handling solutions and services. The company is headquartered in Helsinki, Finland, and operates globally in over 100 countries. With its leading cargo handling solutions and services, Cargotec is a pioneer in its field. Through its unique position in ports, at sea, and on roads, the company optimizes global cargo flows and creates sustainable customer value.

Cargotec captures terabytes of IoT telemetry data from their machinery operated by numerous customers across the globe. This data needs to be ingested into a data lake, transformed, and made available for analytics, machine learning (ML), and visualization. For this, Cargotec built an Amazon Simple Storage Service (Amazon S3) data lake and cataloged the data assets in AWS Glue Data Catalog. They chose AWS Glue as their preferred data integration tool due to its serverless nature, low maintenance, ability to control compute resources in advance, and scale when needed.

In this blog, we discuss the technical challenges faced by Cargotec in replicating their AWS Glue metadata across AWS accounts, and how they navigated these challenges successfully to enable cross-account data sharing.  By sharing their story, we hope to inspire readers facing similar challenges and provide insights into how our services can be customized to meet your specific needs.

Challenges

Like many customers, Cargotec’s data lake is distributed across multiple AWS accounts that are owned by different teams. Cargotec wanted to find a solution to share datasets across accounts and use Amazon Athena to query them. To share the datasets, they needed a way to share access to the data and access to catalog metadata in the form of tables and views. Cargotec’s use cases also required them to create views that span tables and views across catalogs. Cargotec’s implementation covers three discrete AWS accounts, 25 databases, 150 tables, and 10 views.

Solution overview

Cargotec required a single catalog per account that contained metadata from their other AWS accounts. The solution that best fit their needs was to replicate metadata using an in-house version of a publicly available utility called the Metastore Migration utility. Cargotec extended the utility by changing the overall orchestration layer, adding an Amazon SNS notification and an AWS Lambda function. The approach was to programmatically copy and make available each catalog entity (databases, tables, and views) to all consumer accounts. This makes the tables or views local to the account where the query is being run, while the data still remains in its source S3 bucket.

Cargotec’s solution architecture

The following diagram summarizes the architecture and overall flow of events in Cargotec’s design.

Solution Architecture

Catalog entries from a source account are programmatically replicated to multiple target accounts using the following series of steps.

  1. An AWS Glue job (metadata exporter) runs daily on the source account. It reads the table and partition information from the source AWS Glue Data Catalog. Since the target account is used for analytical purposes and does not require real-time schema changes, the metadata exporter runs only once a day. Cargotec uses partition projection, which ensures that the new partitions are available in real-time.
  2. The job then writes the metadata to an S3 bucket in the same account. Please note that the solution doesn’t involve movement of the data across accounts. The target accounts read data from the source account S3 buckets. For guidance on setting up the right permissions, please see the Amazon Athena User Guide.
  3. After the metadata export has been completed, the AWS Glue job pushes a notification to an Amazon Simple Notification Service (Amazon SNS) topic. This message contains the S3 path to the latest metadata export. The SNS notification is Cargotec’s customization to the existing open-source utility.
  4. Every target account runs an AWS Lambda function that is notified when the source account SNS topic receives a push. In short, there are multiple subscriber Lambda functions (one per target account) for the source account SNS topics that get triggered when an export job is completed.
  5. Once triggered, the Lambda function then initiates an AWS Glue job (metadata importer) on the respective target account. The job receives as input the source account’s S3 path to the metadata that has been recently exported.
  6. Based on the path provided, the metadata importer reads the exported metadata from the source S3 bucket.
  7. The metadata importer now uses this information to create or update the corresponding catalog information in the target account.

All along the way, any errors are published to a separate SNS topic for logging and monitoring purposes. With this approach, Cargotec was able to create and consume views that span tables and views from multiple catalogs spread across different AWS accounts.

Implementation

The core of the catalog replication utility is two AWS Glue scripts:

  • Metadata exporter – An AWS Glue job that reads the source data catalog and creates an export of the databases, tables, and partitions in an S3 bucket in the source account.
  • Metadata importer – An AWS Glue job that reads the export that was created by the metadata exporter and applies the metadata to target databases. This code is triggered by a Lambda function once files are written to S3. The job runs in the target account.

Metadata exporter

This section provides details on the AWS Glue job that exports the AWS Glue Data Catalog into an S3 location. The source code for the application is hosted on the AWS Glue GitHub repository. Though it may need to be customized to suit your needs, we will go over the core components of the code in this blog.

Metadata exporter inputs

The application takes a few job input parameters, as described below; a sample invocation follows the list:

  • --mode key accepts either to-s3 or to-jdbc. The latter is used when the code is moving the metadata directly into a JDBC Hive Metastore. In the case of Cargotec, since we are moving the metadata to files on S3, the value for --mode will remain to-s3.
  • --output-path accepts an S3 location to which the exported metadata should be written. The code creates subdirectories corresponding to databases, tables, and partitions.
  • --database-names accepts a semicolon-separated list of databases on the source catalog that need to be replicated to the target.
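
As a rough illustration (not part of the original utility), the exporter could be started programmatically with boto3. The job name and S3 path below are placeholders.

import boto3

glue = boto3.client("glue")

# Start the metadata exporter job with the parameters described above.
# The job name and output path are hypothetical placeholders.
response = glue.start_job_run(
    JobName="hms-migration-export",  # placeholder job name
    Arguments={
        "--mode": "to-s3",
        "--output-path": "s3://your-metadata-export-bucket/export/",  # placeholder bucket
        "--database-names": "sales_db;customer_db",  # semicolon-separated source databases
    },
)
print("Started export run:", response["JobRunId"])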

Reading the catalog

The metadata about the databases, tables, and partitions is read from the AWS Glue Data Catalog.

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='com.amazonaws.services.glue.connections.DataCatalogConnection',
    connection_options={
        'catalog.name': 'datacatalog',
        'catalog.database': database,
        'catalog.region': region
    })

The above code snippet reads the metadata into an AWS Glue DynamicFrame. The frame is then converted to a Spark DataFrame, which is filtered into individual DataFrames based on whether each entry describes a database, table, or partition. A schema is attached to each DataFrame using one of the following definitions:

DATACATALOG_DATABASE_SCHEMA = StructType([
    StructField('items', ArrayType(
        DATACATALOG_DATABASE_ITEM_SCHEMA, False),
                True),
    StructField('type', StringType(), False)
])
DATACATALOG_TABLE_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('type', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_TABLE_ITEM_SCHEMA, False), True)
])
DATACATALOG_PARTITION_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('table', StringType(), False),
    StructField('items', ArrayType(DATACATALOG_PARTITION_ITEM_SCHEMA, False), True),
    StructField('type', StringType(), False)
])

For details on the individual item schema, refer to the schema definition on GitHub.

Persisting the metadata

After the metadata is converted to DataFrames with the schemas attached, it is persisted to the S3 location specified by the --output-path parameter:

databases.write.format('json').mode('overwrite').save(output_path + 'databases')
tables.write.format('json').mode('overwrite').save(output_path + 'tables')
partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')

Exploring the output

Navigate to the S3 bucket that contains the output location, and you should be able to see the output metadata in JSON format. An example export for a table would look like the following code snippet.

{
    "database": "default",
    "type": "table",
    "item": {
        "createTime": "1651241372000",
        "lastAccessTime": "0",
        "owner": "spark",
        "retention": 0,
        "name": "an_example_table",
        "tableType": "EXTERNAL_TABLE",
        "parameters": {
            "totalSize": "2734148",
            "EXTERNAL": "TRUE",
            "last_commit_time_sync": "20220429140907",
            "spark.sql.sources.schema.part.0": "{redacted_schema}",
            "numFiles": "1",
            "transient_lastDdlTime": "1651241371",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "hudi"
        },
        "partitionKeys": [],
        "storageDescriptor": {
            "inputFormat": "org.apache.hudi.hadoop.HoodieParquetInputFormat",
            "compressed": false,
            "storedAsSubDirectories": false,
            "location": "s3://redacted_bucket_name/table/an_example_table",
            "numberOfBuckets": -1,
            "outputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "bucketColumns": [],
            "columns": [{
                    "name": "_hoodie_commit_time",
                    "type": "string"
                },
                {
                    "name": "_hoodie_commit_seqno",
                    "type": "string"
                }
            ],
            "parameters": {},
            "serdeInfo": {
                "serializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                "parameters": {
                    "hoodie.query.as.ro.table": "false",
                    "path": "s3://redacted_bucket_name/table/an_example_table",
                    "serialization.format": "1"
                }
            },
            "skewedInfo": {
                "skewedColumnNames": [],
                "skewedColumnValueLocationMaps": {},
                "skewedColumnValues": []
            },
            "sortColumns": []
        }
    }
}

Once the export job is complete, the output S3 path will be pushed to an SNS topic. A Lambda function at the target account processes this message and invokes the import AWS Glue job by passing the S3 import location.
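
A minimal Lambda handler for this step could look like the following sketch. It assumes the SNS message body is a small JSON document containing the exported S3 prefix; the importer job name and message field are placeholders rather than Cargotec's actual implementation.

import json
import boto3

glue = boto3.client("glue")

IMPORT_JOB_NAME = "hms-migration-import"  # placeholder job name in the target account


def handler(event, context):
    # The Lambda function is subscribed to the source account's SNS topic.
    # Each record carries the S3 path of the latest metadata export.
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
        export_path = message["export_path"]  # hypothetical message field, e.g. "s3://bucket/export/"

        # Trigger the metadata importer Glue job, pointing it at the
        # exported databases, tables, and partitions.
        glue.start_job_run(
            JobName=IMPORT_JOB_NAME,
            Arguments={
                "--mode": "from-s3",
                "--database-input-path": export_path + "databases",
                "--table-input-path": export_path + "tables",
                "--partition-input-path": export_path + "partitions",
            },
        )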

Metadata importer

The import job runs on the target account. The code for the job is available on GitHub. As with the exporter, you may need to customize it to suit your specific requirements, but the code as-is should work for most scenarios.

Metadata importer inputs

The inputs to the application are provided as job parameters. Below is a list of parameters that are used for the import process:

  • --mode key accepts either from-s3 or from-jdbc. The latter is used when migration is from a JDBC source to the AWS Glue Data Catalog. At Cargotec, the metadata is already written to Amazon S3, and hence the value for this key is always set to from-s3.
  • --region key accepts a valid AWS Region for the AWS Glue Catalog. The target Region is specified using this key.
  • --database-input-path key accepts the path to the file containing the database metadata. This is the output of the metadata exporter job.
  • --table-input-path key accepts the path to the file containing the table metadata. This is the output of the metadata exporter job.
  • --partition-input-path key accepts the path to the file containing the partition metadata. This is the output of the metadata exporter job.

Reading the metadata

The metadata, as previously discussed, is stored as files on Amazon S3. The files are read into individual Spark DataFrames with their respective schema information:

databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)

Loading the catalog

Once the Spark DataFrames are read, they are converted to AWS Glue DynamicFrames and then loaded into the catalog, as shown in the following snippet.

glue_context.write_dynamic_frame.from_options(
        frame=dyf_databases, 
        connection_type='catalog',
        connection_options={
               'catalog.name': datacatalog_name, 
               'catalog.region': region
         }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_tables, 
        connection_type='catalog',
        connection_options={
                'catalog.name': datacatalog_name, 
                'catalog.region': region
        }
)
glue_context.write_dynamic_frame.from_options(
        frame=dyf_partitions, 
        connection_type='catalog',
        connection_options={
                 'catalog.name': datacatalog_name, 
                 'catalog.region': region
         }
)

Once the job concludes, you can query the target AWS Glue catalog to ensure the tables from the source have been synced with the destination. To keep things simple and easy to manage, instead of implementing a mechanism to identify tables that change over time, Cargotec updates the catalog information of all databases or tables that are configured in the export job.

Considerations

Though the setup works effectively for Cargotec’s current business requirements, there are a few drawbacks to this approach, which are highlighted below:

  1. The solution involves code. Customizations were made to the existing open-source utility to be able to publish an SNS notification once an export is complete and a Lambda function to trigger the import process.
  2. The export process on the source account is a scheduled job. Hence there is no real-time sync between the source and target accounts. This was not a requirement for Cargotec’s business process.
  3. For tables that don’t use Athena partition projection, query results may be outdated until the new partitions are added to the metastore through MSCK REPAIR TABLE, ALTER TABLE ADD PARTITION, AWS Glue crawler, and so on.
  4. The current approach requires syncing all the tables across the source and target. If the requirement is to capture only the ones that changed instead of a scheduled daily export, the design needs to change and could benefit from the Amazon EventBridge integration with AWS Glue. An example implementation of using AWS Glue APIs to identify changes is shown in Identify source schema changes using AWS Glue.

Conclusion

In this blog post, we have explored a solution for cross-account sharing of data and tables that makes it possible for Cargotec to create views that combine data from multiple AWS accounts. We’re excited to share Cargotec’s success and believe the post has provided you with valuable insights and inspiration for your own projects.

We encourage you to explore our range of services and see how they can help you achieve your goals. Lastly, for more data and analytics blogs, feel free to bookmark the AWS Blogs.


About the Authors

Sumesh M R is a Full Stack Machine Learning Architect at Cargotec. He has several years of software engineering and ML background. Sumesh is an expert in SageMaker and other AWS ML/Analytics services. He is passionate about data science and loves to explore the latest ML libraries and techniques. Before joining Cargotec, he worked as a Solution Architect at TCS. In his spare time, he loves to play cricket and badminton.

 Tero Karttunen is a Senior Cloud Architect at Knowit Finland. He advises clients on architecting and adopting Data Architectures that best serve their Data Analytics and Machine Learning needs. He has helped Cargotec in their data journey for more than two years. Outside of work, he enjoys running, winter sports, and role-playing games.

Arun A K is a Big Data Specialist Solutions Architect at AWS.  He works with customers to provide architectural guidance for running analytics solutions on AWS Glue, AWS Lake Formation, Amazon Athena, and Amazon EMR. In his free time, he likes to spend time with his friends and family.

Introducing Athena Provisioned Capacity

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/introducing-athena-provisioned-capacity/

Today we launch the ability to provision capacity to run your Amazon Athena queries.

Athena is a query service that makes it simple to analyze data in Amazon Simple Storage Service (Amazon S3) data lakes and 30 different data sources, including on-premises data sources or other cloud systems, using standard SQL queries. Athena is serverless, so there is no infrastructure to manage, and–until today–you pay only for the queries that you run. Starting today, you can get dedicated capacity for your queries and use new workload management features to prioritize, control, and scale your most important queries, paying only for the capacity you provision.

At AWS, 90 percent of the new services and features are driven by your direct feedback. Many of you Athena customers told us that, when running a large volume of queries, you sometimes experience queuing, which might slow down some applications or business processes. To work around this, you typically create a query prioritization mechanism to prioritize mission-critical queries over less critical, interactive, or exploratory queries. This prioritization mechanism helps the highest-priority queries run first, at the price of building and maintaining code or business processes outside of Athena itself. You also told us it is difficult to forecast your Athena costs. Athena charges by the volume of data scanned, which is often difficult to predict as it depends on the size of your data set, the construction of the user queries, and the storage format for the data.

We heard this feedback, and today, we introduce the capability to provision dedicated query processing capacity at scale. With provisioned capacity, you provision a dedicated set of compute resources to run your queries. This always-on capacity can serve your business-critical queries with near-zero latency and no queuing. It gives you control over workload performance characteristics such as cost, concurrency, and query prioritization. Similar to provisioned capacity for other AWS services, you pay only for the capacity provisioned, not for the actual usage. With provisioned capacity, your Athena bills are predictable, and you do not have to limit user queries to stay within your monthly budget. I’ll share more about the billing model down below.

Behind the scenes, Athena maintains a large pool of compute in each AWS Region that it operates in. You can think of this as one large pool of compute, divided logically across customers. When you reserve capacity in Athena, the capacity is held for your exclusive use. You can choose which queries run on the capacity you provisioned and which run on Athena’s multi-tenant, on-demand capacity. Multiple queries can share the capacity you provisioned. You may add additional capacity units at any time, based on your evolving business requirements. You may also adjust the provisioned capacity down after a minimum period of 8 hours.

The unit of capacity is a Data Processing Unit (DPU). A single DPU is equivalent to four vCPUs and 16 GB of RAM. The minimum capacity you may provision is 24 DPUs for 8 hours. This new provisioned capacity for Athena is ideal for those of you running any volume of queries, but the sweet spot to start using provisioned capacity is when you spend $100 or more per month on Athena.

The number of DPUs you need depends on your goals and analysis patterns. For example, if you need queries to start immediately and without queuing, you should provision enough DPUs to meet your peak concurrent query demand. Provisioning fewer DPUs than your peak demand is allowed, but may result in queuing. When this occurs, queries are held in a queue and executed when capacity is available. If your goal is to run queries within a fixed budget, you can use the AWS Pricing Calculator to determine the number of DPUs that meets your budget. Lastly, remember that data size, storage format, and query construction influence the number of DPU a query requires. You can increase query performance by compressing, partitioning, and converting your data into columnar formats. Athena’s documentation provides you with guidelines to determine how much capacity you might require to run multiple queries at the same time.

How Does It Work?
Getting started is a three-step process. I navigate to the Athena page in the AWS Management Console and select Capacity Reservations on the left-side navigation menu.
(The console you see in this demo is based on the new Cloudscape open-source design system; you might still see the traditional design on your AWS account.)

Athena Capacity Reservation landing page in the console

I select the Create capacity reservation button at the top right of the page.

On the Create capacity reservation page, I enter a Capacity reservation name and the number of DPUs I want to provision.

Athena Capacity Reservation - Create Reservation

I select Review to review my choices, and I select Create capacity reservation to create my reservation. After a brief period of time, the capacity reservation status becomes ✅ Active.

Athena Capacity Reservation - Status

The third and last step is to create a workgroup and assign the workgroup to the provisioned capacity. A workgroup is an Athena mechanism allowing you to separate users, teams, applications, or workloads to set limits on the amount of data each query or the entire workgroup can process and to track costs.

Queries belonging to the assigned workgroup will run on the capacity you provisioned. Capacity may be shared with multiple workgroups as long as they all use the same Athena engine version. This concept, depicted in the diagram below, is surfaced through a capacity allocation policy, which defines how capacity is assigned over workgroups. This gives you the flexibility to run queries with more or less capacity, depending on your business needs.
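
If you prefer the API over the console, the reservation and its workgroup assignment can also be created programmatically. The following boto3 sketch uses placeholder names and assumes the workgroup already exists; it is an illustration, not the console flow described above.

import boto3

athena = boto3.client("athena")

# Provision 24 DPUs, the minimum reservation size.
athena.create_capacity_reservation(
    Name="news-blog-reservation",  # placeholder reservation name
    TargetDpus=24,
)

# Route all queries from a workgroup onto the reserved capacity.
athena.put_capacity_assignment_configuration(
    CapacityReservationName="news-blog-reservation",
    CapacityAssignments=[
        {"WorkGroupNames": ["AWSNewsBlog"]}  # workgroup used later in this post
    ],
)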

Athena Capacity Reservation - shared workgroups

To create a workgroup, I navigate to the Workgroups section of the Athena page. Then, I select Create workgroup.

Athena Capacity Reservation - Create Workgroup

I make sure the analytics engine selected in the reservation matches the one in the workgroup.

Athena Capacity Reservation - select analytic engine

Then, I go back to the capacity reservation I just created, and I select Add workgroups to add the workgroup I just created.

Athena Capacity Reservation - Add workgroup

That’s it! Now that the configuration is ready, I can run my queries. Existing queries will run on the provisioned capacity unmodified. I make sure to select the workgroup I just created when I run queries. I choose a workgroup on the top right side of the query editor, or use the --work-group argument on the AWS command line, such as:

aws athena start-query-execution --work-group AWSNewsBlog

Athena Capacity Reservation - Select workgroup

Availability and Pricing
As I explained in the introduction, we charge for the number of DPUs you provisioned and the duration. The minimum duration is 8 hours, and after that, billing is per minute. You can release the provisioned capacity at any time. Cancellations within the minimum duration period are billed for the full term, and capacity is deallocated as soon as all currently running queries are terminated.

Queries run from a workgroup assigned to a provisioned capacity are not billed for the amount of data scanned. You effectively pay a flat rate depending on the provisioned capacity, not the usage. If you have excess capacity, you can reduce the number of DPUs you provisioned or add workgroups to consume the excess capacity.

As usual, the Athena pricing page has all the details.

Athena provisioned capacity is available today in US East (Ohio, N. Virginia), US West (Oregon), Asia Pacific (Singapore, Sydney, Tokyo), and Europe (Ireland, Stockholm) AWS Regions.

Go and provision your Athena capacity today!

— seb

How Novo Nordisk built distributed data governance and control at scale

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-distributed-data-governance-and-control-at-scale/

This is a guest post co-written with Jonatan Selsing and Moses Arthur from Novo Nordisk.

This is the second post of a three-part series detailing how Novo Nordisk, a large pharmaceutical enterprise, partnered with AWS Professional Services to build a scalable and secure data and analytics platform. The first post of this series describes the overall architecture and how Novo Nordisk built a decentralized data mesh architecture, including Amazon Athena as the data query engine. The third post will show how end-users can consume data from their tool of choice, without compromising data governance. This will include how to configure Okta, AWS Lake Formation, and a business intelligence tool to enable SAML-based federated use of Athena for an enterprise BI activity.

When building a scalable data architecture on AWS, giving autonomy and ownership to the data domains is crucial for the success of the platform. By providing the right mix of freedom and control to the people with the business domain knowledge, your business can maximize value from the data as quickly and effectively as possible. At the same time, data is a strategic asset that needs to be protected with the highest degree of rigor. How can organizations strike the right balance between freedom and control?

In this post, you will learn how to build decentralized governance with Lake Formation and AWS Identity and Access Management (IAM) using attribute-based access control (ABAC). We discuss some of the patterns we use, including Amazon Cognito identity pool federation using ABAC in permission policies, and Okta-based SAML federation with ABAC enforcement on role trust policies.

Solution overview

In the first post of this series, we explained how Novo Nordisk and AWS Professional Services built a modern data architecture based on data mesh tenets. This architecture enables data governance on distributed data domains, using an end-to-end solution to create data products and providing federated data access control. This post dives into three elements of the solution:

  • How IAM roles and Lake Formation are used to manage data access across data domains
  • How data access control is enforced at scale, using a group membership mapping with an ABAC pattern
  • How the system maintains state across the different layers, so that the ecosystem of trust is configured appropriately

From the end-user perspective, the objective of the mechanisms described in this post is to enable simplified data access from the different analytics services adopted by Novo Nordisk, such as those provided by software as a service (SaaS) vendors like Databricks, or self-hosted ones such as JupyterHub. At the same time, the platform must guarantee that any change in a dataset is immediately reflected at the service user interface. The following figure illustrates at a high level the expected behavior.

High-level data platform expected behavior

Following the layer nomenclature established in the first post, the services are created and managed in the consumption layer. The domain accounts are created and managed in the data management layer. Because changes can occur from both layers, continuous communication in both directions is required. The state information is kept in the virtualization layer along with the communication protocols. Additionally, at sign-in time, the services need information about data resources required to provide data access abstraction.

Managing data access

The data access control in this architecture is designed around the core principle that all access is encapsulated in isolated IAM role sessions. The layer pattern that we described in the first post ensures that the creation and curation of the IAM role policies involved can be delegated to the different data management ecosystems. Each data management platform integrated can use their own data access mechanisms, with the unique requirement that the data is accessed via specific IAM roles.

To illustrate the potential mechanisms that can be used by data management solutions, we show two examples of data access permission mechanisms used by two different data management solutions. Both systems utilize the same trust policies as described in the following sections, but have a completely different permission space.

Example 1: Identity-based ABAC policies

The first mechanism we discuss is an ABAC role that provides access to a home-like data storage area, where users can share within their departments and with the wider organization in a structure that mimics the organizational structure. Here, we don’t utilize the group names, but instead forward user attributes from the corporate Active Directory directly into the permission policy through claim overrides. We do this by having the corporate Active Directory as the identity provider (IdP) for the Amazon Cognito user pool and mapping the relevant IdP attributes to user pool attributes. Then, in the Amazon Cognito identity pool, we map the user pool attributes to session tags to use them for access control. Custom overrides can be included in the claim mapping, through the use of a pre token generation Lambda trigger. This way, claims from AD can be mapped to Amazon Cognito user pool attributes and then ultimately used in the Amazon Cognito identity pool to control IAM role permissions. The following is an example of an IAM policy with session tags:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "",
                        "public/",
                        "public/*",
                        "home/",
                        "home/${aws:PrincipalTag/initials}/*",
                        "home/${aws:PrincipalTag/department}/*"
                    ]
                }
            },
            "Action": "s3:ListBucket",
            "Resource": [
                "arn:aws:s3:::your-home-bucket"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:PutObject*",
                "s3:DeleteObject*"
            ],
            "Resource": [
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": "s3:GetObject*",
            "Resource": [
                "arn:aws:s3:::your-home-bucket/public/",
                "arn:aws:s3:::your-home-bucket/public/*"
            ],
            "Effect": "Allow"
        }
    ]
}
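
The pre token generation Lambda trigger mentioned above could, as a rough sketch, map directory-backed attributes into the claims that later become the session tags used in this policy. The attribute names below are illustrative assumptions, not Novo Nordisk's actual schema.

def handler(event, context):
    # Amazon Cognito user pool pre token generation trigger.
    # Copy selected user pool attributes into token claims so that the
    # identity pool can map them to the session tags (initials, department)
    # referenced by the IAM policy above.
    user_attributes = event["request"]["userAttributes"]

    event["response"]["claimsOverrideDetails"] = {
        "claimsToAddOrOverride": {
            "initials": user_attributes.get("custom:initials", ""),      # illustrative attribute
            "department": user_attributes.get("custom:department", ""),  # illustrative attribute
        }
    }
    return event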

This role is then embedded in the analytics layer (together with the data domain roles) and assumed on behalf of the user. This enables users to mix and match between data domains—as well as utilizing private and public data paths that aren’t necessarily tied to any data domain. For more examples of how ABAC can be used with permission policies, refer to How to scale your authorization needs by using attribute-based access control with S3.

Example 2: Lake Formation name-based access controls

In the data management solution named Novo Nordisk Enterprise Datahub (NNEDH), which we introduced in the first post, we use Lake Formation to enable standardized data access. The NNEDH datasets are registered in the Lake Formation Data Catalog as databases and tables, and permissions are granted using the named resource method. The following screenshot shows an example of these permissions.

Lakeformation named resource method for permissions management

In this approach, data access governance is delegated to Lake Formation. Every data domain in NNEDH has isolated permissions synthesized by NNEDH as the central governance management layer. This is a similar pattern to what is adopted for other domain-oriented data management solutions. Refer to Use an event-driven architecture to build a data mesh on AWS for an example of tag-based access control in Lake Formation.
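
As an illustration of the named resource method (not NNEDH's actual code), a grant like the ones in the screenshot could be issued through the Lake Formation API. The database, table, and role ARN below are placeholders.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant SELECT and DESCRIBE on a specific table to a data domain IAM role
# using the Lake Formation named resource method.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::111111111111:role/data-domain-role"},  # placeholder
    Resource={
        "Table": {
            "DatabaseName": "nnedh_sales",  # placeholder database
            "Name": "orders",               # placeholder table
        }
    },
    Permissions=["SELECT", "DESCRIBE"],
)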

These patterns don’t exclude implementations of peer-to-peer type data sharing mechanisms, such as those that can be achieved using AWS Resource Access Manager (AWS RAM), where a single IAM role session can have permissions that span across accounts.

Delegating role access to the consumption layer

The following figure illustrates the data access workflow from an external service.

Data access workflow from external service

The workflow steps are as follows:

  1. A user authenticates on an IdP used by the analytics tool that they are trying to access. A wide range of analytics tools are supported by the Novo Nordisk platform, such as Databricks and JupyterHub, and the IdP can be either SAML or OIDC type depending on the capabilities of the third-party tool. In this example, an Okta SAML application is used to sign into a third-party analytics tool, and an IAM SAML IdP is configured in the data domain AWS account to federate with the external IdP. The third post of this series describes how to set up an Okta SAML application for IAM role federation on Athena.
  2. The SAML assertion obtained during the sign-in process is used to request temporary security credentials of an IAM role through the AssumeRole operation. In this example, the SAML assertion is used in the AssumeRoleWithSAML operation (a minimal sketch of this call follows the list). For OpenID Connect-compatible IdPs, the AssumeRoleWithWebIdentity operation must be used with the JWT. The SAML attributes in the assertion or the claims in the token can be generated at sign-in time, to ensure that the group memberships are forwarded, for the ABAC policy pattern described in the following sections.
  3. The analytics tool, such as Databricks or JupyterHub, abstracts the usage of the IAM role session credentials in the tool itself, and data can be accessed directly according to the permissions of the IAM role assumed. This pattern is similar in nature to IAM passthrough as implemented by Databricks, but at Novo Nordisk it’s extended across all analytics services. In this example, the analytics tool accesses the data lake on Amazon Simple Storage Service (Amazon S3) through Athena queries.

As the data mesh pattern expands across domains covering more downstream services, we need a mechanism to keep IdPs and IAM role trusts continuously updated. We come back to this part later in the post, but first we explain how role access is managed at scale.

Attribute-based trust policies

In previous sections, we emphasized that this architecture relies on IAM roles for data access control. Each data management platform can implement its own data access control method using IAM roles, such as identity-based policies or Lake Formation access control. For data consumption, it’s crucial that these IAM roles are only assumable by users that are part of Active Directory groups with the appropriate entitlements to use the role. To implement this at scale, the IAM role’s trust policy uses ABAC.

When a user authenticates on the external IdP of the consumption layer, we add in the access token a claim derived from their Active Directory groups. This claim is propagated by the AssumeRole operation into the trust policy of the IAM role, where it is compared with the expected Active Directory group. Only users that belong to the expected groups can assume the role. This mechanism is illustrated in the following figure.

Architecture of the integration with the identity provider

Translating group membership to attributes

To enforce the group membership entitlement at the role assumption level, we need a way to compare the required group membership with the group memberships that a user comes with in their IAM role session. To achieve this, we use a form of ABAC, where we have a way to represent the sum of context-relevant group memberships in a single attribute. A single IAM role session tag value is limited to 256 characters. The corresponding limit for SAML assertions is 100,000 characters, so for systems where a very large number of either roles or group-type mappings are required, SAML can support a wider range of configurations.

In our case, we have opted for a compression algorithm that takes a group name and compresses it to a 4-character string hash. This means that, together with a group-separation character, we can fit 51 groups in a single attribute. This gets pushed down to approximately 20 groups for OIDC type role assumption due to the PackedPolicySize, but is higher for a SAML-based flow. This has shown to be sufficient for our case. There is a risk that two different groups could hash to the same character combination; however, we have checked that there are no collisions in the existing groups. To mitigate this risk going forward, we have introduced guardrails in multiple places. First, before adding new group entitlements in the virtualization layer, we check if there’s a hash collision with any existing group. When an attempt is made to add a duplicated group, our service team is notified and we can react accordingly. But as stated earlier, there is a low probability of clashes, so the flexibility this provides outweighs the overhead associated with managing clashes (we have not had any yet). We additionally enforce this at SAML assertion creation time as well, to ensure that there are no duplicated groups in the user’s group list, and in cases of duplication, we remove both entirely. This means malicious actors can at most limit the access of other users, but not gain unauthorized access.
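
To make the idea concrete, a minimal sketch of such a packing scheme is shown below. The hash function, separator, and 256-character budget mirror the description above, but the exact algorithm Novo Nordisk uses is not published here, so treat this as an assumption-laden illustration.

import hashlib

SEPARATOR = ":"   # assumed group-separation character
TAG_LIMIT = 256   # IAM session tag value limit (characters)
HASH_LENGTH = 4   # 4-character hash per group, as described above


def group_hash(group_name: str) -> str:
    # Compress an Active Directory group name into a short, fixed-length hash.
    return hashlib.sha256(group_name.encode("utf-8")).hexdigest()[:HASH_LENGTH]


def pack_groups(group_names: list[str]) -> str:
    # Concatenate the hashes into a single session tag value, skipping
    # duplicates and refusing to exceed the tag limit. (The real system
    # raises an alert on hash collisions rather than silently skipping.)
    hashes = []
    for name in group_names:
        h = group_hash(name)
        if h not in hashes:
            hashes.append(h)
    packed = SEPARATOR.join(hashes)
    if len(packed) > TAG_LIMIT:
        raise ValueError("Too many groups to fit in a single session tag")
    return packed


# Example: the packed value can then be forwarded as the GroupHash session tag.
print(pack_groups(["data-domain-sales-readers", "data-domain-finance-readers"]))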

Enforcing audit functionality across sessions

As mentioned in the first post, on top of governance, there are strict requirements around auditability of data accesses. This means that for all data access requests, it must be possible to trace the specific user across services and retain this information. We achieve this by setting (and enforcing) a source identity for all role sessions and making sure to propagate the enterprise identity to this attribute. We use a combination of Okta inline hooks and SAML session tags to achieve this. This means that the AWS CloudTrail logs for an IAM role session have the following information:

{
    "eventName": "AssumeRoleWithSAML",
    "requestParameters": {
        "SAMLAssertionlD": "id1111111111111111111111111",
        "roleSessionName": "[email protected]",
        "principalTags": {
            "nn-initials": "user",
            "department": "NNDepartment",
            "GroupHash": "xxxx",
            "email": "[email protected]",
            "cost-center": "9999"
        },
        "sourceIdentity": "[email protected]",
        "roleArn": "arn:aws:iam::111111111111:role/your-assumed-role",
        "principalArn": "arn:aws:iam,111111111111:saml-provider/your-saml-provider",
        ...
    },
    ...
}

On the IAM role level, we can enforce the required attribute configuration with the following example trust policy. This is an example for a SAML-based app. We support the same patterns through OpenID Connect IdPs.

We now go through the elements of an IAM role trust policy, based on the following example:

{
    "Version": "2008-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": {
            "Federated": [SAML_IdP_ARN]
        },
        "Action": [
            "sts:AssumeRoleWithSAML",
            "sts:TagSession",
            "sts:SetSourceIdentity"
        ],
        "Condition": {
            "StringEquals": {
                "SAML:aud": "https://signin.aws.amazon.com/saml"
            },
            "StringLike": {
                "sts:SourceIdentity": "*@novonordisk.com",
                "aws:RequestTag/GroupHash": ["*xxxx*"]
            },
            "StringNotLike": {
                "sts:SourceIdentity": "*"
            }
        }
    }
}

The policy contains the following details:

  • The Principal statement should point to the list of apps that are served through the consumption layer. These can be Azure app registrations, Okta apps, or Amazon Cognito app clients. This means that SAML assertions (in the case of SAML-based flows) minted from these applications can be used to run the AssumeRoleWithSAML operation if the remaining elements are also satisfied.
  • The Action statement includes the required permissions for the AssumeRole call to succeed, including adding the contextual information to the role session.
  • In the first condition, the audience of the assertion needs to be targeting AWS.
  • In the second condition, there are two StringLike requirements:
    • A requirement on the source identity as the naming convention to follow at Novo Nordisk (users must come with enterprise identity, following our audit requirements).
    • The aws:RequestTag/GroupHash needs to be xxxx, which represents the hashed group name mentioned in the upper section.
  • Lastly, we enforce that sessions can’t be started without setting the source identity.

This policy enforces that all calls come from recognized applications, carry the required auditability information, target AWS, and that the user has the right group memberships.
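
For reference, a role session governed by this trust policy is ultimately obtained with an AssumeRoleWithSAML call similar to the following sketch. The session tags and source identity travel inside the SAML assertion itself (set by the sign-in hooks described later), not as API parameters; the ARNs are placeholders:

import boto3

sts = boto3.client("sts")

# The base64-encoded SAML response minted by the IdP (Okta, Azure AD, or Amazon Cognito);
# the PrincipalTag, GroupHash, and SourceIdentity attributes are embedded in the assertion
saml_assertion = "<base64-encoded SAML response>"

response = sts.assume_role_with_saml(
    RoleArn="arn:aws:iam::111111111111:role/your-assumed-role",
    PrincipalArn="arn:aws:iam::111111111111:saml-provider/your-saml-provider",
    SAMLAssertion=saml_assertion,
    DurationSeconds=3600,
)
credentials = response["Credentials"]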

Building a central overview of governance and trust

In this section, we discuss how Novo Nordisk keeps track of the relevant group-role relations and maps these at sign-in time.

Entitlements

In Novo Nordisk, all accesses are based on Active Directory group memberships. There is no user-based access. Because this pattern is so central, we have extended this access philosophy into our data accesses. As mentioned earlier, at sign-in time, the hooks need to be able to know which roles to assume for a given user, given this user’s group membership. We have modeled this data in Amazon DynamoDB, where just-in-time provisioning ensures that only the required user group memberships are available. By building our application around the use of groups, and by having the group propagation done by the application code, we avoid having to make a more general Active Directory integration, which would, for a company the size of Novo Nordisk, severely impact the application, simply due to the volume of users and groups.

The DynamoDB entitlement table contains all relevant information for all roles and services, including role ARNs and IdP ARNs. This means that when users log in to their analytics services, the sign-in hook can construct the required information for the Roles SAML attribute.
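
The post doesn't document the entitlement table's schema, but the lookup the sign-in hook performs can be sketched as follows; the table name and attribute names (group_name, role_arn, idp_arn) are assumptions for illustration:

import boto3

dynamodb = boto3.resource("dynamodb")
entitlements = dynamodb.Table("entitlements")  # hypothetical table name

def roles_for_groups(group_names):
    # Resolve a user's group memberships into "role_arn,idp_arn" pairs,
    # the format expected by the Roles SAML attribute
    role_entries = []
    for group in group_names:
        item = entitlements.get_item(Key={"group_name": group}).get("Item")
        if item:
            role_entries.append(f"{item['role_arn']},{item['idp_arn']}")
    return role_entries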

When new data domains are added to the data management layer, the data management layer needs to communicate both the role information and the group name that gives access to the role.

Single sign-on hub for analytics services

When scaling this permission model and data management pattern to a large enterprise such as Novo Nordisk, we ended up creating a large number of IAM roles distributed across different accounts. A solution was therefore required to map end-users to the required IAM roles and provide access. To simplify user access to multiple data sources and analytics tools, Novo Nordisk developed a single sign-on hub for analytics services. From the end-user perspective, this is a web interface that glues together different offerings in a unified system, making it a one-stop tool for data and analytics needs. When signing in to each of the analytical offerings, the authenticated sessions are forwarded, so users never have to reauthenticate.

Common to all the services supported in the consumption layer is that we can run a piece of application code at sign-in time, which allows permissions to be calculated at sign-in. The hooks that achieve this functionality can, for instance, be run by Okta inline hooks. This means that each of the target analytics services can have custom code to translate relevant contextual information or provide other types of automation for the role forwarding.
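
What such a hook computes can be sketched as follows. This is not the actual Novo Nordisk hook code; it only illustrates assembling the standard AWS SAML federation attributes (Role, RoleSessionName, SourceIdentity, and PrincipalTag session tags) that the inline hook patches into the assertion. The group-separation character and tag names are assumptions:

def build_saml_attributes(user_email, role_entries, group_hashes):
    # Assemble the SAML attributes that the sign-in hook adds to the assertion
    return {
        # Standard AWS federation attributes
        "https://aws.amazon.com/SAML/Attributes/Role": role_entries,
        "https://aws.amazon.com/SAML/Attributes/RoleSessionName": user_email,
        # Source identity enables tracing the enterprise user across role sessions
        "https://aws.amazon.com/SAML/Attributes/SourceIdentity": user_email,
        # Session tags become principal tags on the role session (visible in CloudTrail)
        "https://aws.amazon.com/SAML/Attributes/PrincipalTag:GroupHash": ":".join(group_hashes),
        "https://aws.amazon.com/SAML/Attributes/PrincipalTag:email": user_email,
    }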

The sign-in flow is demonstrated in the following figure.

Sign-in flow

The workflow steps are as follows:

  1. A user accesses an analytical service such as Databricks in the Novo Nordisk analytics hub.
  2. The service uses Okta as the SAML-based IdP.
  3. Okta invokes an AWS Lambda-based SAML assertion inline hook.
  4. The hook uses the entitlement database, converting application-relevant group memberships into role entitlements.
  5. Relevant contextual information is returned from the entitlement database.
  6. The Lambda-based hook adds new SAML attributes to the SAML assertion, including the hashed group memberships and other contextual information such as source identity.
  7. A modified SAML assertion is used to sign users in to the analytical service.
  8. The user can now use the analytical tool with active IAM role sessions.

Synchronizing role trust

The preceding section gives an overview of how federation works in this solution. Now we can go through how we ensure that all participating AWS environments and accounts are in sync with the latest configuration.

From the end-user perspective, the synchronization mechanism must ensure that every analytics service instantiated can access the data domains assigned to the groups that the user belongs to. Also, changes in data domains—such as granting data access to an Active Directory group—must be effective immediately to every analytics service.

Two event-based mechanisms are used to maintain all the layers synchronized, as detailed in this section.

Synchronize data access control on the data management layer with changes to services in the consumption layer

As described in the previous section, the IAM roles used for data access are created and managed by the data management layer. These IAM roles have a trust policy providing federated access to the external IdPs used by the analytics tools of the consumption layer. This implies that for every new analytical service created with a different IdP, the IAM roles used for data access on data domains must be updated to trust this new IdP.
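
The following is a minimal sketch of that trust policy update, assuming a single-statement trust policy like the example shown earlier; in the actual solution, the control plane runs this kind of logic against the data-access roles in every data domain account:

import json
import boto3

def add_trusted_idp(role_name, idp_arn):
    # Add a newly created SAML IdP to the Federated principals of a data-access role
    iam = boto3.client("iam")
    policy = iam.get_role(RoleName=role_name)["Role"]["AssumeRolePolicyDocument"]
    statement = policy["Statement"]
    if isinstance(statement, list):
        statement = statement[0]
    federated = statement["Principal"].get("Federated", [])
    if isinstance(federated, str):
        federated = [federated]
    if idp_arn not in federated:
        federated.append(idp_arn)
        statement["Principal"]["Federated"] = federated
        iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(policy))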

Using NNEDH as an example of a data management solution, the synchronization mechanism is demonstrated in the following figure.

Synchronization mechanism in a data management solution

Taking as an example a scenario where a new analytics service is created, the steps in this workflow are as follows:

  1. A user with access to the administration console of the consumption layer instantiates a new analytics service, such as JupyterHub.
  2. A job running on AWS Fargate creates the resources needed for this new analytics service, such as an Amazon Elastic Compute Cloud (Amazon EC2) instance for JupyterHub, and the IdP required, such as a new SAML IdP.
  3. When the IdP is created in the previous step, an event is added in an Amazon Simple Notification Service (Amazon SNS) topic with its details, such as name and SAML metadata.
  4. In the NNEDH control plane, a Lambda job is triggered by new events on this SNS topic. This job creates the IAM IdP, if needed, and updates the trust policy of the required IAM roles in all the AWS accounts used as data domains, adding the trust on the IdP used by the new analytics service.

In this architecture, all the update steps are event-triggered and scalable. This means that users of new analytics services can access their datasets almost instantaneously when they are created. In the same way, when a service is removed, the federation to the IdP is automatically removed if not used by other services.

Propagate changes on data domains to analytics services

Changes to data domains, such as the creation of a new S3 bucket used as a dataset, or adding or removing data access for a group, must be reflected immediately on the analytics services of the consumption layer. To accomplish this, a mechanism is used to synchronize the entitlement database with the relevant changes made in NNEDH. This flow is demonstrated in the following figure.

Changes propagation flow

Taking as an example a scenario where access to a specific dataset is granted to a new group, the steps in this workflow are as follows:

  1. Using the NNEDH admin console, a data owner approves a dataset sharing request that grants access on a dataset to an Active Directory group.
  2. In the AWS account of the related data domain, the dataset components such as the S3 bucket and Lake Formation are updated to provide data access to the new group. The cross-account data sharing in Lake Formation uses AWS RAM.
  3. An event is added in an SNS topic with the current details about this dataset, such as the location of the S3 bucket and the groups that currently have access to it.
  4. In the virtualization layer, the updated information from the data management layer is used to update the entitlement database in DynamoDB.

These steps make sure that changes on data domains are automatically and immediately reflected on the entitlement database, which is used to provide data access to all the analytics services of the consumption layer.

Limitations

Many of these patterns rely on the analytical tool to support a clever use of IAM roles. When this is not the case, the platform teams themselves need to develop custom functionality at the host level to ensure that role accesses are correctly controlled. This, for example, includes writing custom authenticators for JupyterHub.

Conclusion

This post shows an approach to building a scalable and secure data and analytics platform. It showcases some of the mechanisms used at Novo Nordisk and how to strike the right balance between freedom and control. The architecture laid out in the first post in this series enables layer independence, and exposes some extremely useful primitives for data access and governance. We make heavy use of contextual attributes to modulate role permissions at the session level, which provides just-in-time permissions. These permissions are propagated at scale, across data domains. The upside is that a lot of the complexity related to managing data access permissions can be delegated to the relevant business groups, while enabling the end-user consumers of data to think as little as possible about data accesses and focus on providing value for the business use cases. In the case of Novo Nordisk, this means better outcomes for patients and accelerated innovation.

The next post in this series describes how end-users can consume data from their analytics tool of choice, aligned with the data access controls detailed in this post.


About the Authors

Jonatan Selsing is a former research scientist with a PhD in astrophysics who has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving the full benefit of the advantages of the cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Alessandro Fior is a Sr. Data Architect at AWS Professional Services. He is passionate about designing and building modern and scalable data platforms that accelerate companies to extract value from their data.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk, building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Kumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Perform upserts in a data lake using Amazon Athena and Apache Iceberg

Post Syndicated from Ranjit Rajan original https://aws.amazon.com/blogs/big-data/perform-upserts-in-a-data-lake-using-amazon-athena-and-apache-iceberg/

Amazon Athena supports the MERGE command on Apache Iceberg tables, which allows you to perform inserts, updates, and deletes in your data lake at scale using familiar SQL statements that are compliant with ACID (Atomic, Consistent, Isolated, Durable). Apache Iceberg is an open table format for data lakes that manages large collections of files as tables. It supports modern analytical data lake operations such as create table as select (CTAS), upsert and merge, and time travel queries. Athena also supports the ability to create views and perform VACUUM (snapshot expiration) on Apache Iceberg tables to optimize storage and performance. With these features, you can now build data pipelines completely in standard SQL that are serverless, simpler to build, and able to operate at scale. This enables developers to:

  • Focus on writing business logic and not worry about setting up and managing the underlying infrastructure
  • Perform data transformations with Athena
  • Help comply with certain data deletion requirements
  • Apply change data capture (CDC) from source databases

With data lakes, data pipelines are typically configured to write data into a raw zone, which is an Amazon Simple Storage Service (Amazon S3) bucket or folder that contains data as is from source systems. Data is accumulated in this zone, such that inserts, updates, or deletes on the source database appear as records in new files as transactions occur on the source. Although the raw zone can be queried, any downstream processing or analytical queries typically need to deduplicate data to derive a current view of the source table. For example, if a single record is updated multiple times in the source database, these updates need to be deduplicated and the most recent record selected.

Typically, data transformation processes are used to perform this operation, and a final consistent view is stored in an S3 bucket or folder. Data transformation processes can be complex, requiring more coding and testing, and are also error prone. This was a challenge because data lakes are based on files and have been optimized for appending data. Previously, you had to overwrite the complete S3 object or folder, which was not only inefficient but also interrupted users who were querying the same data. With the evolution of frameworks such as Apache Iceberg, you can perform SQL-based upserts in place in Amazon S3 using Athena, without blocking user queries and while still maintaining query performance.

In this post, we demonstrate how you can use Athena to apply CDC from a relational database to target tables in an S3 data lake.

Overview of solution

For this post, consider a mock sports ticketing application based on the following project. We use a single table in that database that contains sporting events information and ingest it into an S3 data lake on a continuous basis (initial load and ongoing changes). This data ingestion pipeline can be implemented using AWS Database Migration Service (AWS DMS) to extract both full and ongoing CDC extracts. With CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume. Most databases use a transaction log to record changes made to the database. AWS DMS reads the transaction log by using engine-specific API operations and captures the changes made to the database in a nonintrusive manner.

Specifically, to extract changed data including inserts, updates, and deletes from the database, you can configure AWS DMS with two replication tasks, as described in the following workshop. The first task performs an initial copy of the full data into an S3 folder. The second task is configured to replicate ongoing CDC into a separate folder in S3, which is further organized into date-based subfolders based on the source database's transaction commit date. With full and CDC data in separate S3 folders, it’s easier to maintain and operate data replication and downstream processing jobs. To enable this, you can apply the following extra connection attributes to the S3 endpoint in AWS DMS (refer to S3Settings for other CSV and related settings); an example endpoint configuration is sketched after the list:

  • TimestampColumnName – AWS DMS adds a column that you name with timestamp information for the commit of that row in the source database.
  • includeOpForFullLoad – AWS DMS adds a column named Op to every file to indicate if the record is an I (INSERT), U (UPDATE), or D (DELETE).
  • DatePartitionEnabled, DatePartitionSequence, DatePartitionDelimiter – These settings are used to configure AWS DMS to write changed data to date/time-based folders in the data lake. By partitioning folders, you can better manage S3 objects and optimize data lake queries for subsequent downstream processing.
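
The following sketch shows how these settings could be applied when creating the S3 target endpoint with the AWS SDK for Python (Boto3); the endpoint identifier, role ARN, and bucket name are placeholders:

import boto3

dms = boto3.client("dms")

dms.create_endpoint(
    EndpointIdentifier="sporting-event-cdc-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::111111111111:role/dms-s3-access-role",
        "BucketName": "your-bucket",
        "BucketFolder": "sporting_event_cdc",
        "TimestampColumnName": "cdc_timestamp",  # adds a commit-timestamp column
        "IncludeOpForFullLoad": True,            # adds the Op (I/U/D) column to every file
        "DatePartitionEnabled": True,            # write CDC files to date-based folders
        "DatePartitionSequence": "YYYYMMDD",
        "DatePartitionDelimiter": "SLASH",
    },
)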

We use the support in Athena for Apache Iceberg tables called MERGE INTO, which can express row-level updates. Apache Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated. After the data is merged, we demonstrate how to use Athena to perform time travel on the sporting_event table, and use views to abstract and present different versions of the data to end-users. Finally, to simplify table maintenance, we demonstrate performing VACUUM on Apache Iceberg tables to delete older snapshots, which will optimize latency and cost of both read and write operations.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1 and 2 use AWS DMS, which connects to the source database to load initial data and ongoing changes (CDC) to Amazon S3 in CSV format. For this post, we have provided sample full and CDC datasets in CSV format that have been generated using AWS DMS.
    • Step 3 consists of the following actions:
      • Create an external table in Athena pointing to the source data ingested in Amazon S3.
      • Create an Apache Iceberg target table and load data from the source table.
      • Merge CDC data into the Apache Iceberg table using MERGE INTO.
  • Data access:
    • In Step 4, create a view on the Apache Iceberg table.
    • Use the view to query data using standard SQL.

Prerequisites

Before getting started, make sure you have the required permissions to perform the following in your AWS account:

Create tables on the raw data

First, create a database for this demo.

  1. Navigate to the Athena console and choose Query editor.
    If this is your first time using the Athena query editor, you need to configure and specify an S3 bucket to store the query results.
  2. Create a database with the following code:
    CREATE DATABASE raw_demo;

  3. Next, create a folder in an S3 bucket that you can use for this demo. Name this folder sporting_event_full.
  4. Upload LOAD00000001.csv into the folder.
  5. Switch to the raw_demo database and create a table to point to the raw input data:
    CREATE EXTERNAL TABLE raw_demo.sporting_event(
      op string,
      cdc_timestamp timestamp, 
      id bigint, 
      sport_type_name string, 
      home_team_id int, 
      away_team_id int, 
      location_id smallint, 
      start_date_time timestamp, 
      start_date date, 
      sold_out smallint)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your bucket>/sporting_event_full/'
      ;

  6. Run the following query to review the data:
    SELECT * FROM raw_demo.sporting_event LIMIT 5;

  7. Next, create another folder in the same S3 bucket called sporting_event_cdc.
  8. Within this folder, create three subfolders in a time hierarchy folder structure such that the final S3 folder URI looks like s3://<your-bucket>/sporting_event_cdc/2022/09/22/.
  9. Upload 20220922-184314489.csv into this folder. This folder structure is similar to how AWS DMS stores CDC data when you enable date-based folder partitioning.
  10. Create a table to point to the CDC data. This table also includes a partition column because the source data in Amazon S3 is organized into date-based folders.
    CREATE EXTERNAL TABLE raw_demo.sporting_event_cdc(
    op string,
    cdc_timestamp timestamp,
    id bigint,
    sport_type_name string,
    home_team_id int,
    away_team_id int,
    location_id smallint,
    start_date_time timestamp,
    start_date date,
    sold_out smallint)
    PARTITIONED BY (partition_date string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your-bucket>/sporting_event_cdc/'
    ;

  11. Next, alter the table to add new partitions. Because AWS DMS stores the data in a non-Hive-style folder structure, you need to add this partition manually or use an AWS Glue crawler before you can query the data. As data accumulates, continue to add new partitions to query this data.
    ALTER TABLE raw_demo.sporting_event_cdc ADD PARTITION (partition_date='2022-09-22') location 's3://<your-bucket>/sporting_event_cdc/2022/09/22/';

  12. Run the following query to review the CDC data:
    SELECT * FROM raw_demo.sporting_event_cdc;

There are two records with IDs 1 and 11 that are updates with op code U. The record with ID 21 has a delete (D) op code, and the record with ID 5 is an insert (I).

cdc data

Use CTAS to create the target Iceberg table in Parquet format

CTAS statements create new tables using standard SELECT queries. The resultant table is added to the AWS Glue Data Catalog and made available for querying.

  1. First, create another database to store the target table:
    CREATE DATABASE curated_demo;

  2. Next, switch to this database and run the CTAS statement to select data from the raw input table to create the target Iceberg table (replace the location with an appropriate S3 bucket in your account):
    CREATE TABLE curated_demo.sporting_event
    WITH (table_type='ICEBERG',
    location='s3://<your-bucket>/curated/sporting_event',
    format='PARQUET',
    is_external=false)
    AS SELECT
    id,
    sport_type_name,
    home_team_id,
    away_team_id,
    cast(location_id as int) as location_id,
    cast(start_date_time as timestamp(6)) as start_date_time,
    start_date,
    cast(sold_out as int) as sold_out
    FROM raw_demo.sporting_event
    ;

  3. Run the following query to review data in the Iceberg table:
    SELECT * FROM curated_demo.sporting_event LIMIT 5;

iceberg data

Use MERGE INTO to insert, update, and delete data into the Iceberg table

The MERGE INTO command updates the target table with data from the CDC table. The following statement uses a combination of primary keys and the Op column in the source data, which indicates if the source row is an insert, update, or delete. We use the id column as the primary key to join the target table to the source table, and we use the Op column to determine if a record needs to be deleted.

MERGE INTO curated_demo.sporting_event t
USING (SELECT op,
cdc_timestamp,
id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date,
sold_out
FROM raw_demo.sporting_event_cdc
WHERE partition_date ='2022-09-22') s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN
UPDATE SET
sport_type_name = s.sport_type_name,
home_team_id = s.home_team_id,
location_id = s.location_id,
start_date_time = s.start_date_time,
start_date = s.start_date,
sold_out = s.sold_out
WHEN NOT MATCHED THEN
INSERT (id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date)
VALUES
(s.id,
s.sport_type_name,
s.home_team_id,
s.away_team_id,
s.location_id,
s.start_date_time,
s.start_date)

Run the following query to verify data in the Iceberg table:

SELECT * FROM curated_demo.sporting_event WHERE id in (1, 5, 11, 21);

The record with ID 21 has been deleted, and the other records in the CDC dataset have been updated and inserted, as expected.

merge and delete

Create a view that contains the previous state

When you write to an Iceberg table, a new snapshot or version of a table is created each time.

A snapshot represents the state of a table at a point in time and is used to access the complete set of data files in the table. Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time or a specified snapshot ID. However, this requires knowledge of a table’s current snapshots. To abstract this information from users, you can create views on top of Iceberg tables:

CREATE VIEW curated_demo.v_sporting_event_previous_snapshot AS
SELECT id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
cast(start_date_time as timestamp(3)) as start_date_time,
start_date,
sold_out
FROM curated_demo.sporting_event
FOR TIMESTAMP AS OF current_timestamp - interval '5' minute;

Run the following query using this view to retrieve the snapshot of data before the CDC was applied:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

You can see the record with ID 21, which was deleted earlier.

view data

Compliance with privacy regulations may require that you permanently delete records in all snapshots. To accomplish this, you can set properties for snapshot retention in Athena when creating the table, or you can alter the table:

ALTER TABLE curated_demo.sporting_event SET TBLPROPERTIES (
'vacuum_min_snapshots_to_keep'='1',
'vacuum_max_snapshot_age_seconds'='1'
)

This instructs Athena to store only one version of the data and not maintain any transaction history. After a table has been updated with these properties, run the VACUUM command to remove the older snapshots and clean up storage:

VACUUM curated_demo.sporting_event;

Run the following query again:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

The record with ID 21 has been permanently deleted.

final validation

Considerations

As data accumulates in the CDC folder of your raw zone, older files can be archived to Amazon S3 Glacier. Subsequently, the MERGE INTO statement can also be run on a single source file if needed by using $path in the WHERE condition of the USING clause:

MERGE INTO curated_demo.sporting_event t
USING (SELECT op, cdc_timestamp, id, sport_type_name, home_team_id, away_team_id, location_id, start_date_time, start_date, sold_out FROM raw_demo.sporting_event_cdc WHERE partition_date='2022-09-22' AND regexp_like("$path", '/sporting_event_cdc/2022/09/22/20220922-184314489.csv')
………..

This results in Athena scanning all files in the partition’s folder before the filter is applied, but this can be minimized by choosing fine-grained hourly partitions. With this approach, you can trigger the MERGE INTO to run on Athena as files arrive in your S3 bucket using Amazon S3 event notifications. This could enable near-real-time use cases where users need to query a consistent view of data in the data lake as soon as it is created in source systems.
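
A minimal sketch of such an event-driven approach follows. An AWS Lambda function subscribed to the S3 event notification extracts the partition date and file name from the object key and starts the MERGE INTO query through the Athena API; the UPDATE clause is abbreviated here, and the workgroup is assumed to have a query result location configured:

import boto3

athena = boto3.client("athena")

def handler(event, context):
    # Run the MERGE INTO statement for a single CDC file as it lands in Amazon S3
    record = event["Records"][0]
    key = record["s3"]["object"]["key"]  # e.g. sporting_event_cdc/2022/09/22/20220922-184314489.csv
    _, year, month, day, file_name = key.split("/")
    partition_date = f"{year}-{month}-{day}"

    query = f"""
    MERGE INTO curated_demo.sporting_event t
    USING (SELECT op, id, sport_type_name, home_team_id, away_team_id, location_id,
                  start_date_time, start_date, sold_out
           FROM raw_demo.sporting_event_cdc
           WHERE partition_date = '{partition_date}'
             AND regexp_like("$path", '{file_name}')) s
    ON t.id = s.id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET sold_out = s.sold_out
    WHEN NOT MATCHED THEN INSERT (id, sport_type_name, home_team_id, away_team_id,
                                  location_id, start_date_time, start_date)
         VALUES (s.id, s.sport_type_name, s.home_team_id, s.away_team_id,
                 s.location_id, s.start_date_time, s.start_date)
    """
    athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "curated_demo"},
        WorkGroup="primary",  # assumes the workgroup has a query result location configured
    )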

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following SQL to drop the tables and views:
    DROP TABLE raw_demo.sporting_event;
    DROP TABLE raw_demo.sporting_event_cdc;
    DROP TABLE curated_demo.sporting_event;
    DROP VIEW curated_demo.v_sporting_event_previous_snapshot;

    Because Iceberg tables are considered managed tables in Athena, dropping an Iceberg table also removes all the data in the corresponding S3 folder.

  2. Run the following SQL to drop the databases:
    DROP DATABASE raw_demo;
    DROP DATABASE curated_demo;

  3. Delete the S3 folders and CSV files that you had uploaded.

Conclusion

This post showed you how to apply CDC to a target Iceberg table using CTAS and MERGE INTO statements in Athena. You can perform bulk load using a CTAS statement. When new data or changed data arrives, use the MERGE INTO statement to merge the CDC changes. To optimize storage and improve performance of queries, use the VACUUM command regularly.

As next steps, you can orchestrate these SQL statements using AWS Step Functions to implement end-to-end data pipelines for your data lake. For more information, refer to Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions.


About the Authors

Ranjit Rajan is a Principal Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Kannan Iyer is a Senior Data Lab Solutions Architect with AWS. Kannan works with AWS customers to help them design and build data and analytics applications in the cloud.

Alexandre Rezende is a Data Lab Solutions Architect with AWS. Alexandre works with customers on their Business Intelligence, Data Warehouse, and Data Lake use cases, designs architectures to solve their business problems, and helps them build MVPs to accelerate their path to production.

Build a transactional data lake using Apache Iceberg, AWS Glue, and cross-account data shares using AWS Lake Formation and Amazon Athena

Post Syndicated from Vikram Sahadevan original https://aws.amazon.com/blogs/big-data/build-a-transactional-data-lake-using-apache-iceberg-aws-glue-and-cross-account-data-shares-using-aws-lake-formation-and-amazon-athena/

Building a data lake on Amazon Simple Storage Service (Amazon S3) provides numerous benefits for an organization. It allows you to access diverse data sources, build business intelligence dashboards, build AI and machine learning (ML) models to provide customized customer experiences, and accelerate the curation of new datasets for consumption by adopting a modern data architecture or data mesh architecture.

However, many use cases, like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake, require handling data at a record level. Performing an operation like inserting, updating, or deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite entire datasets as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance and additional compaction maintenance.

In 2022, we announced that you can enforce fine-grained access control policies using AWS Lake Formation and query data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi, and more using Amazon Athena queries. You get the flexibility to choose the table and file format best suited for your use case and get the benefit of centralized data governance to secure data access when using Athena.

In this post, we show you how to configure Lake Formation using Iceberg table formats. We also explain how to upsert and merge in an S3 data lake using an Iceberg framework and apply Lake Formation access control using Athena.

Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3. Iceberg also helps guarantee data correctness under concurrent write scenarios.

Solution overview

To explain this setup, we present the following architecture, which integrates Amazon S3 for the data lake (Iceberg table format), Lake Formation for access control, AWS Glue for ETL (extract, transform, and load), and Athena for querying the latest inventory data from the Iceberg tables using standard SQL.

The solution workflow consists of the following steps, including data ingestion (Steps 1–3), data governance (Step 4), and data access (Step 5):

  1. We use AWS Database Migration Service (AWS DMS) or a similar tool to connect to the data source and move incremental data (CDC) to Amazon S3 in CSV format.
  2. An AWS Glue PySpark job reads the incremental data from the S3 input bucket and performs deduplication of the records.
  3. The job then invokes Iceberg’s MERGE statements to merge the data with the target S3 bucket.
  4. We use the AWS Glue Data Catalog as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema. Lake Formation allows you to centrally manage permissions and access control for Data Catalog resources in your S3 data lake. You can use fine-grained access control in Lake Formation to restrict access to data in query results.
  5. We use Athena integrated with Lake Formation to query data from the Iceberg table using standard SQL and validate table- and column-level access on Iceberg tables.

For this solution, we assume that the raw data files are already available in Amazon S3, and focus on processing the data using AWS Glue with Iceberg table format. We use sample item data that has the following attributes:

  • op – This represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. Make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
  • product_id – This is the primary key column in the source data table.
  • category – This column represents the category of an item.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to showcase the functionality.
  • last_update_time – This is the time when the item record was updated at the source data.

We demonstrate implementing the solution with the following steps:

  1. Create an S3 bucket for input and output data.
  2. Create input and output tables using Athena.
  3. Insert the data into the Iceberg table from Athena.
  4. Query the Iceberg table using Athena.
  5. Upload incremental (CDC) data for further processing.
  6. Run the AWS Glue job again to process the incremental files.
  7. Query the Iceberg table again using Athena.
  8. Define Lake Formation policies.

Prerequisites

For Athena queries, we need to configure an Athena workgroup with engine version 3 to support Iceberg table format.

To validate cross-account access through Lake Formation for the Iceberg table, we use two accounts (primary and secondary) in this post.

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output data

Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with AWS Glue PySpark code for the output.

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as iceberg-blog and leave the remaining fields as default.

S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name while implementing the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.

  1. On the bucket details page, choose Create folder.
  2. Create two subfolders. For this post, we create iceberg-blog/raw-csv-input and iceberg-blog/iceberg-output.
  3. Upload the LOAD00000001.csv file into the raw-csv-input folder.

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena query editor and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_lf_db;

As we explain later in this post, it’s essential to record the data locations when incorporating Lake Formation access controls.

-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_lf_db.csv_input(
op string,
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
'areColumnsQuoted'='false',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_lf_db.iceberg_table_lf (
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time timestamp)
PARTITIONED BY (category, bucket(16,product_id))
LOCATION 's3://glue-iceberg-demo/iceberg_blog/iceberg-output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_lf_db.csv_input;

SELECT * FROM iceberg_lf_db.iceberg_table_lf;

Alternatively, you can use an AWS Glue crawler to create the table definition for the input files.

Insert the data into the Iceberg table from Athena

Optionally, we can insert data into the Iceberg table through Athena using the following code:

insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (200,'Mobile','Mobile brand 1',25,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (201,'Laptop','Laptop brand 1',20,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (202,'Tablet','Kindle',30,cast('2023-01-19 09:51:41' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (203,'Speaker','Alexa',10,cast('2023-01-19 09:51:42' as timestamp));
insert into iceberg_lf_demo.iceberg_lf_output_athena (product_id,category,product_name,quantity_available,last_update_time) values (204,'Speaker','Alexa',50,cast('2023-01-19 09:51:43' as timestamp));

For this post, we load the data using an AWS Glue job. Complete the following steps to create the job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Visual with a blank canvas.
  4. Choose Create.
  5. Choose Edit script.
  6. Replace the script with the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
conf = SparkConf()

## spark.sql.catalog.job_catalog.warehouse can be passed as a
## runtime argument with the S3 path as its value.
## Make sure to pass the runtime argument
## iceberg_job_catalog_warehouse with the S3 path as its value
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read Input Table
## glueContext.create_data_frame.from_catalog can be more 
## performant and can be replaced in place of 
## create_dynamic_frame.from_catalog.

IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_lf_db", table_name = "csv_input", transformation_ctx = "IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
    ## Apply De-duplication logic on input data, to pickup latest record based on timestamp and operation
    IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

    # Add new columns to capture OP value and what is the latest timestamp
    inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))

    # Filter out new records that are inserted, then select latest record from existing records and merge both to get deduplicated output
    NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    # Register the deduplicated input as temporary table to use in Iceberg Spark SQL statements
    finalInputDF.createOrReplaceTempView("incremental_input_data")
    finalInputDF.show()

    ## Perform merge operation on incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a Merge operation
    IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_lf_db.iceberg_table_lf t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)

job.commit()

  1. On the Job details tab, specify the job name (iceberg-lf).
  2. For IAM Role, assign an AWS Identity and Access Management (IAM) role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  3. For Glue version, choose Glue 4.0 (Glue 3.0 is also supported).
  4. For Language, choose Python 3.
  5. Make sure Job bookmark has the default value of Enable.
  6. For Job parameters, add the following:
    1. Add the key --datalake-formats with the value iceberg.
    2. Add the key --iceberg_job_catalog_warehouse with your S3 path as the value (s3://<bucket-name>/<iceberg-warehouse-path>). The sketch after these steps shows the equivalent job definition created programmatically.
  7. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.
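
For reference, the following sketch shows how the same job definition could be created with the AWS SDK for Python (Boto3) instead of the console; the role ARN, script location, and bucket path are placeholders:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="iceberg-lf",
    Role="arn:aws:iam::111111111111:role/GlueJobRole",  # IAM role with Glue and S3 permissions
    GlueVersion="4.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<bucket-name>/scripts/iceberg-lf.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--datalake-formats": "iceberg",
        "--iceberg_job_catalog_warehouse": "s3://<bucket-name>/<iceberg-warehouse-path>",
        "--job-bookmark-option": "job-bookmark-enable",  # keep job bookmarks enabled
    },
    NumberOfWorkers=2,
    WorkerType="G.1X",
)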

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn’t have the op column.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload an incremental file.

This file includes updated records on two items.

Run the AWS Glue job again to process incremental files

Because the AWS Glue job has bookmarks enabled, the job picks up the new incremental file and performs a MERGE operation on the Iceberg table.

To run the job again, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job and choose Run.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena after incremental data processing

When the incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for items 200 and 201.

The following screenshot shows the output.

Define Lake Formation policies

For data governance, we use Lake Formation. Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and provides transactional access on top of your data lake. Moreover, it enables data sharing across accounts and organizations. There are two ways to share data resources in Lake Formation: named resource access control (NRAC) and tag-based access control (TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts using Lake Formation V3. Those are consumed via resource links that are based on created resource shares. Lake Formation tag-based access control (LF-TBAC) is another approach to share data resources in Lake Formation, which defines permissions based on attributes. These attributes are called LF-tags.

In this example, we create databases in the primary account. Our NRAC database is shared with a data domain via AWS RAM. Access to data tables that we register in this database will be handled through NRAC.

Configure access controls in the primary account

In the primary account, complete the following steps to set up access controls using Lake Formation:

  1. On the Lake Formation console, choose Data lake locations in the navigation pane.
  2. Choose Register location.
  3. Update the Iceberg Amazon S3 location path shown in the following screenshot.

Grant access to the database to the secondary account

To grant database access to the external (secondary) account, complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Choose External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the database name.

The first grant should be at database level, and the second grant is at table level.

  1. For Database permissions, specify your permissions (for this post, we select Describe).
  2. Choose Grant.

Now you need to grant permissions at the table level.

  1. Select External accounts and enter the secondary account number.
  2. Select Named data catalog resources.
  3. Verify the table name.
  4. For Table permissions, specify the permissions you want to grant. For this post, we select Select and Describe.
  5. Choose Grant.

If you see the following error, you must revoke IAMAllowedPrincipals from the data lake permissions.

To do so, select IAMAllowedPrincipals and choose Revoke.

Choose Revoke again to confirm.

After you revoke the data permissions, the permissions should appear as shown in the following screenshot.

Add AWS Glue IAM role permissions

Because the IAMAllowedPrincipals permission was revoked, the AWS Glue IAM role that was used in the AWS Glue job needs to be granted access explicitly, as shown in the following screenshot.

You need to repeat these steps for the AWS Glue IAM role at table level.

Verify the permissions granted to the AWS Glue IAM role on the Lake Formation console.

Grant access to the Iceberg table to the external account

In the secondary account, complete the following steps to access the Iceberg table shared from the primary account.

  1. On the AWS RAM console, choose Resource shares in the navigation pane.
  2. Choose the resource shares invitation sent from the primary account.
  3. Choose Accept resource share.

The resource status should now be active.

Next, you need to create a resource link for the shared Iceberg table and access through Athena.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the Iceberg table (shared from the primary account).
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post, iceberg_table_lf_demo).
  5. For Database, choose your database and verify the shared table and database are automatically populated.
  6. Choose Create.
  7. Select your table and on the Actions menu, choose View data.

You’re redirected to the Athena console, where you can query the data.

Grant column-based access in the primary account

For column-level restricted access, you need to grant access at the column level on the Iceberg table. Complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Select External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the table name.
  6. For Table permissions, choose the permissions you want to grant. For this post, we select Select.
  7. Under Data permissions, choose Column-based access.
  8. Select Include columns and choose your permission filters (for this post, Category and Quantity_available).
  9. Choose Grant.

Data with restricted columns can now be queried through the Athena console.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

This post explains how you can use the Iceberg framework with AWS Glue and Lake Formation to define cross-account access controls and query data using Athena. It provides an overview of Iceberg and its features and integration approaches, and explains how you can ingest data, grant cross-account access, and query data through a step-by-step guide.

We hope this gives you a great starting point for using Iceberg to build your data lake platform along with AWS analytics services to implement your solution.


About the Authors

Vikram Sahadevan is a Senior Resident Architect on the AWS Data Lab team. He enjoys efforts that focus around providing prescriptive architectural guidance, sharing best practices, and removing technical roadblocks with joint engineering engagements between customers and AWS technical resources that accelerate data, analytics, artificial intelligence, and machine learning initiatives.

Suvendu Kumar Patra possesses 18 years of experience in infrastructure, database design, and data engineering, and he currently holds the position of Senior Resident Architect at Amazon Web Services. He is a member of the specialized focus group, AWS Data Lab, and his primary duties entail working with executive leadership teams of strategic AWS customers to develop their roadmaps for data, analytics, and AI/ML. Suvendu collaborates closely with customers to implement data engineering, data hub, data lake, data governance, and EDW solutions, as well as enterprise data strategy and data management.

Visualize Confluent data in Amazon QuickSight using Amazon Athena

Post Syndicated from Ahmed Zamzam original https://aws.amazon.com/blogs/big-data/visualize-confluent-data-in-amazon-quicksight-using-amazon-athena/

This is a guest post written by Ahmed Saef Zamzam and Geetha Anne from Confluent.

Businesses are using real-time data streams to gain insights into their company’s performance and make informed, data-driven decisions faster. As real-time data has become essential for businesses, a growing number of companies are adapting their data strategy to focus on data in motion. Event streaming is the central nervous system of a data in motion strategy and, in many organizations, Apache Kafka is the tool that powers it.

Today, Kafka is well known and widely used for streaming data. However, managing and operating Kafka at scale can still be challenging. Confluent offers a solution through its fully managed, cloud-native service that simplifies running and operating data streams at scale. Confluent extends open-source Kafka through a suite of related services and features designed to enhance the data in motion experience for operators, developers, and architects in production.

In this post, we demonstrate how Amazon Athena, Amazon QuickSight, and Confluent work together to enable visualization of data streams in near-real time. We use the Kafka connector in Athena to do the following:

  • Join data inside Confluent with data stored in one of the many data sources supported by Athena, such as Amazon Simple Storage Service (Amazon S3)
  • Visualize Confluent data using QuickSight

Challenges

Purpose-built stream processing engines, like Confluent ksqlDB, often provide SQL-like semantics for real-time transformations, joins, aggregations, and filters on streaming data. With ksqlDB, you can create persistent queries, which continuously process streams of events according to specific logic, and materialize streaming data in views that can be queried at a point in time (pull queries) or subscribed to by clients (push queries).

ksqlDB is one solution that made stream processing accessible to a wider range of users. However, pull queries, like those supported by ksqlDB, may not be suitable for all stream processing use cases, and there may be complexities or unique requirements that pull queries are not designed for.

Data visualization for Confluent data

A frequent use case for enterprises is data visualization. To visualize data stored in Confluent, you can use one of over 120 pre-built connectors, provided by Confluent, to write streaming data to a destination data store of your choice. Next, you connect your business intelligence (BI) tool to the data store to begin visualizing the data.

The following diagram depicts a typical architecture utilized by many Confluent customers. In this workflow, data is written to Amazon S3 through the Confluent S3 sink connector and then analyzed with Athena, a serverless interactive analytics service that enables you to analyze and query data stored in Amazon S3 and various other data sources using standard SQL. You can then use Athena as an input data source to QuickSight, a highly scalable cloud native BI service, for further analysis.

typical architecture utilized by many Confluent customers.

Although this approach works well for many use cases, it requires data to be moved, and therefore duplicated, before it can be visualized. This duplication not only adds time and effort for data engineers who may need to develop and test new scripts, but also creates data redundancy, making it more challenging to manage and secure the data, and increases storage cost.

Enriching data with reference data in another data store

With ksqlDB queries, the source and destination are always Kafka topics. Therefore, if you have a data stream that you need to enrich with external reference data, you have two options. One option is to import the reference data into Confluent, model it as a table, and use ksqlDB’s stream-table join to enrich the stream. The other option is to ingest the data stream into a separate data store and perform join operations there. Both require data movement and result in duplicate data storage.

Solution overview

So far, we have discussed two challenges that are not addressed by conventional stream processing tools. Is there a solution that addresses both challenges simultaneously?

When you want to analyze data without separate pipelines and jobs, a popular choice is Athena. With Athena, you can run SQL queries on a wide range of data sources—in addition to Amazon S3—without learning a new language, developing scripts to extract (and duplicate) data, or managing infrastructure.

Recently, Athena announced a connector for Kafka. Like Athena’s other connectors, queries on Kafka are processed within Kafka and return results to Athena. The connector supports predicate pushdown, which means that adding filters to your queries can reduce the amount of data scanned, improve query performance, and reduce cost.

For example, when using this connector, the amount of data scanned by the query SELECT * FROM CONFLUENT_TABLE could be significantly higher than the amount of data scanned by the query SELECT * FROM CONFLUENT_TABLE WHERE COUNTRY = 'UK'. The reason is that the AWS Lambda function, which provides the runtime environment for the Athena connector, filters data at the source before returning it to Athena.

Let’s assume we have a stream of online transactions flowing into Confluent and customer reference data stored in Amazon S3. We want to use Athena to join both data sources together and produce a new dataset for QuickSight. Instead of using the S3 sink connector to load data into Amazon S3, we use Athena to query Confluent and join it with S3 data—all without moving data. The following diagram illustrates this architecture.

Athena to join both data sources together and produce a new dataset for QuickSight

We perform the following steps:

  1. Register the schema of your Confluent data.
  2. Configure the Athena connector for Kafka.
  3. Optionally, interactively analyze Confluent data.
  4. Create a QuickSight dataset using Athena as the source.

Register the schema

To connect Athena to Confluent, the connector needs the schema of the topic to be registered in the AWS Glue Schema Registry, which Athena uses for query planning.

The following is a sample record in Confluent:

{
  "transaction_id": "23e5ed25-5818-4d4f-acb3-73ef04d51d21",
  "customer_id": "126-58-9758",
  "amount": 986,
  "timestamp": "2023-01-03T15:40:42",
  "product_category": "health_fitness"
}

The following is the schema of this record:

{
  "topicName": "transactions",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "transaction_id",
        "mapping": "transaction_id",
        "type": "VARCHAR"
      },
      {
        "name": "customer_id",
        "mapping": "customer_id",
        "type": "VARCHAR"
      },
      {
        "name": "amount",
        "mapping": "amount",
        "type": "INTEGER"
      },
      {
        "name": "timestamp",
        "mapping": "timestamp",
        "type": "timestamp",
        "formatHint": "yyyy-MM-dd\'T\'HH:mm:ss"
      },
      {
        "name": "product_category",
        "mapping": "product_category",
        "type": "VARCHAR"
      }
    ]
  }
}

The data producer writing the data can register this schema with the AWS Glue Schema Registry. Alternatively, you can use the AWS Management Console or AWS Command Line Interface (AWS CLI) to create a schema manually.

We create the schema manually by running the following CLI command. Replace <registry_name> with your registry name and make sure that the text in the description field includes the required string {AthenaFederationKafka}:

aws glue create-registry --registry-name <registry_name> --description "{AthenaFederationKafka}"

Next, we run the following command to create a schema inside the newly created schema registry:

aws glue create-schema --registry-id RegistryName=<registry_name> --schema-name <schema_name> --compatibility <Compatibility_Mode> --data-format JSON --schema-definition <Schema>

Before running the command, be sure to provide the following details:

  • Replace <registry_name> with your AWS Glue Schema Registry name
  • Replace <schema_name> with the name of your Confluent Cloud topic, for example, transactions
  • Replace <Compatibility_Mode> with one of the supported compatibility modes, for example, BACKWARD
  • Replace <Schema> with your schema
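If you prefer to script this step, you can perform the same registration with the AWS SDK. The following is a minimal boto3 sketch, assuming the placeholder registry name and the transactions schema shown earlier; adjust the names and compatibility mode to match your environment:

import json
import boto3

glue = boto3.client("glue")

# Create the registry; the description must contain the string {AthenaFederationKafka}
glue.create_registry(
    RegistryName="<registry_name>",
    Description="{AthenaFederationKafka} schemas for the Athena Kafka connector",
)

# The schema definition is the JSON document shown earlier
schema_definition = {
    "topicName": "transactions",
    "message": {
        "dataFormat": "json",
        "fields": [
            {"name": "transaction_id", "mapping": "transaction_id", "type": "VARCHAR"},
            {"name": "customer_id", "mapping": "customer_id", "type": "VARCHAR"},
            {"name": "amount", "mapping": "amount", "type": "INTEGER"},
            {"name": "timestamp", "mapping": "timestamp", "type": "timestamp",
             "formatHint": "yyyy-MM-dd'T'HH:mm:ss"},
            {"name": "product_category", "mapping": "product_category", "type": "VARCHAR"},
        ],
    },
}

glue.create_schema(
    RegistryId={"RegistryName": "<registry_name>"},
    SchemaName="transactions",  # matches the Confluent topic name
    DataFormat="JSON",
    Compatibility="BACKWARD",
    SchemaDefinition=json.dumps(schema_definition),
)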

Configure and deploy the Athena Connector

With our schema created, we’re ready to deploy the Athena connector. Complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. Search for and select Apache Kafka.
    Add Apache Kafka as data source
  4. For Data source name, enter the name for the data source.
    Enter name for data source

This data source name will be referenced in your queries. For example:

SELECT * 
FROM <data_source_name>.<registry_name>.<schema_name>
WHERE COL1='SOMETHING'

Applying this to our use case and previously defined schema, our query would be as follows:

SELECT * 
FROM "Confluent"."transactions_db"."transactions"
WHERE product_category='Kids'
  1. In the Connection details section, choose Create Lambda function.
    create lambda function

You’re redirected to the Applications page on the Lambda console. Some of the application settings are already filled.

The following are the important settings required for integrating with Confluent Cloud. For more information on these settings, refer to Parameters.

  1. For LambdaFunctionName, enter the name for the Lambda function the connector will use. For example, athena_confluent_connector.

We use this parameter in the next step.

  1. For KafkaEndpoint, enter the Confluent Cloud bootstrap URL.

You can find this on the Cluster settings page in the Confluent Cloud UI.

enter the Confluent Cloud bootstrap URL

Confluent Cloud supports two authentication mechanisms: OAuth and SASL/PLAIN (API keys). The connector doesn't support OAuth, which leaves us with SASL/PLAIN. SASL/PLAIN uses SSL as the security protocol and PLAIN as the SASL mechanism.

  1. For AuthType, enter SASL_SSL_PLAIN.

The API key and secret used by the connector to access Confluent need to be stored in AWS Secrets Manager.

  1. Get your Confluent API key or create a new one.
  2. Run the following AWS CLI command to create the secret in Secrets Manager:
    aws secretsmanager create-secret \
        --name <SecretNamePrefix>\
        --secret-string "{\"username\":\"<Confluent_API_KEY>\",\"password\":\"<Confluent_Secret>\"}"

The secret string should have two key-value pairs, one named username and the other password.

  1. For SecretNamePrefix, enter the secret name prefix created in the previous step.
  2. If the Confluent Cloud cluster is reachable over the internet, leave SecurityGroupIds and SubnetIds blank. Otherwise, your Lambda function needs to run in a VPC that has connectivity to your Confluent Cloud network; in that case, enter a security group ID and three private subnet IDs in that VPC.
  3. For SpillBucket, enter the name of an S3 bucket where the connector can spill data.

Athena connectors temporarily store (spill) data to Amazon S3 for further processing by Athena.

  1. Select I acknowledge that this app creates custom IAM roles and resource policies.
  2. Choose Deploy.
  3. Return to the Connection details section on the Athena console and for Lambda, enter the name of the Lambda function you created.
  4. Choose Next.
  5. Choose Create data source.

Perform interactive analysis on Confluent data

With the Athena connector set up, our streaming data is now queryable from the same service we use to analyze S3 data lakes. Next, we use Athena to conduct point-in-time analysis of transactions flowing through Confluent Cloud.

Aggregation

We can use standard SQL functions to aggregate the data. For example, we can get the revenue by product category:

SELECT product_category, SUM(amount) AS Revenue
FROM "Confluent"."athena_blog"."transactions"
GROUP BY product_category
ORDER BY Revenue desc

SQL function to aggregate data

Enrich transaction data with customer data

The aggregation example is also available with ksqlDB pull queries. However, Athena’s connector allows us to join the data with other data sources like Amazon S3.

In our use case, the transactions streamed to Confluent Cloud lack detailed information about customers, apart from a customer_id. However, we have a reference dataset in Amazon S3 that has more information about the customers. With Athena, we can join both datasets together to gain insights about our customers. See the following code:

SELECT * 
FROM "Confluent"."athena_blog"."transactions" a
INNER JOIN "AwsDataCatalog"."athenablog"."customer" b 
ON a.customer_id=b.customer_id

join data

You can see from the results that we were able to enrich the streaming data with customer details, stored in Amazon S3, including name and address.
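You can also run the same federated join from Python, which is convenient in notebooks or scheduled scripts. The following is a minimal sketch using the AWS SDK for pandas (awswrangler), assuming the data source, registry, and Glue database names used in this post:

import awswrangler as wr

sql = """
SELECT *
FROM "Confluent"."athena_blog"."transactions" a
INNER JOIN "AwsDataCatalog"."athenablog"."customer" b
  ON a.customer_id = b.customer_id
"""

df = wr.athena.read_sql_query(
    sql,
    database="athenablog",   # default database context for the query
    ctas_approach=False,     # read results directly instead of creating a temporary table
)
print(df.head())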

Visualize data using QuickSight

Another powerful feature this connector brings is the ability to visualize data stored in Confluent using any BI tool that supports Athena as a data source. In this post, we use QuickSight. QuickSight is a machine learning (ML)-powered BI service built for the cloud. You can use it to deliver easy-to-understand insights to the people you work with, wherever they are.

For more information about signing up for QuickSight, see Signing up for an Amazon QuickSight subscription.

Complete the following steps to visualize your streaming data with QuickSight:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena as the data source.
  4. For Data source name, enter a name.
  5. Choose Create data source.
  6. In the Choose your table section, choose Use custom SQL.
  7. Enter the join query like the one given previously, then choose Confirm query.
  8. Next, choose to import the data into SPICE (Super-fast, Parallel, In-memory Calculation Engine), a fully managed in-memory cache that boosts performance, or directly query the data.

Utilizing SPICE will enhance performance, but the data may need to be periodically updated. You can choose to incrementally refresh your dataset or schedule regular refreshes with SPICE. If you want near-real-time data reflected in your dashboards, select Directly query your data. Note that with the direct query option, user actions in QuickSight, such as applying a drill-down filter, may invoke a new Athena query.
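If you use SPICE, you can also trigger refreshes programmatically, for example from a schedule or a pipeline step. The following is a minimal boto3 sketch; the account ID and dataset ID are placeholders for your own values:

import boto3
from datetime import datetime

quicksight = boto3.client("quicksight")

# Kick off a SPICE refresh (ingestion) for an existing dataset
quicksight.create_ingestion(
    AwsAccountId="<ACCOUNT_ID>",
    DataSetId="<DATASET_ID>",
    IngestionId=f"manual-refresh-{datetime.utcnow():%Y%m%d%H%M%S}",
)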

  1. Choose Visualize.
    Choose Visualize

That's it: we have successfully connected QuickSight to Confluent through Athena. With just a few clicks, you can create visuals displaying data from Confluent.

successfully connected QuickSight to Confluent through Athena.

Clean up

To avoid incurring ongoing charges, delete the resources you provisioned by completing the following steps:

  1. Delete the AWS Glue schema and registry.
  2. Delete the Athena Kafka connector.
  3. Delete the QuickSight dataset.

Conclusion

In this post, we discussed use cases for Athena and Confluent. We provided examples of how you can use both for near-real-time data visualization with QuickSight and interactive analysis involving joins between streaming data in Confluent and data stored in Amazon S3.

The Athena connector for Kafka simplifies the process of querying and analyzing streaming data from Confluent Cloud. It removes the need to first move streaming data to persistent storage before it can be used in downstream use cases like business intelligence. This complements the existing integration between Confluent and Athena, using the S3 sink connector, which enables loading streaming data into a data lake, and is an additional option for customers who want to enable interactive analysis on Confluent data.


About the authors

Ahmed Zamzam is a Senior Partner Solutions Architect at Confluent, with a focus on the AWS partnership. In his role, he works with customers in the EMEA region across various industries to assist them in building applications that leverage their data using Confluent and AWS. Prior to Confluent, Ahmed was a Specialist Solutions Architect for Analytics AWS specialized in data streaming and search. In his free time, Ahmed enjoys traveling, playing tennis, and cycling.

Geetha Anne is a Partner Solutions Engineer at Confluent with previous experience in implementing solutions for data-driven business problems on the cloud, involving data warehousing and real-time streaming analytics. She fell in love with distributed computing during her undergraduate days and has followed her interest ever since. Geetha provides technical guidance, design advice, and thought leadership to key Confluent customers and partners. She also enjoys teaching complex technical concepts to both tech-savvy and general audiences.

Interact with Apache Iceberg tables using Amazon Athena and cross account fine-grained permissions using AWS Lake Formation

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/interact-with-apache-iceberg-tables-using-amazon-athena-and-cross-account-fine-grained-permissions-using-aws-lake-formation/

We recently announced support for AWS Lake Formation fine-grained access control policies in Amazon Athena queries for data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi and Apache Hive. AWS Lake Formation allows you to define and enforce database, table, and column-level access policies to query Iceberg tables stored in Amazon S3. Lake Formation provides an authorization and governance layer on data stored in Amazon S3. This capability requires that you upgrade to Athena engine version 3.

Large organizations often have lines of business (LoBs) that operate with autonomy in managing their business data, which makes sharing data across LoBs non-trivial. These organizations have adopted a federated model, with each LoB having the autonomy to make decisions on its data. They use the publisher/consumer model with a centralized governance layer that is used to enforce access controls. If you are interested in learning more about data mesh architecture, visit Design a data mesh architecture using AWS Lake Formation and AWS Glue. With Athena engine version 3, customers can use the same fine-grained controls for open data frameworks such as Apache Iceberg, Apache Hudi, and Apache Hive.

In this post, we deep dive into a use case where you have a producer/consumer model with data sharing enabled to give restricted access to an Apache Iceberg table that the consumer can query. We discuss row filtering to restrict access to certain rows, column filtering to restrict column-level access, schema evolution, and time travel.

Solution overview

To illustrate the functionality of fine-grained permissions for Apache Iceberg tables with Athena and Lake Formation, we set up the following components:

  • In the producer account:
    • An AWS Glue Data Catalog to register the schema of a table in Apache Iceberg format
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from the producer account
  • In the consumer account:
    • AWS Resource Access Manager (AWS RAM) to create a handshake between the producer Data Catalog and consumer
    • Lake Formation to provide fine-grained access to the consumer account
    • Athena to verify data from the producer account

The following diagram illustrates the architecture.

Cross-account fine-grained permissions architecture

Prerequisites

Before you get started, make sure you have the following:

Data producer setup

In this section, we present the steps to set up the data producer.

Create an S3 bucket to store the table data

We create a new S3 bucket to save the data for the table:

  1. On the Amazon S3 console, create an S3 bucket with unique name (for this post, we use iceberg-athena-lakeformation-blog).
  2. Create the producer folder inside the bucket to use for the table.

Amazon S3 bucket and folder creation

Register the S3 path storing the table using Lake Formation

We register the S3 full path in Lake Formation:

  1. Navigate to the Lake Formation console.
  2. If you’re logging in for the first time, you’re prompted to create an admin user.
  3. In the navigation pane, under Register and ingest, choose Data lake locations.
  4. Choose Register location, and provide the S3 bucket path that you created earlier.
  5. Choose AWSServiceRoleForLakeFormationDataAccess for IAM role.

For additional information about roles, refer to Requirements for roles used to register locations.

If you enabled encryption of your S3 bucket, you have to provide permissions for Lake Formation to perform encryption and decryption operations. Refer to Registering an encrypted Amazon S3 location for guidance.

  1. Choose Register location.

Register Lake Formation location
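You can also script the registration. The following is a minimal boto3 sketch, assuming the bucket and folder created earlier and the Lake Formation service-linked role:

import boto3

lakeformation = boto3.client("lakeformation")

# Register the producer path with Lake Formation using the service-linked role
lakeformation.register_resource(
    ResourceArn="arn:aws:s3:::iceberg-athena-lakeformation-blog/producer",
    UseServiceLinkedRole=True,
)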

Create an Iceberg table using Athena

Now let’s create the table using Athena backed by Apache Iceberg format:

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (iceberg-athena-lakeformation-blog/producer).
  3. Choose Save.
  4. In the query editor, enter the following query (replace the location with the S3 bucket that you registered with Lake Formation). Note that we use the default database, but you can use any other database.
CREATE TABLE consumer_iceberg (
customerid bigint,
customername string,
email string,
city string,
country string,
territory string,
contactfirstname string,
contactlastname string)
LOCATION 's3://YOUR-BUCKET/producer/' -- *** Change bucket name to your bucket***
TBLPROPERTIES ('table_type'='ICEBERG')
  1. Choose Run.

Athena query editor to create Iceberg table

Share the table with the consumer account

To illustrate functionality, we implement the following scenarios:

  • Provide access to selected columns
  • Provide access to selected rows based on a filter

Complete the following steps:

  1. On the Lake Formation console, in the navigation pane under Data catalog, choose Data filters.
  2. Choose Create new filter.
  3. For Data filter name, enter blog_data_filter.
  4. For Target database, enter lf-demo-db.
  5. For Target table, enter consumer_iceberg.
  6. For Column-level access, select Include columns.
  7. Choose the columns to share with the consumer: country, address, contactfirstname, city, customerid, and customername.
  8. For Row filter expression, enter the filter country='France'.
  9. Choose Create filter.

create data filter
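The same data filter can be created programmatically. The following is a minimal boto3 sketch, assuming the database, table, and filter names used above; the producer account ID is a placeholder:

import boto3

lakeformation = boto3.client("lakeformation")

lakeformation.create_data_cells_filter(
    TableData={
        "TableCatalogId": "<PRODUCER_ACCOUNT_ID>",
        "DatabaseName": "lf-demo-db",
        "TableName": "consumer_iceberg",
        "Name": "blog_data_filter",
        # Row-level filter: only rows where country='France' are shared
        "RowFilter": {"FilterExpression": "country='France'"},
        # Column-level filter: only these columns are shared
        "ColumnNames": [
            "country", "address", "contactfirstname",
            "city", "customerid", "customername",
        ],
    }
)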

Now let’s grant access to the consumer account on the consumer_iceberg table.

  1. In the navigation pane, choose Tables.
  2. Select the consumer_iceberg table, and choose Grant on the Actions menu.
    Grant access to consumer account on consumer_iceberg table
  3. Select External accounts.
  4. Enter the external account ID.
    Grant data permissions
  5. Select Named data catalog resources.
  6. Choose your database and table.
  7. For Data filters, choose the data filter you created.
    Add data filter
  8. For Data filter permissions and Grantable permissions, select Select.
  9. Choose Grant.

Permissions for creating grant
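The equivalent grant can also be issued with the AWS SDK. The following boto3 sketch assumes the data filter created earlier, with placeholders for the producer and consumer account IDs:

import boto3

lakeformation = boto3.client("lakeformation")

lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "<CONSUMER_ACCOUNT_ID>"},
    Resource={
        "DataCellsFilter": {
            "TableCatalogId": "<PRODUCER_ACCOUNT_ID>",
            "DatabaseName": "lf-demo-db",
            "TableName": "consumer_iceberg",
            "Name": "blog_data_filter",
        }
    },
    Permissions=["SELECT"],
    PermissionsWithGrantOption=["SELECT"],
)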

Data consumer setup

To set up the data consumer, we accept the resource share with AWS RAM and create a resource link to the shared table with Lake Formation. Complete the following steps:

  1. Log in to the consumer account and navigate to the AWS RAM console.
  2. Under Shared with me in the navigation pane, choose Resource shares.
  3. Choose your resource share.
    Resource share in consumer account
  4. Choose Accept resource share.
  5. Note the name of the resource share to use in the next steps.
    Accept resource share
  6. Navigate to the Lake Formation console.
  7. If you’re logging in for the first time, you’re prompted to create an admin user.
  8. Choose Databases in the navigation pane, then choose your database.
  9. On the Actions menu, choose Create resource link.
    Create a resource link
  10. For Resource link name, enter the name of your resource link (for example, consumer_iceberg).
  11. Choose your database and shared table.
  12. Choose Create.
    Create table with resource link
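You can also create the resource link programmatically from the consumer account. The following is a minimal boto3 sketch; the local database name and the producer account ID are placeholders:

import boto3

glue = boto3.client("glue")

# Create a resource link in the consumer account that points to the shared table
glue.create_table(
    DatabaseName="<LOCAL_DATABASE_NAME>",
    TableInput={
        "Name": "consumer_iceberg",
        "TargetTable": {
            "CatalogId": "<PRODUCER_ACCOUNT_ID>",
            "DatabaseName": "lf-demo-db",
            "Name": "consumer_iceberg",
        },
    },
)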

Validate the solution

Now we can run different operations on the tables to validate the fine-grained access controls.

Insert operation

Let’s insert data into the consumer_iceberg table in the producer account, and validate the data filtering works as expected in the consumer account.

  1. Log in to the producer account.
  2. On the Athena console, choose Query editor in the navigation pane.
  3. Use the following SQL to insert data into the Iceberg table. Use the query editor to run one query at a time: highlight a query and choose Run (or Run again).
INSERT INTO consumer_iceberg VALUES (1, 'Land of Toys Inc.', '[email protected]',
'NYC','USA', 'NA', 'James', 'xxxx 118th NE');

INSERT INTO consumer_iceberg VALUES (2, 'Reims Collectables', '[email protected]',
'Reims','France', 'EMEA', 'Josephine', 'Darakjy');

INSERT INTO consumer_iceberg VALUES (3, 'Lyon Souveniers', '[email protected]',
'Paris', 'France', 'EMEA','Art', 'Venere');

Insert data into consumer_iceberg table in the producer account

  1. Use the following SQL to read and select data in the Iceberg table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run select query to validate rows were inserted

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Run same query in consumer account

Based on the filters, the consumer has visibility to a subset of columns, and rows where the country is France.

Update/Delete operations

Now let’s update one of the rows and delete one from the dataset shared with the consumer.

  1. Log in to the producer account.
  2. Update the row where city='Reims' to set city='Paris', then delete the row where customerid=3:
    UPDATE consumer_iceberg SET city='Paris' WHERE city='Reims';

    Run update query in producer account

DELETE FROM consumer_iceberg WHERE customerid = 3;

Run delete query in producer account

  1. Verify the updated and deleted dataset:
SELECT * FROM consumer_iceberg;

Verify update and delete reflected in producer account

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Verify update and delete in consumer account

We can observe that only one row is available and the city is updated to Paris.

Schema evolution: Add a new column

Let's add a new column to the table and verify that the change is reflected in the dataset shared with the consumer.

  1. Log in to the producer account.
  2. Add a new column called geo_loc to the Iceberg table. Use the query editor to run one query at a time: highlight a query and choose Run (or Run again).
ALTER TABLE consumer_iceberg ADD COLUMNS (geo_loc string);

INSERT INTO consumer_iceberg VALUES (5, 'Test_user', '[email protected]',
'Reims','France', 'EMEA', 'Test_user', 'Test_user', 'test_geo');

SELECT * FROM consumer_iceberg;

Add a new column in producer account

To provide visibility to the newly added geo_loc column, we need to update the Lake Formation data filter.

  1. On the Lake Formation console, choose Data filters in the navigation pane.
  2. Select your data filter and choose Edit.
    Update data filter
  3. Under Column-level access, add the new column (geo_loc).
  4. Choose Save.
    Add new column to data filter
  5. Log in to the consumer account.
  6. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate new column appears in consumer account

The new column geo_loc is visible, along with the additional row.

Schema evolution: Delete column

Let's drop a column from the table and verify that the change is reflected in the dataset shared with the consumer.

  1. Log in to the producer account.
  2. Alter the table to drop the address column from the Iceberg table. Use the query editor to run one query at a time: highlight a query and choose Run (or Run again).
ALTER TABLE consumer_iceberg DROP COLUMN address;

SELECT * FROM consumer_iceberg;

Delete a column in producer account

We can observe that the column address is no longer present in the table.

  1. Log in to the consumer account.
  2. In the Athena query editor, run the following SELECT query on the shared table:
SELECT * FROM "lf-demo-db"."consumer_iceberg" limit 10;

Validate column deletion in consumer account

The column address is not present in the table.

Time travel

We have now changed the Iceberg table multiple times. The Iceberg table keeps track of the snapshots. Complete the following steps to explore the time travel functionality:

  1. Log in to the producer account.
  2. Query the system table:
SELECT * FROM "lf-demo-db"."consumer_iceberg$snapshots" limit 10;

We can observe that we have generated multiple snapshots.

  1. Note down one of the committed_at values to use in the next steps (for this example, 2023-01-29 21:35:02.176 UTC).
    Time travel query in producer account
  2. Use time travel to query the table as of that snapshot. Use the query editor to run one query at a time: highlight a query and choose Run (or Run again).
SELECT * FROM consumer_iceberg FOR TIMESTAMP
AS OF TIMESTAMP '2023-01-29 21:35:02.176 UTC';

Find table snapshot using time travel

Clean up

Complete the following steps to avoid incurring future charges:

  1. On the Amazon S3 console, delete the table storage bucket (for this post, iceberg-athena-lakeformation-blog).
  2. In the producer account on the Athena console, run the following commands to delete the tables you created:
DROP TABLE "lf-demo-db"."consumer_iceberg";
DROP DATABASE lf-demo-db;
  1. In the producer account on the Lake Formation console, revoke permissions to the consumer account.
    Clean up - Revoke permissions to consumer account
  2. Delete the S3 bucket used for the Athena query result location from the consumer account.

Conclusion

With support for cross-account, fine-grained access control policies on formats such as Iceberg, you have the flexibility to work with any format supported by Athena. The ability to perform CRUD operations against the data in your S3 data lake, combined with Lake Formation fine-grained access controls for all tables and formats supported by Athena, provides opportunities to innovate and simplify your data strategy. We'd love to hear your feedback!


About the authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team at AWS. He is an Apache Iceberg Committer and PMC member.

Chris Olson is a Software Development Engineer at AWS.

Xiaoxuan Li is a Software Development Engineer at AWS.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

AWS Week in Review – March 20, 2023

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-week-in-review-march-20-2023/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

A new week starts, and Spring is almost here! If you’re curious about AWS news from the previous seven days, I got you covered.

Last Week’s Launches
Here are the launches that got my attention last week:

Amazon S3 – Last week there was AWS Pi Day 2023 celebrating 17 years of innovation since Amazon S3 was introduced on March 14, 2006. For the occasion, the team released many new capabilities:

Amazon Linux 2023 – Our new Linux-based operating system is now generally available. Sébastien’s post is full of tips and info.

Application Auto Scaling – You can now use arithmetic operations and mathematical functions to customize the metrics used with Target Tracking policies, which lets you scale based on your own application-specific metrics. Read how it works with Amazon ECS services.

AWS Data Exchange for Amazon S3 is now generally available – You can now share and find data files directly from S3 buckets, without the need to create or manage copies of the data.

Amazon Neptune – Now offers a graph summary API to help understand important metadata about property graphs (PG) and resource description framework (RDF) graphs. Neptune added support for Slow Query Logs to help identify queries that need performance tuning.

Amazon OpenSearch Service – The team introduced security analytics that provides new threat monitoring, detection, and alerting features. The service now supports OpenSearch version 2.5 that adds several new features such as support for Point in Time Search and improvements to observability and geospatial functionality.

AWS Lake Formation and Apache Hive on Amazon EMR – Introduced fine-grained access controls that allow data administrators to define and enforce fine-grained table and column level security for customers accessing data via Apache Hive running on Amazon EMR.

Amazon EC2 M1 Mac Instances – You can now update guest environments to a specific or the latest macOS version without having to tear down and recreate the existing macOS environments.

AWS Chatbot – Now Integrates With Microsoft Teams to simplify the way you troubleshoot and operate your AWS resources.

Amazon GuardDuty RDS Protection for Amazon Aurora – Now generally available to help profile and monitor access activity to Aurora databases in your AWS account without impacting database performance.

AWS Database Migration Service – Now supports validation to ensure that data is migrated accurately to S3 and can now generate an AWS Glue Data Catalog when migrating to S3.

AWS Backup – You can now back up and restore virtual machines running on VMware vSphere 8 and with multiple vNICs.

Amazon Kendra – There are new connectors to index documents and search for information across these new content sources: Confluence Server, Confluence Cloud, Microsoft SharePoint OnPrem, and Microsoft SharePoint Cloud. This post shows how to use the Amazon Kendra connector for Microsoft Teams.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
A few more blog posts you might have missed:

Women founders Q&A – We're talking to six women founders and leaders about how they're making impacts in their communities, industries, and beyond.

What you missed at the 2023 IMAGINE: Nonprofit conference – Hundreds of nonprofit leaders, technologists, and innovators gathered to learn and share how AWS can drive a positive impact for people and the planet.

Monitoring load balancers using Amazon CloudWatch anomaly detection alarms – The metrics emitted by load balancers provide crucial and unique insight into service health, service performance, and end-to-end network performance.

Extend geospatial queries in Amazon Athena with user-defined functions (UDFs) and AWS Lambda – Using a solution based on Uber’s Hexagonal Hierarchical Spatial Index (H3) to divide the globe into equally-sized hexagons.

How cities can use transport data to reduce pollution and increase safety – A guest post by Rikesh Shah, outgoing head of open innovation at Transport for London.

For AWS open-source news and updates, here’s the latest newsletter curated by Ricardo to bring you the most recent updates on open-source projects, posts, events, and more.

Upcoming AWS Events
Here are some opportunities to meet:

AWS Public Sector Day 2023 (March 21, London, UK) – An event dedicated to helping public sector organizations use technology to achieve more with less through the current challenging conditions.

Women in Tech at Skills Center Arlington (March 23, VA, USA) – Let’s celebrate the history and legacy of women in tech.

The AWS Summits season is warming up! You can sign up here to be notified when registration opens in your area.

That’s all from me for this week. Come back next Monday for another Week in Review!

Danilo

Extend geospatial queries in Amazon Athena with UDFs and AWS Lambda

Post Syndicated from John Telford original https://aws.amazon.com/blogs/big-data/extend-geospatial-queries-in-amazon-athena-with-udfs-and-aws-lambda/

Amazon Athena is a serverless and interactive query service that allows you to easily analyze data in Amazon Simple Storage Service (Amazon S3) and 25-plus data sources, including on-premises data sources and other cloud systems, using SQL or Python. Athena's built-in capabilities include querying geospatial data; for example, you can count the number of earthquakes in each Californian county. One disadvantage of analyzing at the county level is that it may give you a misleading impression of which parts of California have had the most earthquakes. This is because the counties aren't equally sized; a county may have had more earthquakes simply because it's a big county. What if we wanted a hierarchical system that allowed us to zoom in and out to aggregate data over different equally-sized geographic areas?

In this post, we present a solution that uses Uber’s Hexagonal Hierarchical Spatial Index (H3) to divide the globe into equally-sized hexagons. We then use an Athena user-defined function (UDF) to determine which hexagon each historical earthquake occurred in. Because the hexagons are equally-sized, this analysis gives a fair impression of where earthquakes tend to occur.

At the end, we’ll produce a visualization like the one below that shows the number of historical earthquakes in different areas of the western US.

H3 divides the globe into equal-sized regular hexagons. The number of hexagons depends on the chosen resolution, which may vary from 0 (122 hexagons, each with edge lengths of about 1,100 km) to 15 (569,707,381,193,162 hexagons, each with edge lengths of about 50 cm). H3 enables analysis at the area level, and each area has the same size and shape.
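To get a feel for the index, the following Python sketch converts a coordinate to its resolution 4 cell and prints the cell's boundary. It assumes the h3-py package (v4 API) is installed, and the coordinates are purely illustrative:

import h3  # assumes the h3-py package, v4 API

lat, lng = 37.7749, -122.4194  # illustrative coordinates (San Francisco)

cell = h3.latlng_to_cell(lat, lng, 4)  # resolution 4 cell address
print(cell, h3.get_resolution(cell))

# Boundary vertices of the hexagon, as (lat, lng) pairs
print(h3.cell_to_boundary(cell))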

Solution overview

The solution extends Athena’s built-in geospatial capabilities by creating a UDF powered by AWS Lambda. Finally, we use an Amazon SageMaker notebook to run Athena queries that are rendered as a choropleth map. The following diagram illustrates this architecture.

The end-to-end architecture is as follows:

  1. A CSV file of historical earthquakes is uploaded into an S3 bucket.
  2. An AWS Glue external table is created based on the earthquake CSV.
  3. A Lambda function calculates H3 hexagons for parameters (latitude, longitude, resolution). The function is written in Java and can be called as a UDF using queries in Athena.
  4. A SageMaker notebook uses an AWS SDK for pandas package to run a SQL query in Athena, including the UDF.
  5. A Plotly Express package renders a choropleth map of the number of earthquakes in each hexagon.

Prerequisites

For this post, we use Athena to read data in Amazon S3 using the table defined in the AWS Glue Data Catalog associated with our earthquake dataset. In terms of permissions, there are two main requirements:

Configure Amazon S3

The first step is to create an S3 bucket to store the earthquake dataset, as follows:

  1. Download the CSV file of historical earthquakes from GitHub.
  2. On the Amazon S3 console, choose Buckets in the navigation pane.
  3. Choose Create bucket.
  4. For Bucket name, enter a globally unique name for your data bucket.
  5. Choose Create folder, and enter the folder name earthquakes.
  6. Upload the file to the S3 bucket. In this example, we upload the earthquakes.csv file to the earthquakes prefix.

Create a table in Athena

Navigate to the Athena console to create a table. Complete the following steps:

  1. On the Athena console, choose Query editor.
  2. Select your preferred Workgroup using the drop-down menu.
  3. In the SQL editor, use the following code to create a table in the default database:
    CREATE external TABLE earthquakes
    (
      earthquake_date STRING,
      latitude DOUBLE,
      longitude DOUBLE,
      depth DOUBLE,
      magnitude DOUBLE,
      magtype STRING,
      mbstations STRING,
      gap STRING,
      distance STRING,
      rms STRING,
      source STRING,
      eventid STRING
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE LOCATION 's3://<MY-DATA-BUCKET>/earthquakes/';

Create a Lambda function for the Athena UDF

For a thorough explanation on how to build Athena UDFs, see Querying with user defined functions. We use Java 11 and Uber H3 Java binding to build the H3 UDF. We provide the implementation of the UDF on GitHub.

There are several options for deploying a UDF using Lambda. In this example, we use the AWS Management Console. For production deployments, you probably want to use infrastructure as code such as the AWS Cloud Development Kit (AWS CDK). For information about how to use the AWS CDK to deploy the Lambda function, refer to the project code repository. Another possible deployment option is using AWS Serverless Application Repository (SAR).

Deploy the UDF

Deploy the Uber H3 binding UDF using the console as follows:

  1. Go to the binary directory in the GitHub repository, and download aws-h3-athena-udf-*.jar to your local desktop.
  2. Create a Lambda function called H3UDF with Runtime set to Java 11 (Corretto), and Architecture set to x86_64.
  3. Upload the aws-h3-athena-udf*.jar file.
  4. Change the handler name to com.aws.athena.udf.h3.H3AthenaHandler.
  5. In the General configuration section, choose Edit to set the memory of the Lambda function to 4096 MB, which is an amount of memory that works for our examples. You may need to set the memory size larger for your use cases.
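If you prefer to script the deployment instead of using the console, the following boto3 sketch creates an equivalent function. The role ARN, bucket, and JAR key are placeholders for wherever you staged the downloaded JAR:

import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_function(
    FunctionName="H3UDF",
    Runtime="java11",
    Handler="com.aws.athena.udf.h3.H3AthenaHandler",
    Role="arn:aws:iam::<ACCOUNT_ID>:role/<LAMBDA_EXECUTION_ROLE>",
    Code={"S3Bucket": "<MY-JAR-BUCKET>", "S3Key": "jars/aws-h3-athena-udf.jar"},
    MemorySize=4096,   # matches the console setting above
    Timeout=300,
    Architectures=["x86_64"],
)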

Use the Lambda function as an Athena UDF

After you create the Lambda function, you’re ready to use it as a UDF. The following screenshot shows the function details.

You can now use the function as an Athena UDF. On the Athena console, run the following command:

USING EXTERNAL FUNCTION lat_lng_to_cell_address(lat DOUBLE, lng DOUBLE, res INTEGER)
RETURNS VARCHAR
LAMBDA '<MY-LAMBDA-ARN>'-- Replace with ARN of your Lambda function.
SELECT *,
       lat_lng_to_cell_address(latitude, longitude, 4) AS h3_cell
FROM earthquakes
WHERE latitude BETWEEN 18 AND 70;

The udf/examples folder in the GitHub repository includes more examples of the Athena queries.

Developing the UDFs

Now that we've shown you how to deploy a UDF for Athena using Lambda, let's dive deeper into how to develop these kinds of UDFs. As explained in Querying with user defined functions, in order to develop a UDF, we first need to implement a class that extends UserDefinedFunctionHandler. Then we implement the functions inside that class that can be used as Athena UDFs.

We begin the UDF implementation by defining a class H3AthenaHandler that inherits the UserDefinedFunctionHandler. Then we implement functions that act as wrappers of functions defined in the Uber H3 Java binding. We make sure that all the functions defined in the H3 Java binding API are mapped, so that they can be used in Athena as UDFs. For example, we map the lat_lng_to_cell_address function used in the preceding example to the latLngToCell of the H3 Java binding.

On top of the call to the Java binding, many of the functions in the H3AthenaHandler check whether the input parameter is null. The null check is useful because we don’t assume the input to be non-null. In practice, null values for an H3 index or address are not unusual.

The following code shows the implementation of the get_resolution function:

/** Returns the resolution of an index.
     *  @param h3 the H3 index.
     *  @return the resolution. Null when h3 is null.
     *  @throws IllegalArgumentException when index is out of range.
     */
    public Integer get_resolution(Long h3){
        final Integer result;
        if (h3 == null) {
            result = null;
        } else {
            result = h3Core.getResolution(h3);
        }
        return result;
    }

Some H3 API functions such as cellToLatLng return List<Double> of two elements, where the first element is the latitude and the second is longitude. The H3 UDF that we implement provides a function that returns well-known text (WKT) representation. For example, we provide cell_to_lat_lng_wkt, which returns a Point WKT string instead of List<Double>. We can then use the output of cell_to_lat_lng_wkt in combination with the built-in spatial Athena function ST_GeometryFromText as follows:

USING EXTERNAL FUNCTION cell_to_lat_lng_wkt(h3 BIGINT) 
RETURNS VARCHAR
LAMBDA '<MY-LAMBDA-ARN>'
SELECT ST_GeometryFromText(cell_to_lat_lng_wkt(622506764662964223))

Athena UDF only supports scalar data types and does not support nested types. However, some H3 APIs return nested types. For example, the polygonToCells function in H3 takes a List<List<List<GeoCoord>>>. Our implementation of polygon_to_cells UDF receives a Polygon WKT instead. The following shows an example Athena query using this UDF:

-- get all h3 hexagons that cover Toulouse, Nantes, Lille, Paris, Nice 
USING EXTERNAL FUNCTION polygon_to_cells(polygonWKT VARCHAR, res INT)
RETURNS ARRAY(BIGINT)
LAMBDA '<MY-LAMBDA-ARN>'
SELECT polygon_to_cells('POLYGON ((43.604652 1.444209, 47.218371 -1.553621, 50.62925 3.05726, 48.864716 2.349014, 43.6961 7.27178, 43.604652 1.444209))', 2)

Use SageMaker notebooks for visualization

A SageMaker notebook is a managed machine learning compute instance that runs a Jupyter notebook application. In this example, we will use a SageMaker notebook to write and run our code to visualize our results, but if your use case includes Apache Spark then using Amazon Athena for Apache Spark would be a great choice. For advice on security best practices for SageMaker, see Building secure machine learning environments with Amazon SageMaker. You can create your own SageMaker notebook by following these instructions:

  1. On the SageMaker console, choose Notebook in the navigation pane.
  2. Choose Notebook instances.
  3. Choose Create notebook instance.
  4. Enter a name for the notebook instance.
  5. Choose an existing IAM role or create a role that allows you to run SageMaker and grants access to Amazon S3 and Athena.
  6. Choose Create notebook instance.
  7. Wait for the notebook status to change from Creating to InService.
  8. Open the notebook instance by choosing Jupyter or JupyterLab.

Explore the data

We’re now ready to explore the data.

  1. On the Jupyter console, under New, choose Notebook.
  2. On the Select Kernel drop-down menu, choose conda_python3.
  3. Add new cells by choosing the plus sign.
  4. In your first cell, download the following Python modules that aren’t included in the standard SageMaker environment:
    !pip install geojson
    !pip install awswrangler
    !pip install geomet
    !pip install shapely

    GeoJSON is a popular format for storing spatial data in a JSON format. The geojson module allows you to easily read and write GeoJSON data with Python. The second module we install, awswrangler, is the AWS SDK for pandas. This is a very easy way to read data from various AWS data sources into Pandas data frames. We use it to read earthquake data from the Athena table.

  5. Next, we import all the packages that we use to import the data, reshape it, and visualize it:
    from geomet import wkt
    import plotly.express as px
    from shapely.geometry import Polygon, mapping
    import awswrangler as wr
    import pandas as pd
    from shapely.wkt import loads
    import geojson
    import ast

  6. We begin importing our data using the athena.read_sql_query function in the AWS SDK for pandas. The Athena query has a subquery that uses the UDF to add a column h3_cell to each row in the earthquakes table, based on the latitude and longitude of the earthquake. The analytic function COUNT is then used to find out the number of earthquakes in each H3 cell. For this visualization, we're only interested in earthquakes within the US, so we filter out rows that are outside the area of interest:
    def run_query(lambda_arn, db, resolution):
        query = f"""USING EXTERNAL FUNCTION cell_to_boundary_wkt(cell VARCHAR)
                        RETURNS ARRAY(VARCHAR)
                        LAMBDA '{lambda_arn}'
                           SELECT h3_cell, cell_to_boundary_wkt(h3_cell) as boundary, quake_count FROM(
                            USING EXTERNAL FUNCTION lat_lng_to_cell_address(lat DOUBLE, lng DOUBLE, res INTEGER)
                             RETURNS VARCHAR
                            LAMBDA '{lambda_arn}'
                        SELECT h3_cell, COUNT(*) AS quake_count
                          FROM
                            (SELECT *,
                               lat_lng_to_cell_address(latitude, longitude, {resolution}) AS h3_cell
                             FROM earthquakes
                             WHERE latitude BETWEEN 18 AND 70        -- For this visualisation, we're only interested in earthquakes within the USA.
                               AND longitude BETWEEN -175 AND -50
                             )
                           GROUP BY h3_cell ORDER BY quake_count DESC) cell_quake_count"""
        return wr.athena.read_sql_query(query, database=db)
    
    lambda_arn = '<MY-LAMBDA-ARN>' # Replace with ARN of your lambda.
    db_name = '<MY-DATABASE-NAME>' # Replace with name of your Glue database.
    earthquakes_df = run_query(lambda_arn=lambda_arn,db=db_name, resolution=4)
    earthquakes_df.head()

    The following screenshot shows our results.

Follow along with the rest of the steps in our Jupyter notebook to see how we analyze and visualize our example with H3 UDF data.

Visualize the results

To visualize our results, we use the Plotly Express module to create a choropleth map of our data. A choropleth map is a type of visualization that is shaded based on quantitative values. This is a great visualization for our use case because we’re shading different regions based on the frequency of earthquakes.
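The following is a minimal sketch of how such a choropleth can be built, assuming the h3-py package (v4 API) is installed in the notebook and that earthquakes_df contains the h3_cell and quake_count columns returned by the query shown earlier; here we recompute each cell's boundary locally with h3-py rather than parsing the boundary column:

import h3
import plotly.express as px

def cells_to_geojson(cells):
    """Build a GeoJSON FeatureCollection from H3 cell addresses."""
    features = []
    for cell in cells:
        # cell_to_boundary returns (lat, lng) pairs; GeoJSON expects (lng, lat)
        ring = [(lng, lat) for lat, lng in h3.cell_to_boundary(cell)]
        ring.append(ring[0])  # close the polygon
        features.append({
            "type": "Feature",
            "id": cell,
            "geometry": {"type": "Polygon", "coordinates": [ring]},
            "properties": {},
        })
    return {"type": "FeatureCollection", "features": features}

geo = cells_to_geojson(earthquakes_df["h3_cell"])

fig = px.choropleth_mapbox(
    earthquakes_df,
    geojson=geo,
    locations="h3_cell",   # matches each feature's "id"
    color="quake_count",
    mapbox_style="carto-positron",
    center={"lat": 39, "lon": -110},
    zoom=3,
    opacity=0.6,
)
fig.show()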

In the resulting visual, we can see the ranges of frequency of earthquakes in different areas of North America. Note, the H3 resolution in this map is lower than in the earlier map, which makes each hexagon cover a larger area of the globe.

Clean up

To avoid incurring extra charges on your account, delete the resources you created:

  1. On the SageMaker console, select the notebook and on the Actions menu, choose Stop.
  2. Wait for the status of the notebook to change to Stopped, then select the notebook again and on the Actions menu, choose Delete.
  3. On the Amazon S3 console, select the bucket you created and choose Empty.
  4. Enter the bucket name and choose Empty.
  5. Select the bucket again and choose Delete.
  6. Enter the bucket name and choose Delete bucket.
  7. On the Lambda console, select the function name and on the Actions menu, choose Delete.

Conclusion

In this post, you saw how to extend functions in Athena for geospatial analysis by adding your own user-defined function. Although we used Uber’s H3 geospatial index in this demonstration, you can bring your own geospatial index for your own custom geospatial analysis.

In this post, we used Athena, Lambda, and SageMaker notebooks to visualize the results of our UDFs in the western US. Code examples are in the h3-udf-for-athena GitHub repo.

As a next step, you can modify the code in this post and customize it for your own needs to gain further insights from your own geographical data. For example, you could visualize other cases such as droughts, flooding, and deforestation.


About the Authors

John Telford is a Senior Consultant at Amazon Web Services. He is a specialist in big data and data warehouses. John has a Computer Science degree from Brunel University.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Pauline Ting is a Data Scientist in the AWS Professional Services team. She supports customers in achieving and accelerating their business outcome by developing sustainable AI/ML solutions. In her spare time, Pauline enjoys traveling, surfing, and trying new dessert places.

Build a serverless transactional data lake with Apache Iceberg, Amazon EMR Serverless, and Amazon Athena

Post Syndicated from Houssem Chihoub original https://aws.amazon.com/blogs/big-data/build-a-serverless-transactional-data-lake-with-apache-iceberg-amazon-emr-serverless-and-amazon-athena/

Since the deluge of big data over a decade ago, many organizations have learned to build applications to process and analyze petabytes of data. Data lakes have served as a central repository to store structured and unstructured data at any scale and in various formats. However, as data processing solutions grow in scale, organizations need to build more and more features on top of their data lakes. One important feature is the ability to run different workloads, such as business intelligence (BI), machine learning (ML), data science and data exploration, and change data capture (CDC) of transactional data, without having to maintain multiple copies of the data. Additionally, the task of maintaining and managing files in the data lake can be tedious and sometimes complex.

Table formats like Apache Iceberg provide solutions to these issues. They enable transactions on top of data lakes and can simplify data storage, management, ingestion, and processing. These transactional data lakes combine features from both the data lake and the data warehouse. You can simplify your data strategy by running multiple workloads and applications on the same data in the same location. However, using these formats requires building, maintaining, and scaling infrastructure and integration connectors that can be time-consuming, challenging, and costly.

In this post, we show how you can build a serverless transactional data lake with Apache Iceberg on Amazon Simple Storage Service (Amazon S3) using Amazon EMR Serverless and Amazon Athena. We provide an example for data ingestion and querying using an ecommerce sales data lake.

Apache Iceberg overview

Iceberg is an open-source table format that brings the power of SQL tables to big data files. It enables ACID transactions on tables, allowing for concurrent data ingestion, updates, and queries, all while using familiar SQL. Iceberg employs internal metadata management that keeps track of data and empowers a set of rich features at scale. It allows you to time travel and roll back to old versions of committed data transactions, control the table’s schema evolution, easily compact data, and employ hidden partitioning for fast queries.
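To make these capabilities concrete, the following PySpark sketch shows what they look like in Spark SQL. It assumes a SparkSession configured with the Iceberg extensions and a catalog named dev; the database, table, and updates source names are illustrative, and the time travel syntax requires Spark 3.3 or later:

from pyspark.sql import SparkSession

# Assumes the Iceberg extensions and the "dev" catalog are set through Spark configuration
spark = SparkSession.builder.getOrCreate()

# Hidden partitioning: partition by day without exposing a partition column
spark.sql("""
    CREATE TABLE IF NOT EXISTS dev.db.web_sales_iceberg (
        order_id BIGINT,
        customer_id BIGINT,
        net_profit DOUBLE,
        sale_ts TIMESTAMP)
    USING iceberg
    PARTITIONED BY (days(sale_ts))
""")

# ACID upsert of late-arriving or corrected records from an existing "updates" view
spark.sql("""
    MERGE INTO dev.db.web_sales_iceberg t
    USING updates s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# Inspect snapshots and time travel to an earlier state of the table
spark.sql("SELECT snapshot_id, committed_at FROM dev.db.web_sales_iceberg.snapshots").show()
spark.sql("SELECT * FROM dev.db.web_sales_iceberg TIMESTAMP AS OF '2023-06-01 00:00:00'").show()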

Iceberg manages files on behalf of the user and unlocks use cases such as:

  • Concurrent data ingestion and querying, including streaming and CDC
  • BI and reporting with expressive simple SQL
  • Empowering ML feature stores and training sets
  • Compliance and regulations workloads, such as GDPR find and forget
  • Reinstating late-arriving data, which is dimension data arriving later than the fact data. For example, the reason for a flight delay may arrive well after the fact that the flight is delayed.
  • Tracking data changes and rollback

Build your transactional data lake on AWS

You can build your modern data architecture with a scalable data lake that integrates seamlessly with an Amazon Redshift powered cloud warehouse. Moreover, many customers are looking for an architecture where they can combine the benefits of a data lake and a data warehouse in the same storage location. In the following figure, we show a comprehensive architecture that uses the modern data architecture strategy on AWS to build a fully featured transactional data lake. AWS provides flexibility and a wide breadth of features to ingest data, build AI and ML applications, and run analytics workloads without having to focus on the undifferentiated heavy lifting.

Data can be organized into three different zones, as shown in the following figure. The first zone is the raw zone, where data can be captured from the source as is. The transformed zone is an enterprise-wide zone to host cleaned and transformed data in order to serve multiple teams and use cases. Iceberg provides a table format on top of Amazon S3 in this zone to provide ACID transactions, but also to allow seamless file management and provide time travel and rollback capabilities. The business zone stores data specific to business cases and applications aggregated and computed from data in the transformed zone.

One important aspect to a successful data strategy for any organization is data governance. On AWS, you can implement a thorough governance strategy with fine-grained access control to the data lake with AWS Lake Formation.

Serverless architecture overview

In this section, we show you how to ingest and query data in your transactional data lake in a few steps. EMR Serverless is a serverless option that makes it easy for data analysts and engineers to run Spark-based analytics without configuring, managing, and scaling clusters or servers. You can run your Spark applications without having to plan capacity or provision infrastructure, while paying only for your usage. EMR Serverless supports Iceberg natively to create tables and query, merge, and insert data with Spark. In the following architecture diagram, Spark transformation jobs can load data from the raw zone or source, apply the cleaning and transformation logic, and ingest data in the transformed zone on Iceberg tables. Spark code can run instantaneously on an EMR Serverless application, which we demonstrate later in this post.

The Iceberg table is synced with the AWS Glue Data Catalog. The Data Catalog provides a central location to govern and keep track of the schema and metadata. With Iceberg, ingestion, update, and querying processes can benefit from atomicity, snapshot isolation, and managing concurrency to keep a consistent view of data.

Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data where it lives. To serve BI and reporting analysis, it allows you to build and run queries on Iceberg tables natively and integrates with a variety of BI tools.

Sales data model

Star schema and its variants are very popular for modeling data in data warehouses. They implement one or more fact tables and dimension tables. The fact table stores the main transactional data from the business logic with foreign keys to dimensional tables. Dimension tables hold additional complementary data to enrich the fact table.

In this post, we take the example of sales data from the TPC-DS benchmark. We zoom in on a subset of the schema with the web_sales fact table, as shown in the following figure. It stores numeric values about sales cost, ship cost, tax, and net profit. Additionally, it has foreign keys to dimensional tables like date_dim, time_dim, customer, and item. These dimensional tables store records that give more details. For instance, you can show when a sale took place by which customer for which item.

Dimension-based models have been used extensively to build data warehouses. In the following sections, we show how to implement such a model on top of Iceberg, providing data warehousing features on top of your data lake, and run different workloads in the same location. We provide a complete example of building a serverless architecture with data ingestion using EMR Serverless and Athena using TPC-DS queries.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Basic knowledge about data management and SQL

Deploy solution resources with AWS CloudFormation

We provide an AWS CloudFormation template to deploy the data lake stack with the following resources:

  • Two S3 buckets: one for scripts and query results, and one for the data lake storage
  • An Athena workgroup
  • An EMR Serverless application
  • An AWS Glue database and tables on external public S3 buckets of TPC-DS data
  • An AWS Glue database for the data lake
  • An AWS Identity and Access Management (IAM) role and polices

Complete the following steps to create your resources:

  1. Launch the CloudFormation stack:

Launch Button

This automatically launches AWS CloudFormation in your AWS account with the CloudFormation template. It prompts you to sign in as needed.

  1. Keep the template settings as is.
  2. Select the I acknowledge that AWS CloudFormation might create IAM resources check box.
  3. Choose Submit.

When the stack creation is complete, check the Outputs tab of the stack to verify the resources created.

Upload Spark scripts to Amazon S3

Complete the following steps to upload your Spark scripts:

  1. Download the following scripts: ingest-iceberg.py and update-item.py.
  2. On the Amazon S3 console, go to the datalake-resources-<AccountID>-us-east-1 bucket you created earlier.
  3. Create a new folder named scripts.
  4. Upload the two PySpark scripts: ingest-iceberg.py and update-item.py.

Create Iceberg tables and ingest TPC-DS data

To create your Iceberg tables and ingest the data, complete the following steps:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Manage applications.
  3. Choose the application datalake-app.

  4. Choose Start application.

Once started, the application provisions the pre-initialized capacity configured at creation (one Spark driver and two Spark executors). Pre-initialized capacity consists of resources that are provisioned when you start your application and can be used instantly when you submit jobs. However, these resources incur charges even if they're not used while the application is in a started state. By default, the application is set to stop when idle for 15 minutes.

Now that the EMR application has started, we can submit the Spark ingest job ingest-iceberg.py. The job creates the Iceberg tables and then loads data from the previously created AWS Glue Data Catalog tables on TPC-DS data in an external bucket.

  1. Navigate to the datalake-app.
  2. On the Job runs tab, choose Submit job.

  3. For Name, enter ingest-data.
  4. For Runtime role, choose the IAM role created by the CloudFormation stack.
  5. For Script location, enter the S3 path of the script in your resource bucket (s3://datalake-resources-<AccountID>-us-east-1/scripts/ingest-iceberg.py).

  6. Under Spark properties, choose Edit in text.
  7. Enter the following properties, replacing <BUCKET_NAME> with your data lake bucket name datalake-<####>-us-east-1 (not datalake-resources):
--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=8g --conf spark.executor.instances=2 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.dev.warehouse=s3://<BUCKET_NAME>/warehouse --conf spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.dev.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myIcebergLockTab --conf spark.dynamicAllocation.maxExecutors=8 --conf spark.driver.maxResultSize=1G --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  8. Submit the job.

You can monitor the job progress.
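
If you prefer to script these steps instead of using the console, the following is a minimal boto3 sketch of submitting and monitoring the same job. The application ID, role ARN, and script path are placeholders that you would replace with the values from your CloudFormation stack outputs, and the Spark parameters are abbreviated (use the full set of --conf flags shown previously).

import time
import boto3

emr = boto3.client("emr-serverless", region_name="us-east-1")

# Placeholders: take these values from the CloudFormation stack outputs
APPLICATION_ID = "<EMR_SERVERLESS_APPLICATION_ID>"
JOB_ROLE_ARN = "<RUNTIME_ROLE_ARN>"
SCRIPT_PATH = "s3://datalake-resources-<AccountID>-us-east-1/scripts/ingest-iceberg.py"

# Submit the ingestion job (sparkSubmitParameters abbreviated for readability)
response = emr.start_job_run(
    applicationId=APPLICATION_ID,
    executionRoleArn=JOB_ROLE_ARN,
    name="ingest-data",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": SCRIPT_PATH,
            "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g",
        }
    },
)
job_run_id = response["jobRunId"]

# Poll until the job run reaches a terminal state
while True:
    state = emr.get_job_run(applicationId=APPLICATION_ID, jobRunId=job_run_id)["jobRun"]["state"]
    print(f"Job state: {state}")
    if state in ("SUCCESS", "FAILED", "CANCELLED"):
        break
    time.sleep(30)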

Query Iceberg tables

In this section, we provide examples of data warehouse queries from TPC-DS on the Iceberg tables.

  1. On the Athena console, open the query editor.
  2. For Workgroup, switch to DatalakeWorkgroup.

  3. Choose Acknowledge.

The queries in DatalakeWorkgroup will run on Athena engine version 3.

  4. On the Saved queries tab, choose a query to run on your Iceberg tables.

The following queries are listed:

  • Query3 – Report the total extended sales price per item brand of a specific manufacturer for all sales in a specific month of the year.
  • Query45 – Report the total web sales for customers in specific zip codes, cities, counties, or states, or specific items for a given year and quarter.
  • Query52 – Report the total of extended sales price for all items of a specific brand in a specific year and month.
  • Query6 – List all the states with at least 10 customers who, during a given month, bought items priced at least 20% higher than the average price of items in the same category.
  • Query75 – For 2 consecutive years, track the sales of items by brand, class, and category.
  • Query86a – Roll up the web sales for a given year by category and class, and rank the sales among peers within the parent. For each group, compute the sum of sales, its location within the hierarchy, and its rank within the group.

These queries are examples of queries used in decision-making and reporting in an organization. You can run them in the order you want. For this post, we start with Query3.

  1. Before you run the query, confirm that Database is set to datalake.

  2. Now you can run the query.

  3. Repeat these steps to run the other queries.

Update the item table

After running the queries, we prepare a batch of updates and inserts of records into the item table.

  1. First, run the following query to count the number of records in the item Iceberg table:
SELECT count(*) FROM "datalake"."item_iceberg";

This should return 102,000 records.

  2. Count the item records with a price higher than $90:
SELECT count(*) FROM "datalake"."item_iceberg" WHERE i_current_price > 90.0;

This will return 1,112 records.

The update-item.py job takes these 1,112 records, modifies 11 records to change the name of the brand to Unknown, and changes the remaining 1,101 records’ i_item_id key to flag them as new records. As a result, a batch of 11 updates and 1,101 inserts are merged into the item_iceberg table.

The 11 records to be updated are those with a price higher than $90 whose brand name starts with corpnameless.
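
The exact transformation lives in update-item.py, but conceptually the job stages the changed records and merges them with an Iceberg MERGE statement. The following PySpark sketch illustrates the idea; the temporary view updates_and_inserts, the dev.datalake.item_iceberg table identifier, and the i_item_id join key are illustrative assumptions rather than the script's actual names.

# Simplified sketch of an Iceberg MERGE; updates_and_inserts is an assumed
# temporary view holding the 11 updated and 1,101 new item records
spark.sql("""
    MERGE INTO dev.datalake.item_iceberg AS target
    USING updates_and_inserts AS source
    ON target.i_item_id = source.i_item_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")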

  3. Run the following query:
SELECT count(*) FROM "datalake"."item_iceberg" WHERE i_current_price > 90.0 AND i_brand LIKE 'corpnameless%';

The result is 11 records. The update-item.py job replaces the brand name with Unknown and merges the batch into the Iceberg table.

Now you can return to the EMR Serverless console and run the job on the EMR Serverless application.

  4. On the application details page, choose Submit job.
  5. For Name, enter update-item-job.
  6. For Runtime role, use the same role that you used previously.
  7. For S3 URI, enter the update-item.py script location.

  8. Under Spark properties, choose Edit in text.
  9. Enter the following properties, replacing <BUCKET-NAME> with your data lake bucket name datalake-<####>-us-east-1:
--conf spark.executor.cores=2 --conf spark.executor.memory=8g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=2 --conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.dev.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myIcebergLockTab --conf spark.dynamicAllocation.maxExecutors=4 --conf spark.driver.maxResultSize=1G --conf spark.sql.catalog.dev.warehouse=s3://<BUCKET-NAME>/warehouse --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

  10. Then submit the job.

  11. After the job finishes successfully, return to the Athena console and run the following query:
SELECT count(*) FROM "datalake"."item_iceberg";

The returned result is 103,101 = 102,000 + (1,112 – 11). The batch was merged successfully.

Time travel

To run a time travel query, complete the following steps:

  1. Get the timestamp of the job run via the application details page on the EMR Serverless console, or the Spark UI on the History Server, as shown in the following screenshot.

This time could be just minutes before you ran the update Spark job.

  2. Convert the timestamp from the format YYYY/MM/DD hh:mm:ss to a timestamp with time zone, for example from 2023/02/20 14:40:41 to 2023-02-20 14:40:41.000 UTC.
  3. On the Athena console, run the following query to count the item table records at a time before the update job, replacing <TRAVEL_TIME> with your time:
SELECT count(*) FROM "datalake"."item_iceberg" FOR TIMESTAMP AS OF TIMESTAMP <TRAVEL_TIME>;

The query will give 102,000 as a result, the expected table size before running the update job.

  4. Now you can run a query with a timestamp after the successful run of the update job (for example, 2023-02-20 15:06:00.000 UTC):
SELECT count(*) FROM "datalake"."item_iceberg" FOR TIMESTAMP AS OF TIMESTAMP <TRAVEL_TIME>;

The query will now give 103,101 as the size of the table at that time, after the update job successfully finished.

Additionally, you can query in Athena based on the version ID of a snapshot in Iceberg. However, for more advanced use cases, such as to roll back to a given version or to find version IDs, you can use Iceberg’s SDK or Spark on Amazon EMR.
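
For example, the following sketch, run from a Spark session configured with the same dev Glue catalog as the ingest job, shows one way to list snapshot IDs and roll the table back. Treat it as illustrative rather than part of the deployed solution, and replace <SNAPSHOT_ID> with an ID returned by the first query.

# List the snapshots of the Iceberg table with their IDs and commit timestamps
spark.sql("SELECT snapshot_id, committed_at, operation FROM dev.datalake.item_iceberg.snapshots").show(truncate=False)

# Roll the table back to a previous snapshot using an Iceberg Spark procedure
spark.sql("CALL dev.system.rollback_to_snapshot('datalake.item_iceberg', <SNAPSHOT_ID>)")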

Clean up

Complete the following steps to clean up your resources:

  1. On the Amazon S3 console, empty your buckets.
  2. On the Athena console, delete the workgroup DatalakeWorkgroup.
  3. On the EMR Studio console, stop the application datalake-app.
  4. On the AWS CloudFormation console, delete the CloudFormation stack.

Conclusion

In this post, we created a serverless transactional data lake with Iceberg tables, EMR Serverless, and Athena. We used the 10 GB TPC-DS sales dataset, with more than 7 million records in the fact table. We demonstrated how straightforward it is to rely on SQL and Spark to run serverless jobs for data ingestion and upserts. Moreover, we showed how to run complex BI queries directly on Iceberg tables from Athena for reporting.

You can start building your serverless transactional data lake on AWS today, and dive deep into the features and optimizations Iceberg provides to build analytics applications more easily. Iceberg can also help you in the future to improve performance and reduce costs.


About the Author

Houssem is a Specialist Solutions Architect at AWS with a focus on analytics. He is passionate about data and emerging technologies in analytics. He holds a PhD in data management in the cloud. Prior to joining AWS, he worked on several big data projects and published several research papers at international conferences and venues.

Improve productivity by using keyboard shortcuts in Amazon Athena query editor

Post Syndicated from Naresh Gautam original https://aws.amazon.com/blogs/big-data/improve-productivity-by-using-keyboard-shortcuts-in-amazon-athena-query-editor/

Amazon Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data where it lives. You can analyze data or build applications from an Amazon Simple Storage Service (Amazon S3) data lake and over 25 data sources, including on-premises data sources or other cloud systems using SQL or Python. Athena is built on open-source Trino and Presto engines and Apache Spark frameworks, with no provisioning or configuration effort required.

Different types of users rely on Athena, including business analysts, data scientists, security, and operations engineers. Athena provides a query editor to enter and run queries on data using structured query language (SQL). The query editor provides features like run, cancel, and save queries or statements. Additionally, it provides keyboard shortcuts for user-friendly operation.

This post discusses the keyboard shortcuts available and how you can use them.

Accessing the Athena console

If you’re new to Athena and don’t know how to access the Athena console and run queries and statements, refer to the following getting started tutorial. This tutorial walks you through using Athena to query data. You’ll create a table based on sample data stored in Amazon S3, query the table, and check the results of the query.

Keyboard shortcuts

The query editor provides keyboard shortcuts for different action types like running a query, formatting a query, line operations, selection, multi-cursor, go to, find/replace, and folding. Compared to reaching for the mouse or navigating a menu, a single keyboard shortcut saves a moment of your time.

With keyboard shortcuts, you can use key combinations to edit your SQL statement without using a mouse. For example, you can use multiple cursors in the editing window to select and edit all instances of a given text, fold or unfold selected text, find and replace text, and perform line operations like removing or moving lines.

You can also find these keyboard shortcuts on the query editor on the bottom right corner, as highlighted in the following screenshot.

The following table shows the keyboard shortcuts for Windows/Linux and Mac.

Action Type Action Windows/Linux Mac
Other Execute query Ctrl-Enter Cmd-Enter, Ctrl-Enter
Other Format query Ctrl-Alt-L Opt-Cmd-L
Other Previous query Ctrl-Up Ctrl-Shift-Up
Other Next query Ctrl-Down Ctrl-Shift-Down
Other Close tab Alt-X Opt-X
Other Previous tab Ctrl-, Ctrl-,
Other Next tab Ctrl-. Ctrl-.
Other Indent Tab Tab
Other Outdent Shift-Tab Shift-Tab
Other Save Ctrl-S Cmd-S
Other Undo Ctrl-Z Cmd-Z
Other Redo Ctrl-Shift-Z, Ctrl-Y Cmd-Shift-Z, Cmd-Y
Other Toggle comment Ctrl-/ Cmd-/
Other Transpose letters Ctrl-T Ctrl-T
Other Change to lower case Ctrl-Shift-U Ctrl-Shift-U
Other Change to upper case Ctrl-U Ctrl-U
Other Overwrite Insert Insert
Other Delete Delete
Line Operations Remove line Ctrl-D Cmd-D
Line Operations Copy lines down Alt-Shift-Down Cmd-Opt-Down
Line Operations Copy lines up Alt-Shift-Up Cmd-Opt-Up
Line Operations Move lines down Alt-Down Opt-Down
Line Operations Move lines up Alt-Up Opt-Up
Line Operations Remove to line end Alt-Delete Ctrl-K
Line Operations Remove to line start Alt-Backspace Cmd-Backspace
Line Operations Remove word left Ctrl-Backspace Opt-Backspace, Ctrl-Opt-Backspace
Line Operations Remove word right Ctrl-Delete Opt-Delete
Line Operations Split line Ctrl-O
Selection Select all Ctrl-A Cmd-A
Selection Select left Shift-Left Shift-Left
Selection Select right Shift-Right Shift-Right
Selection Select word left Ctrl-Shift-Left Opt-Shift-Left
Selection Select word right Ctrl-Shift-Right Opt-Shift-Right
Selection Select line start Shift-Home Shift-Home
Selection Select line end Shift-End Shift-End
Selection Select to line end Alt-Shift-Right Cmd-Shift-Right
Selection Select to line start Alt-Shift-Left Cmd-Shift-Left
Selection Select up Shift-Up Shift-Up
Selection Select down Shift-Down Shift-Down
Selection Select page up Shift-PageUp Shift-PageUp
Selection Select page down Shift-PageDown Shift-PageDown
Selection Select to start Ctrl-Shift-Home Cmd-Shift-Up
Selection Select to end Ctrl-Shift-End Cmd-Shift-Down
Selection Duplicate selection Ctrl-Shift-D Cmd-Shift-D
Selection Select to matching bracket Ctrl-Shift-P
Multicursor Add multi-cursor above Ctrl-Alt-Up Ctrl-Opt-Up
Multicursor Add multi-cursor below Ctrl-Alt-Down Ctrl-Opt-Down
Multicursor Add next occurrence to multi-selection Ctrl-Alt-Right Ctrl-Opt-Right
Multicursor Add previous occurrence to multi-selection Ctrl-Alt-Left Ctrl-Opt-Left
Multicursor Move multi-cursor from current line to the line above Ctrl-Alt-Shift-Up Ctrl-Opt-Shift-Up
Multicursor Move multi-cursor from current line to the line below Ctrl-Alt-Shift-Down Ctrl-Opt-Shift-Down
Multicursor Remove current occurrence from multi-selection and move to next Ctrl-Alt-Shift-Right Ctrl-Opt-Shift-Right
Multicursor Remove current occurrence from multi-selection and move to previous Ctrl-Alt-Shift-Left Ctrl-Opt-Shift-Left
Multicursor Select all from multi-selection Ctrl-Shift-L Ctrl-Shift-L
Go to Go to left Left Left, Ctrl-B
Go to Go to right Right Right, Ctrl-F
Go to Go to word left Ctrl-Left Opt-Left
Go to Go to word right Ctrl-Right Opt-Right
Go to Go line up Up Up, Ctrl-P
Go to Go line down Down Down, Ctrl-N
Go to Go to line start Alt-Left, Home Cmd-Left, Home, Ctrl-A
Go to Go to line end Alt-Right, End Cmd-Right, End, Ctrl-E
Go to Go to page up PageUp Opt-PageUp
Go to Go to page down PageDown Opt-PageDown, Ctrl-V
Go to Go to start Ctrl-Home Cmd-Home, Cmd-Up
Go to Go to end Ctrl-End Cmd-End, Cmd-Down
Go to Scroll line down Ctrl-Down Cmd-Down
Go to Scroll line up Ctrl-Up
Go to Go to matching bracket Ctrl-P
Go to Scroll page down Opt-PageDown
Go to Scroll page up Opt-PageUp
Find/Replace Find Ctrl-F Cmd-F
Find/Replace Replace Ctrl-H Cmd-Opt-F
Find/Replace Find next Ctrl-K Cmd-G
Find/Replace Find previous Ctrl-Shift-K Cmd-Shift-G
Folding Fold selection Alt-L, Ctrl-F1 Cmd-Opt-L, Cmd-F1
Folding Unfold Alt-Shift-L, Ctrl-Shift-F1 Cmd-Opt-Shift-L, Cmd-Shift-F1
Folding Unfold all Alt-Shift-0 Cmd-Opt-Shift-0
Other Autocomplete Ctrl-Space Ctrl-Space
Other Focus out Esc Esc

For illustration, you can perform the Format query action by using the keyboard shortcut (Ctrl-Alt-L for Windows/Linux, Opt-Cmd-L for Mac). It converts unformatted SQL into well-formatted SQL, as shown in the following screenshots.

Similarly, you can try out the Toggle comment command (Ctrl-/ for Windows/Linux, Cmd-/ for Mac) to comment or uncomment lines of SQL in the Athena query editor. This comes in very handy when you want to quickly comment out specific lines in your query, as shown in the following screenshots.

You can do line operations like Remove line, Copy lines down, Copy lines up, and more. The following screenshots show an example of the Remove line action (Ctrl-D for Windows/Linux, Cmd-D for Mac).

You can do a line selection like Select all, Select left, Select line start, and more. The following screenshots show an example of the Select all action (Ctrl-A for Windows/Linux, Cmd-A for Mac).

You can do multi-cursor actions like Add multi-cursor above, Add multi-cursor below, Add next occurrence to multi-selection, Add previous occurrence to multi-selection, Move multi-cursor from current line to the line above, and more. The following example is of the Add multi-cursor above action (Ctrl-Alt-Up for Windows/Linux, Ctrl-Opt-Up for Mac).

You can do go to actions like Go to left, Go to right, Go to word left, and more. The following is an example of the Go to left action (Ctrl-B).

You can do find and replace actions like Find, Replace, Find next, and more. The following is an example of the Replace action (Ctrl-H for Windows/Linux, Cmd-Opt-F for Mac).

You can also do folding actions like Fold selection, Unfold, and Unfold all. The following example is of the Unfold action (Alt-Shift-L or Ctrl-Shift-F1 for Windows/Linux, Cmd-Opt-Shift-L or Cmd-Shift-F1 for Mac).

Conclusion

In this post, we saw how Athena provides an array of native options to help you improve productivity when analyzing your data. You can go to the Athena console and start running SQL statements or querying data using the built-in query editor. The query editor provides keyboard shortcuts to improve your productivity by using key combinations to edit SQL statements instead of using a mouse.

If you have any questions or suggestions, please leave a comment.


About the Authors

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Srikanth Sopirala is a Principal Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family, and road biking.

Harsh Vardhan is an AWS Solutions Architect, specializing in analytics. He has over 5 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Access Amazon Athena in your applications using the WebSocket API

Post Syndicated from Abhi Sodhani original https://aws.amazon.com/blogs/big-data/access-amazon-athena-in-your-applications-using-the-websocket-api/

Modern applications are built with modular independent components or microservices that rely on an API framework to communicate with services. Many organizations are building data lakes to store and analyze large volumes of structured, semi-structured, and unstructured data. In addition, many teams are moving towards a data mesh architecture, which requires them to expose their data sets as easily consumable data products. To accomplish this on AWS, organizations use Amazon Simple Storage Service (Amazon S3) to provide cheap and reliable object storage to house their datasets. To enable interactive querying and analyzing their data in place using familiar SQL syntax, many teams are turning to Amazon Athena. Athena is an interactive query service that is used by modern applications to query large volumes of data on an S3 data lake using standard SQL.

When working with SQL databases, application developers and business analysts are most familiar with simple permissions management and synchronous query-response protocols—if a user has permissions to submit a query, they do so and receive the results from the server when the query is complete. Directly accessing Athena APIs, for example when integrating with a custom web application, requires an AWS Identity and Access Management (IAM) role for the applications, and requires you to build a custom process to poll for query completion asynchronously. The IAM role needs access to run Athena API calls, as well as S3 permissions to retrieve the Athena output stored on Amazon S3. Polling for Athena query completion at regular intervals can also add latency from the client's perspective.

In this post, we present a solution that can integrate with your front-end application to query data from Amazon S3 using an Athena synchronous API invocation. With this solution, you can add a layer of abstraction between your application and direct Athena API calls, and expose access through a WebSocket API developed with Amazon API Gateway. The query results are returned back to the application as Amazon S3 presigned URLs.

Overview of solution

For illustration purposes, this post builds a COVID-19 data lake with a WebSocket API to handle Athena queries. Your application can invoke the WebSocket API to pull the data from Amazon S3 using an Athena SQL query, and the WebSocket API returns the JSON response with the presigned Amazon S3 URL. The application needs to parse the JSON message to read the presigned URL, download the data locally, and report the data back to the front end.

We use AWS Step Functions to poll the Athena query run. When the query is complete, Step Functions invokes an AWS Lambda function to generate the presigned URL and send the request back to the application.
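
At its core, the status check that Step Functions repeats is a call to the Athena GetQueryExecution API. The following boto3 sketch shows that check in isolation; the function name is illustrative and not the actual Lambda code deployed by the solution.

import boto3

athena = boto3.client("athena")

def get_query_state(query_execution_id):
    """Return the Athena query state: QUEUED, RUNNING, SUCCEEDED, FAILED, or CANCELLED."""
    response = athena.get_query_execution(QueryExecutionId=query_execution_id)
    return response["QueryExecution"]["Status"]["State"]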

The application doesn’t require direct access to Athena, just access to invoke the API. When using this solution, you should secure the API following AWS guidelines. For more information, refer to Controlling and managing access to a WebSocket API in API Gateway.

The following diagram summarizes the architecture, key components, and interactions in the solution.

Architecture diagram for the Athena WebSocket API. The user connects to the API through API Gateway. API Gateway uses Lambda and DynamoDB to store session data. SQL queries are routed to Amazon Athena and a Step Function polls for query status and returns the results back to the user.

The application is composed of the WebSocket API in API Gateway, which handles the connectivity between the client and Athena. A client application using the framework can submit the Athena SQL query and get back the presigned URL containing the query results data. The workflow includes the following steps:

  1. The application invokes the WebSocket API connection.
  2. A Lambda function is invoked to initiate the connection. The connection ID is stored in an Amazon DynamoDB table.
  3. When the client application is connected, it can invoke the runquery action, which invokes the RunQuery Lambda function.
  4. The function first runs the Athena query.
  5. When the query is started, the function checks the status and uses Step Functions to track the query progress.
  6. Step Functions invokes the third Lambda function to read the processed Athena results and get the presigned S3 URL. Failed messages are routed to an Amazon Simple Notification Service (Amazon SNS) topic, which you can subscribe to.
  7. The presigned URL is returned to the client application.
  8. The connection is closed using the OnDisconnect function.

The RunQuery Lambda function runs the Athena query using the start_query_execution request:

def run_query(client, query):
    """This function executes and sends the query request to Athena."""
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': params['Database']
        },
        ResultConfiguration={
            'OutputLocation': f's3://{params["BucketName"]}/{params["OutputDir"]}/'
        },
        WorkGroup=params["WorkGroup"]
    )
    return response

The Amazon S3 presigned URL is generated by invoking the generate_presigned_url request with the bucket and key information that hosts the Athena results. The URL expiration defaults to 120 seconds and is configurable through the function input parameter PreSignerExpireSeconds. See the following code:

def signed_get_url(event):
    s3 = boto3.client('s3', region_name=params['Region'], config=Config(signature_version='s3v4'))
    # User provided body with object info
    bodyData = json.loads(event['body'])
    try:
        url = s3.generate_presigned_url(
            ClientMethod='get_object',
            Params={
                'Bucket': params['BucketName'],
                'Key': bodyData["ObjectName"]
            },
            ExpiresIn=int(params['PreSignerExpireSeconds'])
        )
        body = {'PreSignedUrl': url, 'ExpiresIn': params['PreSignerExpireSeconds']}
        response = {
            'statusCode': 200,
            'body': json.dumps(body),
            'headers': cors.global_returns["Allow Origin Header"]
        }
        logger.info(f"[MESSAGE] Response for PreSignedURL: {response}")
    except Exception as e:
        logger.exception(f"[MESSAGE] Unable to generate URL: {str(e)}")
        response = {
            'statusCode': 502,
            'body': 'Unable to generate PreSignedUrl',
            'headers': cors.global_returns["Allow Origin Header"]
        }
    return response

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • Permissions to create an AWS CloudFormation stack
  • Permissions to create the following resources:
    • AWS Glue catalog databases and tables
    • API Gateway
    • Lambda function
    • IAM roles
    • Step Functions state machine
    • SNS topic
    • DynamoDB table

Enable the WebSocket API

To enable the WebSocket API of API Gateway, complete the following steps:

  1. Configure the Athena dataset.

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. If you’re signed in to your AWS account, the following page fills out most of the stack creation form for you. All you need to do is choose Create stack. For instructions on creating a CloudFormation stack, see Getting started with AWS CloudFormation.

You can also use an existing Athena database to query, in which case you need to update the stack parameters.

  2. Sign in to the Athena console.

If this is the first time you’re using Athena, you must specify a query result location on Amazon S3. For more information about querying and accessing the data from Athena, see A public data lake for analysis of COVID-19 data.

  3. Configure the WebSocket framework using the following page, which deploys the API infrastructure using AWS Serverless Application Model (AWS SAM).
  4. Update the pBucketName parameter with the S3 bucket (in the us-east-2 Region) that stores the Athena results, and also update the database parameter if you want to query an existing database.
  5. Select the check box to acknowledge creation of IAM roles and choose Deploy.

At a high level, these are the primary resources deployed by the application template:

  • An API Gateway with routes to the connect, disconnect, and query Lambda functions. Note that the API Gateway deployed with this sample doesn’t implement authentication and authorization. We recommend that you implement authentication and authorization before deploying into a production environment. Refer to Controlling and managing access to a WebSocket API in API Gateway to understand how to implement these security controls.
  • A DynamoDB table for tracking client connections.
  • Lambda functions to manage connection states using DynamoDB.
  • A Lambda function to run the query and start the step function. The function includes an associated IAM role and policies with permissions to Step Functions, the AWS Glue Data Catalog, Athena, AWS Key Management Service (AWS KMS), and Amazon S3. Note that the Lambda execution role gives read access to the Data Catalog and S3 bucket that you specify in the deployment parameters. We recommend that you don’t include a catalog that contains sensitive data without first understanding the impacts and implementing additional security controls.
  • A Lambda function with associated permissions to poll for the query results and return the presigned URL to the client.
  • A Step Functions state machine with associated permissions to run the polling Lambda function and send API notifications using Amazon SNS.

Test the setup

To test the WebSocket API, you can use wscat, an open-source command line tool.

  1. Install NPM.
  2. Install wscat:
$ npm install -g wscat
  3. On the console, connect to your published API endpoint by running the following command. The full URI to use can be found on the AWS CloudFormation console by finding the WebSocketURI output in the serverlessrepo-aws-app-athena-websocket-integration stack that was deployed by the AWS SAM application you deployed previously.
$ wscat -c wss://{YOUR-API-ID}.execute-api.{YOUR-REGION}.amazonaws.com/{STAGE}
  4. To test the runquery function, send a JSON message like the following example. This triggers the state machine to run your SQL query using Athena and, using Lambda, return an S3 presigned URL to your client, which you can access to download the query results. Note that the API accepts any valid Athena query. Additional query validation could be added to the internal Lambda function if desired.
$ wscat -c wss://{YOUR-API-ID}.execute-api.{YOUR-REGION}.amazonaws.com/{STAGE}
Connected (press CTRL+C to quit)
> {"action":"runquery", "data":"SELECT * FROM \"covid-19\".country_codes limit 5"}
< {"pre-signed-url": "https://xxx-s3.amazonaws.com/athena_api_access_results/xxxxx.csv?"}
  5. Copy the value for pre-signed-url and enter it into your browser window to access the results.

The presigned URL provides you temporary credentials to download the query results. For more information, refer to Using presigned URLs. This process can be integrated into a front-end web application to automatically download and display the results of the query.
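
For example, a client can fetch the results with a plain HTTP GET on the presigned URL. The following Python sketch uses the requests library; the URL and output file name are placeholders.

import requests

# pre_signed_url is the value returned in the WebSocket JSON response
pre_signed_url = "https://<BUCKET>.s3.amazonaws.com/athena_api_access_results/<QUERY_ID>.csv?<SIGNATURE>"

response = requests.get(pre_signed_url, timeout=30)
response.raise_for_status()

# Save the Athena CSV results locally (file name is illustrative)
with open("query_results.csv", "wb") as f:
    f.write(response.content)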

Clean up

To avoid incurring ongoing charges, delete the resources you provisioned by deleting the CloudFormation stacks CovidLakeStacks and serverlessrepo-AthenaWebSocketIntegration via the AWS CloudFormation console. For detailed instructions, refer to the cleanup sections in the starter kit README files in the GitHub repo.

Conclusion

In this post, we showed how to integrate your application with Athena using the WebSocket API. We have included a GitHub repo for you to understand the code and modify it per your application requirements, to get the full benefits of the solution. We encourage you to further explore the features of the API Gateway WebSocket API to add in security using authorizers, view live invocations using dashboards, and expand the framework for more routes on action request.

Let’s stay in touch via the GitHub repo.


About the Authors

Abhi Sodhani is a Sr. AI/ML Solutions Architect at AWS. He helps customers with a wide range of solutions, including machine leaning, artificial intelligence, data lakes, data warehousing, and data visualization. Outside of work, he is passionate about books, yoga, and travel.

Robin Zimmerman is a Data and ML Engineer with AWS Professional Services. He works with AWS enterprise customers to develop systems to extract value from large volumes of data using AWS data, analytics, and machine learning services. When he’s not working, you’ll probably find him in the mountains—rock climbing, skiing, mountain biking, or out on whatever other adventure he can dream up.

Use Apache Iceberg in a data lake to support incremental data processing

Post Syndicated from Flora Wu original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/

Apache Iceberg is an open table format for very large analytic datasets, which captures metadata information on the state of datasets as they evolve and change over time. It adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback.

Apache Iceberg integration is supported by AWS analytics services including Amazon EMR, Amazon Athena, and AWS Glue. Amazon EMR can provision clusters with Spark, Hive, Trino, and Flink that can run Iceberg. Starting with Amazon EMR version 6.5.0, you can use Iceberg with your EMR cluster without requiring a bootstrap action. In early 2022, AWS announced general availability of Athena ACID transactions, powered by Apache Iceberg. The recently released Athena query engine version 3 provides better integration with the Iceberg table format. AWS Glue 3.0 and later supports the Apache Iceberg framework for data lakes.

In this post, we discuss what customers want in modern data lakes and how Apache Iceberg helps address customer needs. Then we walk through a solution to build a high-performance and evolving Iceberg data lake on Amazon Simple Storage Service (Amazon S3) and process incremental data by running insert, update, and delete SQL statements. Finally, we show you how to performance tune the process to improve read and write performance.

How Apache Iceberg addresses what customers want in modern data lakes

More and more customers are building data lakes, with structured and unstructured data, to support many users, applications, and analytics tools. There is an increased need for data lakes to support database-like features such as ACID transactions, record-level updates and deletes, time travel, and rollback. Apache Iceberg is designed to support these features on cost-effective petabyte-scale data lakes on Amazon S3.

Apache Iceberg addresses customer needs by capturing rich metadata information about the dataset at the time the individual data files are created. There are three layers in the architecture of an Iceberg table: the Iceberg catalog, the metadata layer, and the data layer, as depicted in the following figure (source).

The Iceberg catalog stores the metadata pointer to the current table metadata file. When a select query reads an Iceberg table, the query engine first goes to the Iceberg catalog and retrieves the location of the current metadata file. Whenever there is an update to the Iceberg table, a new snapshot of the table is created, and the metadata pointer is updated to reference the new table metadata file.

The following is an example Iceberg catalog with AWS Glue implementation. You can see the database name, the location (S3 path) of the Iceberg table, and the metadata location.
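
You can also retrieve the same information programmatically from the Data Catalog. The following boto3 sketch is illustrative; the database and table names are placeholders.

import boto3

glue = boto3.client("glue")

# Placeholders: replace with your Iceberg database and table names
table = glue.get_table(DatabaseName="<DATABASE_NAME>", Name="<ICEBERG_TABLE_NAME>")["Table"]

print(table["StorageDescriptor"]["Location"])        # S3 path of the Iceberg table
print(table["Parameters"].get("metadata_location"))  # pointer to the current metadata file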

The metadata layer has three types of files: the metadata file, manifest list, and manifest file in a hierarchy. At the top of the hierarchy is the metadata file, which stores information about the table’s schema, partition information, and snapshots. The snapshot points to the manifest list. The manifest list has the information about each manifest file that makes up the snapshot, such as location of the manifest file, the partitions it belongs to, and the lower and upper bounds for partition columns for the data files it tracks. The manifest file tracks data files as well as additional details about each file, such as the file format. All three files work in a hierarchy to track the snapshots, schema, partitioning, properties, and data files in an Iceberg table.

The data layer has the individual data files of the Iceberg table. Iceberg supports a wide range of file formats including Parquet, ORC, and Avro. Because the Iceberg table tracks the individual data files instead of only pointing to the partition location with data files, it isolates writing operations from reading operations. You can write data files at any time, but the change becomes visible only with an explicit commit, which creates a new version of the snapshot and metadata files.
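
If you want to see this hierarchy for yourself, Iceberg exposes it through metadata tables that you can query from Spark. The following sketch assumes the Spark catalog named demo that we configure later in this post and the all_reviews table that we create in the walkthrough.

# Inspect the snapshots and manifests that Iceberg tracks for a table
spark.sql("SELECT snapshot_id, committed_at, operation FROM demo.reviews.all_reviews.snapshots").show(truncate=False)
spark.sql("SELECT path, length, added_data_files_count FROM demo.reviews.all_reviews.manifests").show(truncate=False)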

Solution overview

In this post, we walk you through a solution to build a high-performing Apache Iceberg data lake on Amazon S3; process incremental data with insert, update, and delete SQL statements; and tune the Iceberg table to improve read and write performance. The following diagram illustrates the solution architecture.

To demonstrate this solution, we use the Amazon Customer Reviews dataset in an S3 bucket (s3://amazon-reviews-pds/parquet/). In a real use case, this would be raw data stored in your own S3 bucket. We can check the data size with the following code in the AWS Command Line Interface (AWS CLI):

//Run this AWS CLI command to check the data size
aws s3 ls --summarize --human-readable --recursive s3://amazon-reviews-pds/parquet

The total object count is 430, and total size is 47.4 GiB.

To set up and test this solution, we complete the following high-level steps:

  1. Set up an S3 bucket in the curated zone to store converted data in Iceberg table format.
  2. Launch an EMR cluster with appropriate configurations for Apache Iceberg.
  3. Create a notebook in EMR Studio.
  4. Configure the Spark session for Apache Iceberg.
  5. Convert data to Iceberg table format and move data to the curated zone.
  6. Run insert, update, and delete queries in Athena to process incremental data.
  7. Carry out performance tuning.

Prerequisites

To follow along with this walkthrough, you must have an AWS account with an AWS Identity and Access Management (IAM) role that has sufficient access to provision the required resources.

Set up the S3 bucket for Iceberg data in the curated zone in your data lake

Choose the Region in which you want to create the S3 bucket and provide a unique name:

s3://iceberg-curated-blog-data

Launch an EMR cluster to run Iceberg jobs using Spark

You can create an EMR cluster from the AWS Management Console, Amazon EMR CLI, or AWS Cloud Development Kit (AWS CDK). For this post, we walk you through how to create an EMR cluster from the console.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose the latest Amazon EMR release. As of January 2023, the latest release is 6.9.0. Iceberg requires release 6.5.0 and above.
  4. Select JupyterEnterpriseGateway and Spark as the software to install.
  5. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":"true"}}].
  6. Leave other settings at their default and choose Next.
  7. For Hardware, use the default setting.
  8. Choose Next.
  9. For Cluster name, enter a name. We use iceberg-blog-cluster.
  10. Leave the remaining settings unchanged and choose Next.
  11. Choose Create cluster.

Create a notebook in EMR Studio

We now walk you through how to create a notebook in EMR Studio from the console.

  1. On the IAM console, create an EMR Studio service role.
  2. On the Amazon EMR console, choose EMR Studio.
  3. Choose Get started.

The Get started page appears in a new tab.

  1. Choose Create Studio in the new tab.
  2. Enter a name. We use iceberg-studio.
  3. Choose the same VPC and subnet as those for the EMR cluster, and the default security group.
  4. Choose AWS Identity and Access Management (IAM) for authentication, and choose the EMR Studio service role you just created.
  5. Choose an S3 path for Workspaces backup.
  6. Choose Create Studio.
  7. After the Studio is created, choose the Studio access URL.
  8. On the EMR Studio dashboard, choose Create workspace.
  9. Enter a name for your Workspace. We use iceberg-workspace.
  10. Expand Advanced configuration and choose Attach Workspace to an EMR cluster.
  11. Choose the EMR cluster you created earlier.
  12. Choose Create Workspace.
  13. Choose the Workspace name to open a new tab.

In the navigation pane, there is a notebook that has the same name as the Workspace. In our case, it is iceberg-workspace.

  14. Open the notebook.
  15. When prompted to choose a kernel, choose Spark.

Configure a Spark session for Apache Iceberg

Use the following code, providing your own S3 bucket name:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://iceberg-curated-blog-data",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.demo.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin.
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information.
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path defined by this property: s3://iceberg-curated-blog-data.
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step).
  • spark.sql.catalog.demo.io-impl – Iceberg allows users to write data to Amazon S3 through S3FileIO. The AWS Glue Data Catalog by default uses this FileIO, and other catalogs can load this FileIO using the io-impl catalog property.

Convert data to Iceberg table format

You can use either Spark on Amazon EMR or Athena to load the Iceberg table. In the EMR Studio Workspace notebook Spark session, run the following commands to load the data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews - this load all the parquet files
val reviews_all_location = "s3://amazon-reviews-pds/parquet/"
val reviews_all = spark.read.parquet(reviews_all_location)

// write reviews data to an Iceberg v2 table
reviews_all.writeTo("demo.reviews.all_reviews").tableProperty("format-version", "2").createOrReplace()

After you run the code, you should find two prefixes created in your data warehouse S3 path (s3://iceberg-curated-blog-data/reviews.db/all_reviews): data and metadata.

Process incremental data using insert, update, and delete SQL statements in Athena

Athena is a serverless query engine that you can use to perform read, write, update, and optimization tasks against Iceberg tables. To demonstrate how the Apache Iceberg data lake format supports incremental data ingestion, we run insert, update, and delete SQL statements on the data lake.

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure the query result location to be the S3 bucket you created earlier. You should be able to see that the table reviews.all_reviews is available for querying. Run the following query to verify that you have loaded the Iceberg table successfully:

select * from reviews.all_reviews limit 5;

Process incremental data by running insert, update, and delete SQL statements:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

Performance tuning

In this section, we walk through different ways to improve Apache Iceberg read and write performance.

Configure Apache Iceberg table properties

Apache Iceberg is a table format, and it supports table properties to configure table behavior such as read, write, and catalog. You can improve the read and write performance on Iceberg tables by adjusting the table properties.

For example, if you notice that you write too many small files for an Iceberg table, you can configure the target file size to write fewer but larger files, which helps improve query performance.

Property Default Description
write.target-file-size-bytes 536870912 (512 MB) Controls the size of files generated to target about this many bytes

Use the following code to alter the table format:

//Example code to alter the table property in the EMR Studio Workspace notebook
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')")

Partitioning and sorting

To make a query run fast, the less data read, the better. Iceberg takes advantage of the rich metadata it captures at write time and facilitates techniques such as scan planning, partitioning, pruning, and column-level stats such as min/max values to skip data files that don't have matching records. We walk you through how query scan planning and partitioning work in Iceberg and how we use them to improve query performance.

Query scan planning

For a given query, the first step in a query engine is scan planning, which is the process of finding the files in a table needed for a query. Planning in an Iceberg table is very efficient, because Iceberg's rich metadata can be used to prune metadata files that aren't needed, in addition to filtering data files that don't contain matching data. In our tests, we observed that Athena scanned 50% or less data for a given query on an Iceberg table compared to the original data before conversion to Iceberg format.

There are two types of filtering:

  • Metadata filtering – Iceberg uses two levels of metadata to track the files in a snapshot: the manifest list and manifest files. It first uses the manifest list, which acts as an index of the manifest files. During planning, Iceberg filters manifests using the partition value range in the manifest list without reading all the manifest files. Then it uses selected manifest files to get data files.
  • Data filtering – After selecting the list of manifest files, Iceberg uses the partition data and column-level stats for each data file stored in manifest files to filter data files. During planning, query predicates are converted to predicates on the partition data and applied first to filter data files. Then, the column stats like column-level value counts, null counts, lower bounds, and upper bounds are used to filter out data files that can’t match the query predicate. By using upper and lower bounds to filter data files at planning time, Iceberg greatly improves query performance.
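
You can observe the statistics that drive data filtering by querying the files metadata table from Spark. The following sketch assumes the demo catalog and all_reviews table created earlier in this post.

# Per-file column statistics that Iceberg uses to prune data files at planning time
spark.sql("""
    SELECT file_path, record_count, lower_bounds, upper_bounds
    FROM demo.reviews.all_reviews.files
""").show(truncate=False)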

Partitioning and sorting

Partitioning is a way to group records with the same key column values together when writing. The benefit of partitioning is faster queries that access only part of the data, as explained earlier in query scan planning: data filtering. Iceberg makes partitioning simple by supporting hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it.

In our use case, we first run the following query on the non-partitioned Iceberg table. Then we partition the Iceberg table by the category of the reviews, which is used in the query WHERE condition to filter out records. With partitioning, the query can scan much less data. See the following code:

//Example code in EMR Studio Workspace notebook to create an Iceberg table all_reviews_partitioned partitioned by product_category
reviews_all.writeTo("demo.reviews.all_reviews_partitioned").tableProperty("format-version", "2").partitionedBy($"product_category").createOrReplace()

Run the following select statement on the non-partitioned all_reviews table vs. the partitioned table to see the performance difference:

//Run this query on all_reviews table and the partitioned table for performance testing
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

//Run the same select query on partitioned dataset
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews_partitioned where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table shows the improvement from data partitioning: about 50% shorter runtime and about 70% less data scanned.

Dataset Name Non-Partitioned Dataset Partitioned Dataset
Runtime (seconds) 8.20 4.25
Data Scanned (MB) 131.55 33.79

Note that the runtime is the average runtime with multiple runs in our test.

We saw good performance improvement after partitioning. However, this can be further improved by using column-level stats from Iceberg manifest files. In order to use the column-level stats effectively, you want to further sort your records based on the query patterns. Sorting the whole dataset using the columns that are often used in queries will reorder the data in such a way that each data file ends up with a unique range of values for the specific columns. If these columns are used in the query condition, it allows query engines to further skip data files, thereby enabling even faster queries.
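
For example, you can define a table-level sort order with Iceberg's Spark SQL extensions so that subsequent writes cluster records on commonly filtered columns. The following sketch, run in the EMR Studio notebook, uses review_date as a representative sort column; note that it affects only future writes (and compaction), so existing files would need to be rewritten to benefit.

# Define a write sort order so new data files get narrow min/max ranges on review_date
spark.sql("ALTER TABLE demo.reviews.all_reviews_partitioned WRITE ORDERED BY review_date")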

Copy-on-write vs. merge-on-read

When implementing update and delete on Iceberg tables in the data lake, there are two approaches defined by the Iceberg table properties:

  • Copy-on-write – With this approach, when there are changes to the Iceberg table, either updates or deletes, the data files associated with the impacted records are duplicated and updated. The records are either updated or deleted in the duplicated data files. A new snapshot of the Iceberg table is created, pointing to the newer version of data files. This makes the overall writes slower. If concurrent writes conflict, retries are needed, which increases write time even more. On the other hand, when reading the data, no extra processing is needed. The query retrieves data from the latest version of the data files.
  • Merge-on-read – With this approach, when there are updates or deletes on the Iceberg table, the existing data files are not rewritten; instead, new delete files are created to track the changes. For deletes, a new delete file is created with the deleted records. When reading the Iceberg table, the delete file is applied to the retrieved data to filter out the deleted records. For updates, a new delete file is created to mark the updated records as deleted, and a new data file is created for those records with the updated values. When reading the Iceberg table, both the delete and new files are applied to the retrieved data to reflect the latest changes and produce correct results. Therefore, any subsequent query needs an extra step to merge the data files with the delete and new files, which usually increases query time. On the other hand, writes might be faster because there is no need to rewrite the existing data files.

To test the impact of the two approaches, you can run the following code to set the Iceberg table properties:

//Example code in the EMR Studio Workspace notebook to set copy-on-write (change the values to merge-on-read for the second test)
spark.sql("ALTER TABLE demo.reviews.all_reviews SET TBLPROPERTIES ('write.delete.mode'='copy-on-write','write.update.mode'='copy-on-write')")

Run the update, delete, and select SQL statements in Athena to show the runtime difference for copy-on-write vs. merge-on-read:

//Example update statement
update reviews.all_reviews set star_rating=5 where product_category = 'Watches' and star_rating=4

//Example delete statement
delete from reviews.all_reviews where product_category = 'Watches' and star_rating=1

//Example select statement
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table summarizes the query runtimes.

Query Copy-on-Write Runtime (seconds) Merge-on-Read Runtime (seconds) Data Scanned (MB)
UPDATE 66.251 10.788 494.06
DELETE 116.174 54.941 3.07
SELECT 97.75 113.44 137.16

Note that the runtime is the average runtime with multiple runs in our test.

As our test results show, there are always trade-offs between the two approaches. Which approach to use depends on your use case. In summary, the considerations come down to latency on reads vs. writes. You can reference the following table to make the right choice.

Consideration Copy-on-Write Merge-on-Read
Pros Faster reads Faster writes
Cons Expensive writes Higher latency on reads
When to use Good for frequent reads, infrequent updates and deletes or large batch updates Good for tables with frequent updates and deletes

Data compaction

If your data file size is small, you might end up with thousands or millions of files in an Iceberg table. This dramatically increases the number of I/O operations and slows down queries. Furthermore, Iceberg tracks each data file in a dataset. More data files lead to more metadata, which in turn increases the overhead and I/O operations on reading metadata files. To improve query performance, it's recommended to compact small data files into larger data files.

When updating and deleting records in an Iceberg table with the merge-on-read approach, you might end up with many small delete files or new data files. Running compaction combines all these files and creates a newer version of the data file, which eliminates the need to reconcile them during reads. It's recommended to run regular compaction jobs to impact reads as little as possible while still maintaining faster write speed.

Run the following data compaction command, then run the select query from Athena:

//Data compaction 
OPTIMIZE reviews.all_reviews REWRITE DATA USING BIN_PACK

//Run this query before and after data compaction
select marketplace,customer_id, review_id,product_id,product_title,star_rating from reviews.all_reviews where product_category = 'Watches' and review_date between date('2005-01-01') and date('2005-03-31')

The following table compares the runtime before vs. after data compaction. You can see about 40% performance improvement.

Query Before Data Compaction After Data Compaction
Runtime (seconds) 97.75 32.676
Data scanned (MB) 137.16 189.19

Note that the select queries ran on the all_reviews table after update and delete operations, before and after data compaction. The runtime is the average runtime with multiple runs in our test.

Clean up

After you follow the solution walkthrough to perform the use cases, complete the following steps to clean up your resources and avoid further costs:

  1. Drop the AWS Glue tables and database from Athena or run the following code in your notebook:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.all_reviews") 
spark.sql("DROP TABLE demo.reviews.all_reviews_partitioned") 

// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  2. On the EMR Studio console, choose Workspaces in the navigation pane.
  3. Select the Workspace you created and choose Delete.
  4. On the EMR console, navigate to the Studios page.
  5. Select the Studio you created and choose Delete.
  6. On the EMR console, choose Clusters in the navigation pane.
  7. Select the cluster and choose Terminate.
  8. Delete the S3 bucket and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. Then we walked you through a solution to process incremental data in a data lake using Apache Iceberg. Finally, we took a deep dive into performance tuning to improve read and write performance for our use cases.

We hope this post provides some useful information for you to decide whether you want to adopt Apache Iceberg in your data lake solution.


About the Authors

Flora Wu is a Sr. Resident Architect at AWS Data Lab. She helps enterprise customers create data analytics strategies and build solutions to accelerate their business outcomes. In her spare time, she enjoys playing tennis, dancing salsa, and traveling.

Daniel Li is a Sr. Solutions Architect at Amazon Web Services. He focuses on helping customers develop, adopt, and implement cloud services and strategy. When not working, he likes spending time outdoors with his family.

Build a real-time GDPR-aligned Apache Iceberg data lake

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-real-time-gdpr-aligned-apache-iceberg-data-lake/

Data lakes are a popular choice for today’s organizations to store their data around their business activities. As a best practice of a data lake design, data should be immutable once stored. But regulations such as the General Data Protection Regulation (GDPR) have created obligations for data operators who must be able to erase or update personal data from their data lake when requested.

A data lake built on AWS uses Amazon Simple Storage Service (Amazon S3) as its primary storage environment. When a customer asks to erase or update private data, the data lake operator needs to find the required objects in Amazon S3 that contain the required data and take steps to erase or update that data. This activity can be a complex process for the following reasons:

  • Data lakes may contain many S3 objects (each may contain multiple rows), and it's often difficult to find the object that contains the exact data to be erased or the personally identifiable information (PII) to be updated as requested
  • By nature, S3 objects are immutable and therefore applying direct row-based transactions like DELETE or UPDATE isn’t possible

To handle these situations, a transactional feature on S3 objects is required, and frameworks such as Apache Hudi or Apache Iceberg provide you the transactional feature for upserts in Amazon S3.

AWS contributed the Apache Iceberg integration with the AWS Glue Data Catalog, which enables you to use open-source data computation engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg, enabling transaction queries on S3 objects.

In this post, we show you how to stream real-time data to an Iceberg table in Amazon S3 using AWS Glue streaming and perform transactions using Amazon Athena for deletes and updates. We use a serverless mechanism for this implementation, which requires minimum operational overhead to manage and fine-tune various configuration parameters, and enables you to extend your use case to ACID operations beyond the GDPR.

Solution overview

We used the Amazon Kinesis Data Generator (KDG) to produce synthetic streaming data in Amazon Kinesis Data Streams and then processed the streaming input data using AWS Glue streaming to store the data in Amazon S3 in Iceberg table format. As part of the customer’s request, we ran delete and update statements using Athena with Iceberg support.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  1. Streaming data is generated in JSON format using the KDG template and inserted into Kinesis Data Streams.
  2. An AWS Glue streaming job is connected to Kinesis Data Streams to process the data using the Iceberg connector.
  3. The streaming job output is stored in Amazon S3 in Iceberg table format.
  4. Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in Iceberg format.
  5. Athena interacts with the Data Catalog tables in Iceberg format for transactional queries required for GDPR.

The codebase required for this post is available in the GitHub repository.

Prerequisites

Before starting the implementation, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy your solution resources:

  1. After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:
  2. For Stack name, enter a name.
  3. For Username, enter the user name for the KDG.
  4. For Password, enter the password for the KDG (this must be at least six alphanumeric characters, and contain at least one number).
  5. For IAMGlueStreamingJobRoleName, enter a name for the IAM role used for the AWS Glue streaming job.
  6. Choose Next and create your stack.

This CloudFormation template configures the following resources in your account:

  • An S3 bucket named streamingicebergdemo-XX (note that the XX part is a random unique number to make the S3 bucket name unique)
  • An IAM policy and role
  • The KDG URL used for creating synthetic data
  7. After you complete the setup, go to the Outputs tab of the CloudFormation stack to get the S3 bucket name, AWS Glue job execution role (as per your input), and KDG URL.
  8. Before proceeding with the demo, create a folder named custdata under the created S3 bucket.

Create a Kinesis data stream

We use Kinesis Data Streams to create a serverless streaming data service that is built to handle millions of events with low latency. The following steps guide you on how to create the data stream in the us-east-1 Region:

  1. Log in to the AWS Management Console.
  2. Navigate to Kinesis console (make sure the Region is us-east-1).
  3. Select Kinesis Data Streams and choose Create data stream.
  4. For Data stream name, enter demo-data-stream.
  5. For this post, we select On-demand as the Kinesis data stream capacity mode.

On-demand mode eliminates the need to provision and manage capacity for streaming data. However, you can also implement this solution with Kinesis Data Streams in provisioned mode. If you prefer to create the stream programmatically instead of through the console, see the sketch after these steps.

  6. Choose Create data stream.
  7. Wait for successful creation of demo-data-stream and for it to be in Active status.
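
If you prefer to create the data stream programmatically instead of through the console, the following is a minimal boto3 sketch that creates the same on-demand stream and waits for it to become active.

import boto3

# Create demo-data-stream in on-demand capacity mode in us-east-1
kinesis = boto3.client("kinesis", region_name="us-east-1")
kinesis.create_stream(
    StreamName="demo-data-stream",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# Wait until the stream is in Active status before sending data to it
waiter = kinesis.get_waiter("stream_exists")
waiter.wait(StreamName="demo-data-stream")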

Set up the Kinesis Data Generator

To create a sample streaming dataset, we use the KDG URL generated on the CloudFormation stack Outputs tab and log in with the credentials used in the parameters for the CloudFormation template. For this post, we use the following template to generate sample data in the demo-data-stream Kinesis data stream.

  1. Log in to the KDG URL with the user name and password you supplied during stack creation.
  2. Change the Region to us-east-1.
  3. Select the Kinesis data stream demo-data-stream.
  4. For Records per second, choose Constant and enter 100 (it can be another number, depending on the rate of record creation).
  5. On the Template 1 tab, enter the KDG data generation template:
{
"year": "{{random.number({"min":2000,"max":2022})}}",
"month": "{{random.number({"min":1,"max":12})}}",
"day": "{{random.number({"min":1,"max":30})}}",
"hour": "{{random.number({"min":0,"max":24})}}",
"minute": "{{random.number({"min":0,"max":60})}}",
"customerid": {{random.number({"min":5023,"max":59874})}},
"firstname" : "{{name.firstName}}",
"lastname" : "{{name.lastName}}",
"dateofbirth" : "{{date.past(70)}}",
"city" : "{{address.city}}",
"buildingnumber" : {{random.number({"min":63,"max":947})}},
"streetaddress" : "{{address.streetAddress}}",
"state" : "{{address.state}}",
"zipcode" : "{{address.zipCode}}",
"country" : "{{address.country}}",
"countrycode" : "{{address.countryCode}}",
"phonenumber" : "{{phone.phoneNumber}}",
"productname" : "{{commerce.productName}}",
"transactionamount": {{random.number(
{
"min":10,
"max":150
}
)}}
}
  6. Choose Test template to test the sample records.
  7. When the testing is correct, choose Send data.

This will start sending 100 records per second in the Kinesis data stream. (To stop sending data, choose Stop Sending Data to Kinesis.)
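
If you prefer to generate test records without the KDG, the following is a minimal boto3 sketch that sends a few synthetic records to the same data stream. It uses only a reduced set of the template's fields and placeholder values, so treat it as an illustration rather than a replacement for the full KDG template.

import json
import random
import boto3

# Send a handful of synthetic records to demo-data-stream in us-east-1
kinesis = boto3.client("kinesis", region_name="us-east-1")

for i in range(10):
    record = {
        "customerid": random.randint(5023, 59874),
        "firstname": f"first-{i}",
        "lastname": f"last-{i}",
        "city": "Seattle",
        "phonenumber": "555-0100",
        "productname": "sample-product",
        "transactionamount": random.randint(10, 150),
    }
    kinesis.put_record(
        StreamName="demo-data-stream",
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=str(record["customerid"]),
    )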

Integrate Iceberg with AWS Glue

To add the Apache Iceberg Connector for AWS Glue, complete the following steps. The connector is free to use and supports AWS Glue 1.0, 2.0, and 3.0.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In the navigation pane, navigate to AWS Marketplace.
  3. Search for and choose Apache Iceberg Connector for AWS Glue.
  4. Choose Accept Terms and Continue to Subscribe.
  5. Choose Continue to Configuration.
  6. For Fulfillment option, choose your AWS Glue version.
  7. For Software version, choose the latest software version.
  8. Choose Continue to Launch.
  9. Under Usage Instructions, choose the link to activate the connector.
  10. Enter a name for the connection, then choose Create connection and activate the connector.
  11. Verify that the new connector appears on the AWS Glue Studio Connectors page.

Create the AWS Glue Data Catalog database

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location and schema of your data. You use the information in the Data Catalog to create and monitor your ETL jobs.

For this post, we create a Data Catalog database named icebergdemodb containing the metadata information of a table named customer, which will be queried through Athena.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. For Database name, enter icebergdemodb.

This creates an AWS Glue database for metadata storage.

Create a Data Catalog table in Iceberg format

In this step, we create a Data Catalog table in Iceberg table format.

  1. On the Athena console, create an Athena workgroup named demoworkgroup for SQL queries.
  2. Choose Athena engine version 3 for Query engine version.

For more information about Athena versions, refer to Changing Athena engine versions.

  3. Enter the S3 bucket location for Query result configuration under Additional configurations.
  4. Open the Athena query editor and choose demoworkgroup.
  5. Choose the database icebergdemodb.
  6. Enter and run the following DDL to create a table pointing to the Data Catalog database icebergdemodb. Note that the TBLPROPERTIES section mentions ICEBERG as the table type and LOCATION points to the S3 folder (custdata) URI created in earlier steps. This DDL command is available on the GitHub repo.
CREATE TABLE icebergdemodb.customer(
year string,
month string,
day string,
hour string,
minute string,
customerid string,
firstname string,
lastname string,
dateofbirth string,
city string,
buildingnumber string,
streetaddress string,
state string,
zipcode string,
country string,
countrycode string,
phonenumber string,
productname string,
transactionamount int)
LOCATION '<S3 Location URI>'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912',
'optimize_rewrite_delete_file_threshold'='10'
);

After you run the command successfully, you can see the table customer in the Data Catalog.

Create an AWS Glue streaming job

In this section, we create the AWS Glue streaming job, which fetches the record from the Kinesis data stream using the Spark script editor.

  1. On the AWS Glue console, choose Jobs (new) in the navigation pane.
  2. For Create job, select Spark script editor.
  3. For Options¸ select Create a new script with boilerplate code.
  4. Choose Create.
  5. Enter the code available in the GitHub repo in the editor.

The sample code keeps appending data in the target location by fetching records from the Kinesis data stream.
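
To give a sense of what such a streaming job does, the following is a simplified, conceptual sketch of reading from Kinesis Data Streams with AWS Glue and appending each micro-batch into the Iceberg table. It is not the exact script from the GitHub repo; the catalog name (glue_catalog) and the use of the --kinesis_arn and --TempDir job parameters are assumptions for illustration.

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME", "kinesis_arn", "TempDir"])
glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the Kinesis data stream as a streaming DataFrame
kinesis_df = glue_context.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "streamARN": args["kinesis_arn"],
        "classification": "json",
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true",
    },
    transformation_ctx="kinesis_source",
)

def process_batch(data_frame, batch_id):
    # Append each micro-batch into the Iceberg table created earlier. This assumes
    # an Iceberg catalog named "glue_catalog" was configured through the job's
    # Spark configuration (for example, via the Iceberg connector).
    if data_frame.count() > 0:
        data_frame.writeTo("glue_catalog.icebergdemodb.customer").append()

glue_context.forEachBatch(
    frame=kinesis_df,
    batch_function=process_batch,
    options={
        "windowSize": "60 seconds",
        "checkpointLocation": args["TempDir"] + "/checkpoint/",
    },
)
job.commit()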

  1. Choose the Job details tab in the query editor.
  2. For Name, enter Demo_Job.
  3. For IAM role, choose demojobrole.
  4. For Type, choose Spark Streaming.
  5. For Glue Version, choose Glue 3.0.
  6. For Language, choose Python 3.
  7. For Worker type, choose G 0.25X.
  8. Select Automatically scale the number of workers.
  9. For Maximum number of workers, enter 5.
  10. Under Advanced properties, select Use Glue Data Catalog as the Hive metastore.
  11. For Connections, choose the connector you created.
  12. For Job parameters, enter the following key pairs (provide your S3 bucket and account ID):
Key Value
--iceberg_job_catalog_warehouse s3://streamingicebergdemo-XX/custdata/
--output_path s3://streamingicebergdemo-XX
--kinesis_arn arn:aws:kinesis:us-east-1:<AWS Account ID>:stream/demo-data-stream
--user-jars-first True

  1. Choose Run to start the AWS Glue streaming job.
  2. To monitor the job, choose Monitoring in the navigation pane.
  3. Select Demo_Job and choose View run details to check the job run details and Amazon CloudWatch logs.

Run GDPR use cases on Athena

In this section, we demonstrate a few use cases that are relevant to GDPR alignment with the user data that’s stored in Iceberg format in the Amazon S3-based data lake as implemented in the previous steps. For this, let’s consider that the following requests are being initiated in the workflow to comply with the regulations:

  • Delete the records for the input customerid (for example, 59289)
  • Update phonenumber for the customerid (for example, 51936)

The IDs used in this example are samples only; they were generated by the KDG template used earlier, which creates sample data. You can find IDs to use in your own implementation by querying the table through the Athena query editor. The steps remain the same.

Delete data by customer ID

Complete the following steps to fulfill the first use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query using a customer ID and choose Run:
SELECT count(*)
FROM icebergdemodb.customer
WHERE customerid = '59289';

This query gives the count of records for the input customerid before delete.

  4. Enter the following query with the same customer ID and choose Run:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '59289') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN DELETE;

This query deletes the data for the input customerid as per the workflow generated.

  5. Test if there is data with the customer ID using a count query.

The count should be 0.

Update data by customer ID

Complete the following steps to test the second use case:

  1. On the Athena console, make sure icebergdemodb is chosen as the database.
  2. Open the query editor.
  3. Enter the following query with a customer ID and choose Run.
SELECT customerid, phonenumber
FROM icebergdemodb.customer
WHERE customerid = '51936';

This query gives the value for phonenumber before update.

  4. Run the following query to update the required columns:
MERGE INTO icebergdemodb.customer trg
USING (SELECT customerid
FROM icebergdemodb.customer
WHERE customerid = '51936') src
ON (trg.customerid = src.customerid)
WHEN MATCHED
THEN UPDATE SET phonenumber = '000';

This query updates the data to a dummy value.

  5. Run the SELECT query to check the update.

You can see the data is updated correctly.
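
If you need to service these GDPR requests programmatically rather than in the query editor, the following is a minimal boto3 sketch that submits the same kind of statements through the Athena API. It assumes the icebergdemodb database and demoworkgroup workgroup created earlier (the workgroup already has a query result location configured); the customer IDs are placeholders. Note that Athena engine version 3 also supports direct DELETE and UPDATE statements on Iceberg tables, which can be simpler than MERGE for single-key requests.

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

def run_query(sql):
    # Submit the statement and poll until Athena finishes running it
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "icebergdemodb"},
        WorkGroup="demoworkgroup",
    )
    query_id = execution["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        time.sleep(2)

# Erase one customer and anonymize another, as in the use cases above
run_query("DELETE FROM icebergdemodb.customer WHERE customerid = '59289'")
run_query("UPDATE icebergdemodb.customer SET phonenumber = '000' WHERE customerid = '51936'")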

Vacuum table

A good practice is to run the VACUUM command periodically on the table because operations like INSERT, UPDATE, DELETE, and MERGE will take place on the Iceberg table. See the following code:

VACUUM icebergdemodb.customer;

Considerations

The following are a few considerations to keep in mind for this implementation:

Clean up

Complete the following steps to clean up the resources you created for this post:

    1. Delete the custdata folder in the S3 bucket.
    2. Delete the CloudFormation stack.
    3. Delete the Kinesis data stream.
    4. Delete the S3 bucket storing the data.
    5. Delete the AWS Glue job and Iceberg connector.
    6. Delete the AWS Glue Data Catalog database and table.
    7. Delete the Athena workgroup.
    8. Delete the IAM roles and policies.

Conclusion

This post explained how you can use the Iceberg table format on Athena to implement GDPR use cases like data deletion and data upserts as required, when streaming data is being generated and ingested through AWS Glue streaming jobs in Amazon S3.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the data operations that Iceberg supports. Refer to the Apache Iceberg documentation for details on various operations.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Rajdip Chaudhuri is Solutions Architect with Amazon Web Services specializing in data and analytics. He enjoys working with AWS customers and partners on data and analytics requirements. In his spare time, he enjoys soccer.

Automate replication of relational sources into a transactional data lake with Apache Iceberg and AWS Glue

Post Syndicated from Luis Gerardo Baeza original https://aws.amazon.com/blogs/big-data/automate-replication-of-relational-sources-into-a-transactional-data-lake-with-apache-iceberg-and-aws-glue/

Organizations have chosen to build data lakes on top of Amazon Simple Storage Service (Amazon S3) for many years. A data lake is the most popular choice for organizations to store all their organizational data generated by different teams, across business domains, in all different formats, and even over its history. According to a study, the average company is seeing the volume of its data grow at a rate that exceeds 50% per year, usually managing an average of 33 unique data sources for analysis.

Teams often end up replicating thousands of jobs from relational databases with the same extract, transform, and load (ETL) pattern, which requires a lot of effort to maintain job states and schedule the individual jobs. The approach described in this post helps teams add tables with few changes and maintain job status with minimum effort, which can lead to a huge improvement in the development timeline and makes it easy to track the jobs.

In this post, we show you how to easily replicate all your relational data stores into a transactional data lake in an automated fashion with a single ETL job using Apache Iceberg and AWS Glue.

Solution architecture

Data lakes are usually organized using separate S3 buckets for three layers of data: the raw layer containing data in its original form, the stage layer containing intermediate processed data optimized for consumption, and the analytics layer containing aggregated data for specific use cases. In the raw layer, tables usually are organized based on their data sources, whereas tables in the stage layer are organized based on the business domains they belong to.

This post provides an AWS CloudFormation template that deploys an AWS Glue job that reads an Amazon S3 path for one data source of the data lake raw layer, and ingests the data into Apache Iceberg tables on the stage layer using AWS Glue support for data lake frameworks. The job expects tables in the raw layer to be structured in the way AWS Database Migration Service (AWS DMS) ingests them: schema, then table, then data files.

This solution uses AWS Systems Manager Parameter Store for table configuration. You should modify this parameter to specify the tables you want to process and how, including information such as the primary key, partitions, and the associated business domain. The job uses this information to automatically create a database (if it doesn't already exist) for every business domain, create the Iceberg tables, and perform the data loading.

Finally, we can use Amazon Athena to query the data in the Iceberg tables.

The following diagram illustrates this architecture.


This implementation has the following considerations:

  • All tables from the data source must have a primary key to be replicated using this solution. The primary key can be a single column or a composite key with more than one column.
  • If the data lake contains tables that don’t need upserts or don’t have a primary key, you can exclude them from the parameter configuration and implement traditional ETL processes to ingest them into the data lake. That’s outside of the scope of this post.
  • If there are additional data sources that need to be ingested, you can deploy multiple CloudFormation stacks, one to handle each data source.
  • The AWS Glue job is designed to process data in two phases: the initial load that runs after AWS DMS finishes the full load task, and the incremental load that runs on a schedule that applies change data capture (CDC) files captured by AWS DMS. Incremental processing is performed using an AWS Glue job bookmark.

There are nine steps to complete this tutorial:

  1. Set up a source endpoint for AWS DMS.
  2. Deploy the solution using AWS CloudFormation.
  3. Review the AWS DMS replication task.
  4. Optionally, add permissions for encryption and decryption or AWS Lake Formation.
  5. Review the table configuration on Parameter Store.
  6. Perform initial data loading.
  7. Perform incremental data loading.
  8. Monitor table ingestion.
  9. Schedule incremental batch data loading.

Prerequisites

Before starting this tutorial, you should already be familiar with Iceberg. If you’re not, you can get started by replicating a single table following the instructions in Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue. Additionally, set up the following:

Set up a source endpoint for AWS DMS

Before we create our AWS DMS task, we need to set up a source endpoint to connect to the source database:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. If your database is running on Amazon RDS, choose Select RDS DB instance, then choose the instance from the list. Otherwise, choose the source engine and provide the connection information either through AWS Secrets Manager or manually.
  4. For Endpoint identifier, enter a name for the endpoint; for example, source-postgresql.
  5. Choose Create endpoint.

Deploy the solution using AWS CloudFormation

Create a CloudFormation stack using the provided template. Complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Provide a stack name, such as transactionaldl-postgresql.
  4. Enter the required parameters:
    1. DMSS3EndpointIAMRoleARN – The IAM role ARN for AWS DMS to write data into Amazon S3.
    2. ReplicationInstanceArn – The AWS DMS replication instance ARN.
    3. S3BucketStage – The name of the existing bucket used for the stage layer of the data lake.
    4. S3BucketGlue – The name of the existing S3 bucket for storing AWS Glue scripts.
    5. S3BucketRaw – The name of the existing bucket used for the raw layer of the data lake.
    6. SourceEndpointArn – The AWS DMS endpoint ARN that you created earlier.
    7. SourceName – The arbitrary identifier of the data source to replicate (for example, postgres). This is used to define the S3 path of the data lake (raw layer) where data will be stored.
  5. Do not modify the following parameters:
    1. SourceS3BucketBlog – The bucket name where the provided AWS Glue script is stored.
    2. SourceS3BucketPrefix – The bucket prefix name where the provided AWS Glue script is stored.
  6. Choose Next twice.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

After approximately 5 minutes, the CloudFormation stack is deployed.

Review the AWS DMS replication task

The AWS CloudFormation deployment created an AWS DMS target endpoint for you. Because of two specific endpoint settings, the data will be ingested as we need it on Amazon S3.

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Search for and choose the endpoint that begins with dmsIcebergs3endpoint.
  3. Review the endpoint settings:
    1. DataFormat is specified as parquet.
    2. TimestampColumnName will add the column last_update_time with the date of creation of the records on Amazon S3.

AWS DMS endpoint settings

The deployment also creates an AWS DMS replication task that begins with dmsicebergtask.

  1. Choose Replication tasks in the navigation pane and search for the task.

You will see that the Task Type is marked as Full load, ongoing replication. AWS DMS will perform an initial full load of existing data, and then create incremental files with changes performed to the source database.

On the Mapping Rules tab, there are two types of rules:

  • A selection rule with the name of the source schema and tables that will be ingested from the source database. By default, it uses the sample database provided in the prerequisites, dms_sample, and all tables with the keyword %.
  • Two transformation rules that include in the target files on Amazon S3 the schema name and table name as columns. This is used by our AWS Glue job to know to which tables the files in the data lake correspond.

To learn more about how to customize this for your own data sources, refer to Selection rules and actions.

AWS mapping rules

Let’s change some configurations to finish our task preparation.

  1. On the Actions menu, choose Modify.
  2. In the Task Settings section, under Stop task after full load completes, choose Stop after applying cached changes.

This way, we can control the initial load and incremental file generation as two different steps. We use this two-step approach to run the AWS Glue job once per each step.

  1. Under Task logs, choose Turn on CloudWatch logs.
  2. Choose Save.
  3. Wait about 1 minute for the database migration task status to show as Ready.

Add permissions for encryption and decryption or Lake Formation

Optionally, you can add permissions for encryption and decryption or Lake Formation.

Add encryption and decryption permissions

If your S3 buckets used for the raw and stage layers are encrypted using AWS Key Management Service (AWS KMS) customer managed keys, you need to add permissions to allow the AWS Glue job to access the data:

Add Lake Formation permissions

If you’re managing permissions using Lake Formation, you need to allow your AWS Glue job to create your domain’s databases and tables through the IAM role GlueJobRole.

  1. Grant permissions to create databases (for instructions, refer to Creating a Database).
  2. Grant SUPER permissions to the default database.
  3. Grant data location permissions.
  4. If you create databases manually, grant permissions on all databases to create tables. Refer to Granting table permissions using the Lake Formation console and the named resource method or Granting Data Catalog permissions using the LF-TBAC method according to your use case.

After you complete the later step of performing the initial data load, make sure to also add permissions for consumers to query the tables. The job role will become the owner of all the tables created, and the data lake admin can then perform grants to additional users.

Review table configuration in Parameter Store

The AWS Glue job that performs the data ingestion into Iceberg tables uses the table specification provided in Parameter Store. Complete the following steps to review the parameter that was configured automatically for you and, if needed, modify it according to your requirements.

  1. On the Parameter Store console, choose My parameters in the navigation pane.

The CloudFormation stack created two parameters:

  • iceberg-config for job configurations
  • iceberg-tables for table configuration
  2. Choose the parameter iceberg-tables.

The JSON structure contains information that AWS Glue uses to read data and write the Iceberg tables on the target domain:

  • One object per table – The name of the object is created using the schema name, a period, and the table name; for example, schema.table.
  • primaryKey – This should be specified for every source table. You can provide a single column or a comma-separated list of columns (without spaces).
  • partitionCols – This optionally specifies the partition columns for target tables. If you don't want to create partitioned tables, provide an empty string. Otherwise, provide a single column or a comma-separated list of columns to be used (without spaces).
  3. If you want to use your own data source, use the following JSON code and replace the text in CAPS from the template provided (a programmatic way to update this parameter is shown after these steps). If you're using the sample data source provided, keep the default settings:
{
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": ""
    },
    "SCHEMA_NAME.TABLE_NAME_2": {
        "primaryKey": "FIRST_PRIMARY_KEY,SECOND_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "PARTITION_COLUMN_ONE,PARTITION_COLUMN_TWO"
    }
}
  4. Choose Save changes.
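
As mentioned above, you can also update this parameter programmatically instead of editing it in the console. The following is a minimal boto3 sketch; the parameter name follows the <stack-name>-iceberg-tables convention used by the AWS Glue job script, and the table entries are placeholders for your own schema and table names.

import json
import boto3

ssm = boto3.client("ssm")

# Placeholder table configuration; replace with your own schemas, tables, keys, and domains
table_config = {
    "SCHEMA_NAME.TABLE_NAME_1": {
        "primaryKey": "ONLY_PRIMARY_KEY",
        "domain": "TARGET_DOMAIN",
        "partitionCols": "",
    }
}

# Overwrite the parameter created by the CloudFormation stack (stack name assumed here)
ssm.put_parameter(
    Name="transactionaldl-postgresql-iceberg-tables",
    Value=json.dumps(table_config),
    Type="String",
    Overwrite=True,
)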

Perform initial data loading

Now that the required configuration is finished, we ingest the initial data. This step includes three parts: ingesting the data from the source relational database into the raw layer of the data lake, creating the Iceberg tables on the stage layer of the data lake, and verifying results using Athena.

Ingest data into the raw layer of the data lake

To ingest data from the relational data source (PostgreSQL if you are using the sample provided) to our transactional data lake using Iceberg, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the replication task you created and on the Actions menu, choose Restart/Resume.
  3. Wait about 5 minutes for the replication task to complete. You can monitor the tables ingested on the Statistics tab of the replication task.

AWS DMS full load statistics

After some minutes, the task finishes with the message Full load complete.

  4. On the Amazon S3 console, choose the bucket you defined as the raw layer.

Under the S3 prefix defined on AWS DMS (for example, postgres), you should see a hierarchy of folders with the following structure:

  • Schema
    • Table name
      • LOAD00000001.parquet
      • LOAD0000000N.parquet

AWS DMS full load objects created on S3

If your S3 bucket is empty, review Troubleshooting migration tasks in AWS Database Migration Service before running the AWS Glue job.

Create and ingest data into Iceberg tables

Before running the job, let’s navigate the script of the AWS Glue job provided as part of the CloudFormation stack to understand its behavior.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Search for the job that starts with IcebergJob- and a suffix of your CloudFormation stack name (for example, IcebergJob-transactionaldl-postgresql).
  3. Choose the job.

AWS Glue ETL job review

The job script gets the configuration it needs from Parameter Store. The function getConfigFromSSM() returns job-related configurations such as the source and target buckets from which data is read and to which it is written. The variable ssm_param_table_values contains table-related information such as the data domain, table name, partition columns, and primary key of the tables that need to be ingested. See the following Python code:

# Main application
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stackName'])
SSM_PARAMETER_NAME = f"{args['stackName']}-iceberg-config"
SSM_TABLE_PARAMETER_NAME = f"{args['stackName']}-iceberg-tables"

# Parameters for job
rawS3BucketName, rawBucketPrefix, stageS3BucketName, warehouse_path = getConfigFromSSM(SSM_PARAMETER_NAME)
ssm_param_table_values = json.loads(ssmClient.get_parameter(Name = SSM_TABLE_PARAMETER_NAME)['Parameter']['Value'])
dropColumnList = ['db','table_name', 'schema_name','Op', 'last_update_time', 'max_op_date']

The script uses an arbitrary catalog name for Iceberg that is defined as my_catalog. This is implemented on the AWS Glue Data Catalog using Spark configurations, so a SQL operation pointing to my_catalog will be applied on the Data Catalog. See the following code:

catalog_name = 'my_catalog'
errored_table_list = []

# Iceberg configuration
spark = SparkSession.builder \
    .config('spark.sql.warehouse.dir', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse', warehouse_path) \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()

The script iterates over the tables defined in Parameter Store and performs the logic for detecting if the table exists and if the incoming data is an initial load or an upsert:

# Iteration over tables stored on Parameter Store
for key in ssm_param_table_values:
    # Get table data
    isTableExists = False
    schemaName, tableName = key.split('.')
    logger.info(f'Processing table : {tableName}')

The initialLoadRecordsSparkSQL() function loads initial data when no operation column is present in the S3 files. AWS DMS adds this column only to Parquet data files produced by the continuous replication (CDC). The data loading is performed using the INSERT INTO command with SparkSQL. See the following code:

sqltemp = Template("""
    INSERT INTO $catalog_name.$dbName.$tableName  ($insertTableColumnList)
    SELECT $insertTableColumnList FROM insertTable $partitionStrSQL
""")
SQLQUERY = sqltemp.substitute(
    catalog_name = catalog_name, 
    dbName = dbName, 
    tableName = tableName,
    insertTableColumnList = insertTableColumnList[ : -1],
    partitionStrSQL = partitionStrSQL)

logger.info(f'****SQL QUERY IS : {SQLQUERY}')
spark.sql(SQLQUERY)

Now we run the AWS Glue job to ingest the initial data into the Iceberg tables. The CloudFormation stack adds the --datalake-formats parameter, adding the required Iceberg libraries to the job.

  1. Choose Run job.
  2. Choose Job Runs to monitor the status. Wait until the status is Run Succeeded.

Verify the data loaded

To confirm that the job processed the data as expected, complete the following steps:

  1. On the Athena console, choose Query Editor in the navigation pane.
  2. Verify AwsDataCatalog is selected as the data source.
  3. Under Database, choose the data domain that you want to explore, based on the configuration you defined in the parameter store. If using the sample database provided, use sports.

Under Tables and views, we can see the list of tables that were created by the AWS Glue job.

  1. Choose the options menu (three dots) next to the first table name, then choose Preview Data.

You can see the data loaded into the Iceberg tables.

Amazon Athena review initial data loaded

Perform incremental data loading

Now we start capturing changes from our relational database and applying them to the transactional data lake. This step is also divided in three parts: capturing the changes, applying them to the Iceberg tables, and verifying the results.

Capture changes from the relational database

Due to the configuration we specified, the replication task stopped after running the full load phase. Now we restart the task to add incremental files with changes into the raw layer of the data lake.

  1. On the AWS DMS console, select the task we created and ran before.
  2. On the Actions menu, choose Resume.
  3. Choose Start task to start capturing changes.
  4. To trigger new file creation on the data lake, perform inserts, updates, or deletes on the tables of your source database using your preferred database administration tool. If using the sample database provided, you could run the following SQL commands:
UPDATE dms_sample.nfl_stadium_data_upd
SET seating_capacity=93703
WHERE team = 'Los Angeles Rams' and sport_location_id = '31';

update  dms_sample.mlb_data 
set bats = 'R'
where mlb_id=506560 and bats='L';

update dms_sample.sporting_event 
set start_date  = current_date 
where id=11 and sold_out=0;
  5. On the AWS DMS task details page, choose the Table statistics tab to see the changes captured.
    AWS DMS CDC statistics
  6. Open the raw layer of the data lake to find a new file holding the incremental changes inside every table's prefix, for example under the sporting_event prefix.

The record with changes for the sporting_event table looks like the following screenshot.

AWS DMS objects migrated into S3 with CDC

Notice the Op column at the beginning of the record, identified with an update (U). Also, the second date/time value is the control column added by AWS DMS, with the time the change was captured.

CDC file schema on Amazon S3

Apply changes on the Iceberg tables using AWS Glue

Now we run the AWS Glue job again, and it will automatically process only the new incremental files since the job bookmark is enabled. Let’s review how it works.

The dedupCDCRecords() function performs deduplication of data because multiple changes to a single record ID could be captured within the same data file on Amazon S3. Deduplication is performed based on the last_update_time column added by AWS DMS that indicates the timestamp of when the change was captured. See the following Python code:

import sys
from pyspark.sql import Window
from pyspark.sql.functions import max

def dedupCDCRecords(inputDf, keylist):
    # Keep only the latest change per key, based on the AWS DMS last_update_time column
    IDWindowDF = Window.partitionBy(*keylist).orderBy(inputDf.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
    inputDFWithTS = inputDf.withColumn('max_op_date', max(inputDf.last_update_time).over(IDWindowDF))

    NewInsertsDF = inputDFWithTS.filter('last_update_time=max_op_date').filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter('last_update_time=max_op_date').filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    return finalInputDF
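
The following is a small usage sketch of dedupCDCRecords() with hypothetical CDC rows, assuming the function above is already defined in your Spark session. Two changes for customer 1001 arrive in the same batch, and only the latest one survives deduplication.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.getOrCreate()

# Hypothetical CDC rows: op, key, changed column, and the AWS DMS timestamp column
cdc_rows = [
    ("I", "1001", "555-0100", "2023-01-01 10:00:00"),
    ("U", "1001", "555-0199", "2023-01-01 10:05:00"),
    ("I", "1002", "555-0111", "2023-01-01 10:01:00"),
]
cdc_df = (
    spark.createDataFrame(cdc_rows, ["op", "customerid", "phonenumber", "last_update_time"])
    .withColumn("last_update_time", to_timestamp("last_update_time"))
)

# One row per customerid remains, carrying the latest captured change
dedupCDCRecords(cdc_df, ["customerid"]).show()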

On line 99, the upsertRecordsSparkSQL() function performs the upsert in a similar fashion to the initial load, but this time with a SQL MERGE command.

Review the applied changes

Open the Athena console and run a query that selects the changed records from the source database. If you're using the provided sample database, use the following SQL query:

SELECT * FROM "sports"."nfl_stadiu_data_upd"
WHERE team = 'Los Angeles Rams' and sport_location_id = 31
LIMIT 1;

Amazon Athena review cdc data loaded

Monitor table ingestion

The AWS Glue job script is coded with simple Python exception handling to catch errors during processing a specific table. The job bookmark is saved after each table finishes processing successfully, to avoid reprocessing tables if the job run is retried for the tables with errors.

The AWS Command Line Interface (AWS CLI) provides a get-job-bookmark command for AWS Glue that provides insight into the status of the bookmark for each table processed.

  1. On the AWS Glue Studio console, choose the ETL job.
  2. Choose the Job Runs tab and copy the job run ID.
  3. Run the following command on a terminal authenticated for the AWS CLI, replacing <GLUE_JOB_RUN_ID> on line 1 with the value you copied. If your CloudFormation stack is not named transactionaldl-postgresql, provide the name of your job on line 2 of the script:
jobrun=<GLUE_JOB_RUN_ID>
jobname=IcebergJob-transactionaldl-postgresql
aws glue get-job-bookmark --job-name $jobname --run-id $jobrun

In this solution, when processing a table causes an exception, the AWS Glue job does not fail immediately. Instead, the table is added to an array that is printed after the job is complete. In such a scenario, the job is marked as failed after it tries to process the rest of the tables detected in the raw data source. This way, tables without errors don't have to wait until the user identifies and solves the problem with the conflicting tables. The user can quickly detect job runs that had issues using the AWS Glue job run status, and identify which specific tables are causing the problem using the CloudWatch logs for the job run.

The job script implements this feature with the following Python code:
# Performed for every table
for key in ssm_param_table_values:
    schemaName, tableName = key.split('.')
    try:
        # Table processing logic (initial load or upsert)
        ...
    except Exception as e:
        logger.info(f'There is an issue with table: {tableName}')
        logger.info(f'The exception is : {e}')
        errored_table_list.append(tableName)
        continue
    job.commit()

if len(errored_table_list):
    logger.info(f'Total number of errored tables: {len(errored_table_list)}')
    logger.info(f"Tables that failed during processing: {', '.join(errored_table_list)}")
    raise Exception('***** Some tables failed to process.')

The following screenshot shows how the CloudWatch logs look for tables that cause errors on processing.

AWS Glue job monitoring with logs

Aligned with the AWS Well-Architected Framework Data Analytics Lens practices, you can adapt more sophisticated control mechanisms to identify and notify stakeholders when errors appear in the data pipelines. For example, you can use an Amazon DynamoDB control table to store all tables and job runs with errors, or use Amazon Simple Notification Service (Amazon SNS) to send alerts to operators when certain criteria are met.
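
For example, the following is a minimal boto3 sketch of notifying operators through Amazon SNS when the job detects failed tables; the topic ARN is a placeholder, and errored_table_list is the array populated by the job script shown earlier.

import boto3

sns = boto3.client("sns")
errored_table_list = ["schema.table_a"]  # example value; populated by the job script

if errored_table_list:
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:111122223333:data-lake-ingestion-alerts",
        Subject="AWS Glue Iceberg ingestion: tables failed processing",
        Message="The following tables failed: " + ", ".join(errored_table_list),
    )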

Schedule incremental batch data loading

The CloudFormation stack deploys an Amazon EventBridge rule (disabled by default) that can trigger the AWS Glue job to run on a schedule. To provide your own schedule and enable the rule, complete the following steps:

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Search for the rule prefixed with the name of your CloudFormation stack followed by JobTrigger (for example, transactionaldl-postgresql-JobTrigger-randomvalue).
  3. Choose the rule.
  4. Under Event Schedule, choose Edit.

The default schedule is configured to trigger every hour.

  5. Provide the schedule you want to run the job.
  6. Additionally, you can use an EventBridge cron expression by selecting A fine-grained schedule.
    Amazon EventBridge schedule ETL job
  7. When you finish setting up the cron expression, choose Next three times, and finally choose Update Rule to save changes.

The rule is created disabled by default to allow you to run the initial data load first.

  8. Activate the rule by choosing Enable.

You can view rule invocations on the Monitoring tab, or directly in the AWS Glue job run details.

Conclusion

After deploying this solution, you have automated the ingestion of your tables from a single relational data source. Organizations using a data lake as their central data platform usually need to handle multiple, sometimes even tens of, data sources. Also, more and more use cases require organizations to implement transactional capabilities in their data lake. You can use this solution to accelerate the adoption of such capabilities across all your relational data sources to enable new business use cases, automating the implementation process to derive more value from your data.


About the Authors

Luis Gerardo Baeza is a Big Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience helping organizations in the healthcare, financial and education sectors to adopt enterprise architecture programs, cloud computing, and data analytics capabilities. Luis currently helps organizations across Latin America to accelerate strategic data initiatives.

SaiKiran Reddy Aenugu is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 10 years of experience implementing data loading, transformation, and visualization processes. SaiKiran currently helps organizations in North America to adopt modern data architectures such as data lakes and data mesh. He has experience in the retail, airline, and finance sectors.

Narendra Merla is a Data Architect in the Amazon Web Services (AWS) Data Lab. He has 12 years of experience in designing and productionalizing both real-time and batch-oriented data pipelines and building data lakes on both cloud and on-premises environments. Narendra currently helps organizations in North America to build and design robust data architectures, and has experience in the telecom and finance sectors.

How to visualize IAM Access Analyzer policy validation findings with QuickSight

Post Syndicated from Mostefa Brougui original https://aws.amazon.com/blogs/security/how-to-visualize-iam-access-analyzer-policy-validation-findings-with-quicksight/

In this blog post, we show you how to create an Amazon QuickSight dashboard to visualize the policy validation findings from AWS Identity and Access Management (IAM) Access Analyzer. You can use this dashboard to better understand your policies and how to achieve least privilege by periodically validating your IAM roles against IAM best practices. This blog post walks you through the deployment for a multi-account environment using AWS Organizations.

Achieving least privilege is a continuous cycle to grant only the permissions that your users and systems require. To achieve least privilege, you start by setting fine-grained permissions. Then, you verify that the existing access meets your intent. Finally, you refine permissions by removing unused access. To learn more, see IAM Access Analyzer makes it easier to implement least privilege permissions by generating IAM policies based on access activity.

Policy validation is a feature of IAM Access Analyzer that guides you to author and validate secure and functional policies with more than 100 policy checks. You can use these checks when creating new policies or to validate existing policies. To learn how to use IAM Access Analyzer policy validation APIs when creating new policies, see Validate IAM policies in CloudFormation templates using IAM Access Analyzer. In this post, we focus on how to validate existing IAM policies.

Approach to visualize IAM Access Analyzer findings

As shown in Figure 1, there are four high-level steps to build the visualization.

Figure 1: Steps to visualize IAM Access Analyzer findings

  1. Collect IAM policies

    To validate your IAM policies with IAM Access Analyzer in your organization, start by periodically sending the content of your IAM policies (inline and customer-managed) to a central account, such as your Security Tooling account.

  2. Validate IAM policies

    After you collect the IAM policies in a central account, run an IAM Access Analyzer ValidatePolicy API call on each policy. The API calls return a list of findings. The findings can help you identify issues, provide actionable recommendations to resolve them, and enable you to author functional policies that meet security best practices. The findings are stored in an Amazon Simple Storage Service (Amazon S3) bucket. To learn about the different findings, see Access Analyzer policy check reference. A minimal API sketch for this step follows this list.

  3. Visualize findings

    IAM Access Analyzer policy validation findings are stored centrally in an S3 bucket. The S3 bucket is owned by the central (hub) account of your choosing. You can use Amazon Athena to query the findings from the S3 bucket, and then create a QuickSight analysis to visualize the findings.

  4. Publish dashboards

    Finally, you can publish a shareable QuickSight dashboard. Figure 2 shows an example of the dashboard.

    Figure 2: Dashboard overview
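
The following is a minimal boto3 sketch of steps 1 and 2: listing the customer managed IAM policies in an account and validating each policy document with the ValidatePolicy API. The implementation deployed later in this post performs these steps with Lambda functions and an SQS queue instead of a single script.

import json
import boto3

iam = boto3.client("iam")
analyzer = boto3.client("accessanalyzer")

# Iterate over customer managed policies in the account
paginator = iam.get_paginator("list_policies")
for page in paginator.paginate(Scope="Local"):
    for policy in page["Policies"]:
        version = iam.get_policy_version(
            PolicyArn=policy["Arn"], VersionId=policy["DefaultVersionId"]
        )
        document = version["PolicyVersion"]["Document"]

        # Validate the policy document and print any findings
        findings = analyzer.validate_policy(
            policyDocument=json.dumps(document),
            policyType="IDENTITY_POLICY",
        )["findings"]

        for finding in findings:
            print(policy["Arn"], finding["findingType"], finding["issueCode"])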

Design overview

This implementation is a serverless job initiated by Amazon EventBridge rules. It collects IAM policies into a hub account (such as your Security Tooling account), validates the policies, stores the validation results in an S3 bucket, and uses Athena to query the findings and QuickSight to visualize them. Figure 3 gives a design overview of our implementation.

Figure 3: Design overview of the implementation

As shown in Figure 3, the implementation includes the following steps:

  1. A time-based rule is set to run daily. The rule triggers an AWS Lambda function that lists the IAM policies of the AWS account it is running in.
  2. For each IAM policy, the function sends a message to an Amazon Simple Queue Service (Amazon SQS) queue. The message contains the IAM policy Amazon Resource Name (ARN), and the policy document.
  3. When new messages are received, the Amazon SQS queue initiates the second Lambda function. For each message, the Lambda function extracts the policy document and validates it by using the IAM Access Analyzer ValidatePolicy API call.
  4. The Lambda function stores validation results in an S3 bucket.
  5. An AWS Glue table contains the schema for the IAM Access Analyzer findings. Athena natively uses the AWS Glue Data Catalog.
  6. Athena queries the findings stored in the S3 bucket.
  7. QuickSight uses Athena as a data source to visualize IAM Access Analyzer findings.

Benefits of the implementation

By implementing this solution, you can achieve the following benefits:

  • Store your IAM Access Analyzer policy validation results in a scalable and cost-effective manner with Amazon S3.
  • Add scalability and fault tolerance to your validation workflow with Amazon SQS.
  • Partition your evaluation results in Athena and restrict the amount of data scanned by each query, helping to improve performance and reduce cost.
  • Gain insights from IAM Access Analyzer policy validation findings with QuickSight dashboards. You can use the dashboard to identify IAM policies that don’t comply with AWS best practices and then take action to correct them.

Prerequisites

Before you implement the solution, make sure you’ve completed the following steps:

  1. Install a Git client, such as GitHub Desktop.
  2. Install the AWS Command Line Interface (AWS CLI). For instructions, see Installing or updating the latest version of the AWS CLI.
  3. If you plan to deploy the implementation in a multi-account environment using Organizations, enable all features and enable trusted access with Organizations to operate a service-managed stack set.
  4. Get a QuickSight subscription to the Enterprise edition. When you first subscribe to the Enterprise edition, you get a free trial for four users for 30 days. Trial authors are automatically converted to month-to-month subscription upon trial expiry. For more details, see Signing up for an Amazon QuickSight subscription, Amazon QuickSight Enterprise edition and the Amazon QuickSight Pricing Calculator.

Note: This implementation works in accounts that don’t have AWS Lake Formation enabled. If Lake Formation is enabled in your account, you might need to grant Lake Formation permissions in addition to the implementation IAM permissions. For details, see Lake Formation access control overview.

Walkthrough

In this section, we will show you how to deploy an AWS CloudFormation template to your central account (such as your Security Tooling account), which is the hub for IAM Access Analyzer findings. The central account collects, validates, and visualizes your findings.

To deploy the implementation to your multi-account environment

  1. Deploy the CloudFormation stack to your central account.

    Important: Do not deploy the template to the organization’s management account; see design principles for organizing your AWS accounts. You can choose the Security Tooling account as a hub account.

    In your central account, run the following commands in a terminal. These commands clone the GitHub repository and deploy the CloudFormation stack to your central account.

    # A) Clone the repository
    git clone https://github.com/aws-samples/visualize-iam-access-analyzer-policy-validation-findings.git

    # B) Switch to the repository's directory
    cd visualize-iam-access-analyzer-policy-validation-findings

    # C) Deploy the CloudFormation stack to your central security account (hub). For <AWSRegion>, enter your AWS Region without quotes.
    make deploy-hub aws-region=<AWSRegion>

    If you want to send IAM policies from other member accounts to your central account, you will need to make note of the CloudFormation stack outputs for SQSQueueUrl and KMSKeyArn when the deployment is complete.

    make describe-hub-outputs aws-region=<AWSRegion>

  2. Switch to your organization’s management account and deploy the stack sets to the member accounts. For <SQSQueueUrl> and <KMSKeyArn>, use the values from the previous step.
    # Create a CloudFormation stack set to deploy the resources to the member accounts.
    make deploy-members SQSQueueUrl=<SQSQueueUrl> KMSKeyArn=<KMSKeyArn> aws-region=<AWSRegion>

To deploy the QuickSight dashboard to your central account

  1. Make sure that QuickSight is using the IAM role aws-quicksight-service-role.
    1. In QuickSight, in the navigation bar at the top right, choose your account (indicated by a person icon) and then choose Manage QuickSight.
    2. On the Manage QuickSight page, in the menu at the left, choose Security & Permissions.
    3. On the Security & Permissions page, under QuickSight access to AWS services, choose Manage.
    4. For IAM role, choose Use an existing role, and then do one of the following:
      • If you see a list of existing IAM roles, choose the role

        arn:aws:iam::<account-id>:role/service-role/aws-quicksight-service-role.

      • If you don’t see a list of existing IAM roles, enter the IAM ARN for the role in the following format:

        arn:aws:iam::<account-id>:role/service-role/aws-quicksight-service-role.

    5. Choose Save.
  2. Retrieve the QuickSight users.
    # <aws-region> your Quicksight main Region, for example eu-west-1
    # <account-id> The ID of your account, for example 123456789012
    # <namespace-name> Quicksight namespace, for example default.
    # You can list the namespaces by using aws quicksight list-namespaces --aws-account-id <account-id>
      aws quicksight list-users --region <aws-region> --aws-account-id <account-id> --namespace <namespace-name>

  3. Make a note of the user’s ARN that you want to grant permissions to list, describe, or update the QuickSight dashboard. This information is found in the arn element. For example, arn:aws:quicksight:us-east-1:111122223333:user/default/User1
  4. To launch the deployment stack for the QuickSight dashboard, run the following command. Replace <quicksight-user-arn> with the user’s ARN from the previous step.
    make deploy-dashboard-hub aws-region=<AWSRegion> quicksight-user-arn=<quicksight-user-arn>

Publish and share the QuickSight dashboard with the policy validation findings

You can publish your QuickSight dashboard and then share it with other QuickSight users for reporting purposes. The dashboard preserves the configuration of the analysis at the time that it’s published and reflects the current data in the datasets used by the analysis.

To publish the QuickSight dashboard

  1. In the QuickSight console, choose Analyses and then choose access-analyzer-validation-findings.
  2. (Optional) Modify the visuals of the analysis. For more information, see Tutorial: Modify Amazon QuickSight visuals.
  3. Share the QuickSight dashboard.
    1. In your analysis, in the application bar at the upper right, choose Share, and then choose Publish dashboard.
    2. On the Publish dashboard page, choose Publish new dashboard as and enter IAM Access Analyzer Policy Validation.
    3. Choose Publish dashboard. The dashboard is now published.
  4. On the QuickSight start page, choose Dashboards.
  5. Select the IAM Access Analyzer Policy Validation dashboard. IAM Access Analyzer policy validation findings will appear within the next 24 hours.

    Note: If you don’t want to wait until the Lambda function is initiated automatically, you can invoke the function that lists customer-managed policies and inline policies by using the aws lambda invoke AWS CLI command on the hub account and wait one to two minutes to see the policy validation findings:

    aws lambda invoke --function-name access-analyzer-list-iam-policy --invocation-type Event --cli-binary-format raw-in-base64-out --payload '{}' response.json

  6. (Optional) To export your dashboard as a PDF, see Exporting Amazon QuickSight analyses or dashboards as PDFs.

To share the QuickSight dashboard

  1. In the QuickSight console, choose Dashboards and then choose IAM Access Analyzer Policy Validation.
  2. In your dashboard, in the application bar at the upper right, choose Share, and then choose Share dashboard.
  3. On the Share dashboard page that opens, do the following:
    1. For Invite users and groups to dashboard on the left pane, enter a user email or group name in the search box. Users or groups that match your query appear in a list below the search box. Only active users and groups appear in the list.
    2. For the user or group that you want to grant access to the dashboard, choose Add. Then choose the level of permissions that you want them to have.
  4. After you grant users access to a dashboard, you can copy a link to it and send it to them.

For more details, see Sharing dashboards or Sharing your view of a dashboard.

Your teams can use this dashboard to better understand their IAM policies and how to move toward least-privilege permissions, as outlined in the section Validate your IAM roles of the blog post Top 10 security items to improve in your AWS account.

Clean up

To avoid incurring additional charges in your accounts, remove the resources that you created in this walkthrough.

Before deleting the CloudFormation stacks and stack sets in your accounts, make sure that the S3 buckets that you created are empty. To delete everything (including old versioned objects) in a versioned bucket, we recommend emptying the bucket through the console. Before deleting the CloudFormation stack from the central account, delete the Athena workgroup.

To delete remaining resources from your AWS accounts

  1. Delete the CloudFormation stack from your central account by running the following command. Make sure to replace <AWSRegion> with your own Region.
    make delete-hub aws-region=<AWSRegion>

  2. Delete the CloudFormation stack set instances and stack sets by running the following command using your organization’s management account credentials. Make sure to replace <AWSRegion> with your own Region.
    make delete-stackset-instances aws-region=<AWSRegion>
      # Wait for the operation to finish. You can check its progress on the CloudFormation console.
      make delete-stackset aws-region=<AWSRegion>

  3. Delete the QuickSight dashboard by running the following command using the central account credentials. Make sure to replace <AWSRegion> with your own Region.
    make delete-dashboard aws-region=<AWSRegion>

  4. To cancel your QuickSight subscription and close the account, see Canceling your Amazon QuickSight subscription and closing the account.

Conclusion

In this post, you learned how to validate your existing IAM policies by using the IAM Access Analyzer ValidatePolicy API and visualizing the results with AWS analytics tools. By using the implementation, you can better understand your IAM policies and work to reach least privilege in a scalable, fault-tolerant, and cost-effective way. This will help you identify opportunities to tighten your permissions and to grant the right fine-grained permissions to help enhance your overall security posture.

To learn more about IAM Access Analyzer, see previous blog posts on IAM Access Analyzer.

To download the CloudFormation templates, see the visualize-iam-access-analyzer-policy-validation-findings GitHub repository. For information about pricing, see Amazon SQS pricing, AWS Lambda pricing, Amazon Athena pricing and Amazon QuickSight pricing.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Security, Identity, & Compliance re:Post.

Want more AWS Security news? Follow us on Twitter.

Mostefa Brougui

Mostefa is a Sr. Security Consultant in Professional Services at Amazon Web Services. He works with AWS enterprise customers to design, build, and optimize their security architecture to drive business outcomes.

Tobias Nickl

Tobias works in Professional Services at Amazon Web Services as a Security Engineer. In addition to building custom AWS solutions, he advises AWS enterprise customers on how to reach their business objectives and accelerate their cloud transformation.

Analyze Amazon S3 storage costs using AWS Cost and Usage Reports, Amazon S3 Inventory, and Amazon Athena

Post Syndicated from Dagar Katyal original https://aws.amazon.com/blogs/big-data/analyze-amazon-s3-storage-costs-using-aws-cost-and-usage-reports-amazon-s3-inventory-and-amazon-athena/

Since its launch in 2006, Amazon Simple Storage Service (Amazon S3) has experienced major growth, supporting multiple use cases such as hosting websites, creating data lakes, serving as object storage for consumer applications, storing logs, and archiving data. As the application portfolio grows, customers tend to store data from multiple applications and different business functions in a single S3 bucket, which can grow the storage in S3 buckets to hundreds of TBs. The AWS Billing console provides a way to look at the total storage cost of data stored in Amazon S3, but sometimes IT organizations need to understand the breakdown of costs of a particular S3 bucket by various prefixes or objects corresponding to a particular user or application. There are various reasons to analyze the costs of S3 buckets, such as to identify the spend breakdown, do internal chargebacks, understand the cost breakdown by business unit and application, and many more. As of this writing, there is no easy way to do a cost breakdown of S3 buckets by objects and prefixes.

In this post, we discuss a solution using Amazon Athena to query AWS Cost and Usage Reports and Amazon S3 Inventory reports to analyze the cost by prefixes and objects in an S3 bucket.

Overview of solution

The following figure shows the architecture for this solution. First, we enable the AWS Cost and Usage Reports (AWS CUR) and Amazon S3 Inventory features, which save the output into two separate pre-created S3 buckets. We then use Athena to query these S3 buckets for AWS CUR data and S3 object inventory data to correlate and allocate the cost breakdown at the object or prefix level.

architecture diagram

To implement the solution, we complete the following steps:

  1. Create S3 buckets for AWS CUR, S3 object inventory, and Athena results. Alternatively, you can create these respective buckets when enabling the respective individual features, but for the purpose of this post, we create all of them at the beginning.
  2. Enable the Cost and Usage Reports.
  3. Enable Amazon S3 Inventory configuration.
  4. Create AWS Glue Data Catalog tables for the CUR and S3 object inventory to query using Athena.
  5. Run queries in Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Create S3 buckets

Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance. Customers of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes, cloud-native applications, and mobile apps. With cost-effective storage classes and easy-to-use management features, you can optimize costs, organize data, and configure fine-tuned access controls to meet specific business, organizational, and compliance requirements.

For this post, we use the S3 bucket s3-object-cost-allocation as the primary bucket for cost allocation. This sample bucket contains several prefixes and objects of different sizes, and we allocate their costs based on the overall cost of the bucket. In a real-world scenario, you should use a bucket that has data for multiple teams and for which you need to allocate costs by prefix or object. Going forward, we refer to this bucket as the primary object bucket.

The following screenshot shows our S3 bucket and folders.

example Folders created

Now let’s create the three additional operational S3 buckets to store the datasets generated to calculate costs for the objects. You can create the following buckets or any existing buckets as needed:

  • cur-cost-usage-reports-<account_number> – This bucket is used to save the Cost and Usage Reports for the account.
  • s3-inventory-configurations-<account_number> – This bucket is used to save the inventory configurations of our primary object bucket.
  • athena-query-bucket-<account_number> – This bucket is used to save the query results from Athena.

Complete the following steps to create your S3 buckets:

  • On the Amazon S3 console, choose Buckets in the navigation pane.
  • Choose Create bucket.
  • For Bucket name, enter the name of your bucket (cur-cost-usage-reports-<account_number>).
  • For AWS Region, choose your preferred Region.
  • Leave all other settings at default (or according to your organization’s standards).
  • Choose Create bucket.
    create s3 bucket
  • Repeat these steps to create s3-inventory-configurations-<account_number> and athena-query-bucket-<account_number>.
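
If you prefer to create these buckets programmatically, the following minimal boto3 sketch creates all three operational buckets. The Region and account ID are placeholders that you need to replace with your own values.

import boto3

region = "us-east-1"         # replace with your preferred Region
account_id = "123456789012"  # replace with your AWS account ID

s3 = boto3.client("s3", region_name=region)

buckets = [
    f"cur-cost-usage-reports-{account_id}",
    f"s3-inventory-configurations-{account_id}",
    f"athena-query-bucket-{account_id}",
]

for name in buckets:
    if region == "us-east-1":
        # us-east-1 does not accept a LocationConstraint
        s3.create_bucket(Bucket=name)
    else:
        s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )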

Enable the Cost and Usage Reports

The AWS Cost and Usage Reports (AWS CUR) contains the most comprehensive set of cost and usage data available. You can use Cost and Usage Reports to publish your AWS billing reports to an S3 bucket that you own. You can receive reports that break down your costs by the hour, day, or month; by product or product resource; or by tags that you define yourself.

Complete the following steps to enable Cost and Usage Reports for your account:

  • On the AWS Billing console, in the navigation pane, choose Cost & Usage Reports.
  • Choose Create report.
  • For Report name, enter a name for your report, such as account-cur-s3.
  • For Additional report details, select Include resource IDs to include the IDs of each individual resource in the report. Including resource IDs will create individual line items for each of your resources. This can increase the size of your Cost and Usage Reports files significantly, which can affect the S3 storage costs for your CUR, based on your AWS usage. We need this feature enabled for this post.
  • For Data refresh settings, select whether you want the Cost and Usage Reports to refresh if AWS applies refunds, credits, or support fees to your account after finalizing your bill. When a report refreshes, a new report is uploaded to Amazon S3.
  • Choose Next.
  • For S3 bucket, choose Configure.
  • For Configure S3 Bucket, select an existing bucket created in the previous section (cur-cost-usage-reports-<account_number>) and choose Next.
  • Review the bucket policy, select I have confirmed that this policy is correct, and choose Save. This default bucket policy provides Cost and Usage Reports access to write data to Amazon S3.
  • For Report path prefix, enter cur-data/account-cur-daily.
  • For Time granularity, choose Daily.
  • For Report versioning, choose Overwrite existing report.
  • For Enable report data integration for, select Amazon Athena.
  • Choose Next.
  • After you have reviewed the settings for your report, choose Review and Complete.
    create cost and usage report

The Cost and Usage Reports will be delivered to the S3 bucket within 24 hours.

The following sample CUR in CSV format shows different columns of the Cost and Usage Report, including bill_invoice_id, bill_invoicing_entity, bill_payer_account_id, and line_item_product_code, to name a few.

sample cost and usage report
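
If you want to script the report creation instead of using the console, a minimal boto3 sketch that mirrors the console settings above might look like the following. The account ID, prefix, and bucket Region are placeholders, and the Cost and Usage Reports API is served from the us-east-1 endpoint.

import boto3

account_id = "123456789012"   # replace with your AWS account ID
bucket_region = "us-east-1"   # Region of the cur-cost-usage-reports bucket

# The Cost and Usage Reports API is only available from the us-east-1 endpoint.
cur = boto3.client("cur", region_name="us-east-1")

cur.put_report_definition(
    ReportDefinition={
        "ReportName": "account-cur-s3",
        "TimeUnit": "DAILY",
        "Format": "Parquet",                        # required for Athena integration
        "Compression": "Parquet",
        "AdditionalSchemaElements": ["RESOURCES"],  # include resource IDs
        "S3Bucket": f"cur-cost-usage-reports-{account_id}",
        "S3Prefix": "cur-data/account-cur-daily",
        "S3Region": bucket_region,
        "AdditionalArtifacts": ["ATHENA"],
        "RefreshClosedReports": True,
        "ReportVersioning": "OVERWRITE_REPORT",
    }
)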

Enable Amazon S3 Inventory configuration

Amazon S3 Inventory is one of the tools Amazon S3 provides to help manage your storage. You can use it to audit and report on the replication and encryption status of your objects for business, compliance, and regulatory needs. Amazon S3 Inventory provides comma-separated values (CSV), Apache Optimized Row Columnar (ORC), or Apache Parquet output files that list your objects and their corresponding metadata on a daily or weekly basis for an S3 bucket or a shared prefix (objects that have names that begin with a common string).

Complete the following steps to enable Amazon S3 Inventory on the primary object bucket:

  • On the Amazon S3 console, choose Buckets in the navigation pane.
  • Choose the bucket for which you want to configure Amazon S3 Inventory.
    This will be the existing bucket in your account that has data that needs to be analyzed. This could be your data lake or application S3 bucket. We created the bucket s3-object-cost-allocation with some sample data and folder structure.
  • Choose Management.
  • Under Inventory configurations, choose Create inventory configuration.
  • For Inventory configuration name, enter s3-object-cost-allocation.
  • For Inventory scope, leave Prefix blank.
    This is to ensure that all objects are covered for the report.
  • For Object Versions, select Current version only.
  • For Report details, choose This account.
  • For Destination, choose the destination bucket we created (s3-inventory-configurations-<account_number>).
  • For Frequency, choose Daily.
  • For Output format, choose Apache Parquet.
  • For Status, choose Enable.
  • Keep server-side encryption disabled. To use server-side encryption, choose Enable and specify the encryption key.
  • For Additional fields, select the following to add to the inventory report:
    • Size – The object size in bytes.
    • Last modified date – The object creation date or the last modified date, whichever is the latest.
    • Multipart upload – Specifies that the object was uploaded as a multipart upload. For more information, see Uploading and copying objects using multipart upload.
    • Replication status – The replication status of the object. For more information, see Using the S3 console.
    • Encryption status – The server-side encryption used to encrypt the object. For more information, see Protecting data using server-side encryption.
    • Bucket key status – Indicates whether a bucket-level key generated by AWS KMS applies to the object.
    • Storage class – The storage class used for storing the object.
    • Intelligent-Tiering: Access tier – Indicates the access tier of the object if it was stored in S3 Intelligent-Tiering.
      create s3 inventory
  • Choose Create.
    s3 inventory configuration

It may take up to 48 hours to deliver the first report.
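
The same inventory configuration can also be created programmatically. The following minimal boto3 sketch mirrors the console settings above; the account ID and bucket names are placeholders to replace with your own.

import boto3

account_id = "123456789012"                  # replace with your AWS account ID
source_bucket = "s3-object-cost-allocation"  # the primary object bucket
destination_bucket = f"s3-inventory-configurations-{account_id}"

s3 = boto3.client("s3")

s3.put_bucket_inventory_configuration(
    Bucket=source_bucket,
    Id="s3-object-cost-allocation",
    InventoryConfiguration={
        "Id": "s3-object-cost-allocation",
        "IsEnabled": True,
        "IncludedObjectVersions": "Current",
        "Schedule": {"Frequency": "Daily"},
        "Destination": {
            "S3BucketDestination": {
                "AccountId": account_id,
                "Bucket": f"arn:aws:s3:::{destination_bucket}",
                "Format": "Parquet",
            }
        },
        # Optional fields matching the console selections in this post
        "OptionalFields": [
            "Size",
            "LastModifiedDate",
            "IsMultipartUploaded",
            "ReplicationStatus",
            "EncryptionStatus",
            "BucketKeyStatus",
            "StorageClass",
            "IntelligentTieringAccessTier",
        ],
    },
)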

Create AWS Glue Data Catalog tables for CUR and Amazon S3 Inventory reports

Wait for up to 48 hours for the previous step to generate the reports. In this section, we use Athena to create and define AWS Glue Data Catalog tables for the data that has been created using Cost and Usage Reports and Amazon S3 Inventory reports.

Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data where it lives.

Complete the following steps to create the tables using Athena:

  • Navigate to the Athena console.
  • If you’re using Athena for the first time, you need to set up a query result location in Amazon S3. If you preconfigured this in Athena, you can skip this step.
    • Choose View settings.
      athena setup query bucket
    • Choose Manage.
    • In the section Query result location and encryption, choose Browse S3 and choose the bucket that we created (athena-query-bucket-<account_number>).
    • Choose Save.
      Athena Config
    • Navigate back to the Athena query editor.
  • Run the following query in Athena to create a table for Cost and Usage Reports. Verify and update the section for <<LOCATION>> at the end of the query and point it to the correct S3 bucket and location. Note that the new table name should be account_cur.
    CREATE EXTERNAL TABLE `account_cur`(
    `identity_line_item_id` string,
    `identity_time_interval` string,
    `bill_invoice_id` string,
    `bill_billing_entity` string,
    `bill_bill_type` string,
    `bill_payer_account_id` string,
    `bill_billing_period_start_date` timestamp,
    `bill_billing_period_end_date` timestamp,
    `line_item_usage_account_id` string,
    `line_item_line_item_type` string,
    `line_item_usage_start_date` timestamp,
    `line_item_usage_end_date` timestamp,
    `line_item_product_code` string,
    `line_item_usage_type` string,
    `line_item_operation` string,
    `line_item_availability_zone` string,
    `line_item_resource_id` string,
    `line_item_usage_amount` double,
    `line_item_normalization_factor` double,
    `line_item_normalized_usage_amount` double,
    `line_item_currency_code` string,
    `line_item_unblended_rate` string,
    `line_item_unblended_cost` double,
    `line_item_blended_rate` string,
    `line_item_blended_cost` double,
    `line_item_line_item_description` string,
    `line_item_tax_type` string,
    `line_item_legal_entity` string,
    `product_product_name` string,
    `product_availability` string,
    `product_description` string,
    `product_durability` string,
    `product_event_type` string,
    `product_fee_code` string,
    `product_fee_description` string,
    `product_free_query_types` string,
    `product_from_location` string,
    `product_from_location_type` string,
    `product_from_region_code` string,
    `product_group` string,
    `product_group_description` string,
    `product_location` string,
    `product_location_type` string,
    `product_message_delivery_frequency` string,
    `product_message_delivery_order` string,
    `product_operation` string,
    `product_platopricingtype` string,
    `product_product_family` string,
    `product_queue_type` string,
    `product_region` string,
    `product_region_code` string,
    `product_servicecode` string,
    `product_servicename` string,
    `product_sku` string,
    `product_storage_class` string,
    `product_storage_media` string,
    `product_to_location` string,
    `product_to_location_type` string,
    `product_to_region_code` string,
    `product_transfer_type` string,
    `product_usagetype` string,
    `product_version` string,
    `product_volume_type` string,
    `pricing_rate_code` string,
    `pricing_rate_id` string,
    `pricing_currency` string,
    `pricing_public_on_demand_cost` double,
    `pricing_public_on_demand_rate` string,
    `pricing_term` string,
    `pricing_unit` string,
    `reservation_amortized_upfront_cost_for_usage` double,
    `reservation_amortized_upfront_fee_for_billing_period` double,
    `reservation_effective_cost` double,
    `reservation_end_time` string,
    `reservation_modification_status` string,
    `reservation_normalized_units_per_reservation` string,
    `reservation_number_of_reservations` string,
    `reservation_recurring_fee_for_usage` double,
    `reservation_start_time` string,
    `reservation_subscription_id` string,
    `reservation_total_reserved_normalized_units` string,
    `reservation_total_reserved_units` string,
    `reservation_units_per_reservation` string,
    `reservation_unused_amortized_upfront_fee_for_billing_period` double,
    `reservation_unused_normalized_unit_quantity` double,
    `reservation_unused_quantity` double,
    `reservation_unused_recurring_fee` double,
    `reservation_upfront_value` double,
    `savings_plan_total_commitment_to_date` double,
    `savings_plan_savings_plan_a_r_n` string,
    `savings_plan_savings_plan_rate` double,
    `savings_plan_used_commitment` double,
    `savings_plan_savings_plan_effective_cost` double,
    `savings_plan_amortized_upfront_commitment_for_billing_period` double,
    `savings_plan_recurring_commitment_for_billing_period` double,
    `resource_tags_user_bucket_name` string,
    `resource_tags_user_cost_tracking` string)
    PARTITIONED BY (
    `year` string,
    `month` string)
    ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
    '<<LOCATION>>'
  • Run the following query in Athena to create the table for Amazon S3 Inventory. Verify and update the section for <<LOCATION>> at the end of the query and point it to the correct S3 bucket and location.
    • To get the exact value of the location, navigate to the bucket where the inventory configurations are stored and browse to the hive folder path. Use its S3 URI to replace <<LOCATION>> in the query.
      query path location
      CREATE EXTERNAL TABLE s3_object_inventory(
               bucket string,
               key string,
               version_id string,
               is_latest boolean,
               is_delete_marker boolean,
               size bigint,
               last_modified_date bigint,
               storage_class string,
               is_multipart_uploaded boolean,
               replication_status string,
               encryption_status string,
               intelligent_tiering_access_tier string,
               bucket_key_status string
      ) PARTITIONED BY (
              dt string
      )
      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
        LOCATION '<<LOCATION>>';
      
  • We need to refresh the partitions and add new inventory lists to the table. Use the following commands to add data to the CUR table and Amazon S3 Inventory table:
    MSCK REPAIR TABLE `account_cur`;
    
    MSCK REPAIR TABLE s3_object_inventory;

Run queries in Athena to allocate the cost of objects in an S3 bucket

Now we can query the data we have available to get a cost allocation breakdown at the prefix level.

We need to provide some information in the following queries:

  • Update <<YYYY-MM-DD>> with the date for which you want to analyze the data
  • Update <<prefix>> with the prefix values for your bucket that needs to be analyzed
  • Update <<bucket_name>> with the name of the bucket that needs to be analyzed

We use the following part of the query to calculate the size of storage being used by the target prefix that we want to calculate the cost for:

select date_parse(dt,'%Y-%m-%d-%H-%i') dt, cast (sum(size) as double) targetPrefixBytes
from s3_object_inventory
where date_parse(dt,'%Y-%m-%d-%H-%i') = cast('<<YYYY-MM-DD>>' as timestamp)
and key like '<<prefix>>/%'
group by dt

Next, we calculate the total size of the bucket on that particular date:

select date_parse(dt,'%Y-%m-%d-%H-%i') dt, cast (sum(size) as double) totalBytes
from s3_object_inventory
where date_parse(dt,'%Y-%m-%d-%H-%i') = cast('<<YYYY-MM-DD>>' as timestamp)
group by dt

We query the CUR table to get the cost of a particular bucket on a particular date:

select line_item_usage_start_date as dt, sum(line_item_blended_cost) as line_item_blended_cost
from "account_cur"
where line_item_product_code = 'AmazonS3'
and product_servicecode = 'AmazonS3'
and line_item_operation = 'StandardStorage'
and line_item_resource_id = '<<bucket_name>>'
and line_item_usage_start_date = cast('<<YYYY-MM-DD>>' as timestamp)
group by line_item_usage_start_date

Putting all of this together, we can calculate the cost of a particular prefix (folder or a file) on a specific date. The complete query is as follows:

with
cost as (select line_item_usage_start_date as dt, sum(line_item_blended_cost) as line_item_blended_cost
from "account_cur"
where line_item_product_code = 'AmazonS3'
and product_servicecode = 'AmazonS3'
and line_item_operation = 'StandardStorage'
and line_item_resource_id = '<<bucket_name>>'
and line_item_usage_start_date = cast('<<YYYY-MM-DD>>' as timestamp)
group by line_item_usage_start_date),
total as (select date_parse(dt,'%Y-%m-%d-%H-%i') dt, cast (sum(size) as double) totalBytes
from s3_object_inventory
where date_parse(dt,'%Y-%m-%d-%H-%i') = cast('<<YYYY-MM-DD>>' as timestamp)
group by dt),
target as (select date_parse(dt,'%Y-%m-%d-%H-%i') dt, cast (sum(size) as double) targetPrefixBytes
from s3_object_inventory
where date_parse(dt,'%Y-%m-%d-%H-%i') = cast('<<YYYY-MM-DD>>' as timestamp)
and key like '<<prefix>>/%'
group by dt)
select target.dt,
(target.targetPrefixBytes/ total.totalBytes * 100) percentUsed,
cost.line_item_blended_cost totalCost,
cost.line_item_blended_cost*(target.targetPrefixBytes/ total.totalBytes) as prefixCost
from target, total, cost
where target.dt = total.dt
and target.dt = cost.dt

The following screenshot shows the results table for the sample data we used in this post. We get the following information:

  • dt – Date
  • percentUsed – The percentage of prefix space compared to overall bucket space
  • totalCost – The total cost of the bucket
  • prefixCost – The cost of the space used by the prefix

final result percetage

Clean up

To stop incurring costs, be sure to disable Amazon S3 Inventory and Cost and Usage Reports when you’re done.

Delete the S3 buckets created for the Amazon S3 Inventory reports and Cost and Usage Reports to avoid storage charges.

Other methods for Amazon S3 storage analysis

Amazon S3 Storage Lens can provide a single view of object storage usage and activity across your entire Amazon S3 storage. With S3 Storage Lens, you can understand, analyze, and optimize storage with over 29 usage and activity metrics and interactive dashboards to aggregate data for your entire organization, specific accounts, Regions, buckets, or prefixes. All of this data is accessible on the Amazon S3 console or as raw data in an S3 bucket.

S3 Storage Lens doesn’t provide cost analysis based on an object or prefix in a single bucket. If you want visibility of storage usage and trends across the entire storage footprint along with recommendations on cost efficiency and data protection best practices, S3 Storage Lens is the right option. But if you want a cost analysis of specific S3 buckets and are looking for ways to allocate costs at the object or prefix level, the solution in this post is the best fit.

Conclusion

In this post, we detailed how to create a cost breakdown model at the object or prefix level for S3 buckets that contains data for multiple business units and applications. We used Athena to query the reports and datasets produced by the AWS CUR and Amazon S3 Inventory features that, when correlated, give us the cost allocation at the object and prefix level. This solution gives you an easy way to calculate costs for independent objects and prefixes, which can be used for internal chargebacks or just to know the per-object or per-prefix spending in a shared S3 bucket.


About the Authors


Dagar Katyal
is a Senior Solutions Architect at AWS, based in Chicago, Illinois. He works with customers and provides guidance for key strategic initiatives important for their business. Dagar has an MBA and has spent over 15 years working with customers on analytics strategy, roadmaps, and using data as a key differentiator. When not working with customers, Dagar spends time with his family and works on home improvement projects.


Saiteja Pudi
is a Solutions Architect at AWS, based in Dallas, TX. He has been with AWS for more than 3 years, helping customers realize the true potential of AWS as their trusted advisor. He comes from an application development background and is interested in data science and machine learning.

How Amazon Devices scaled and optimized real-time demand and supply forecasts using serverless analytics

Post Syndicated from Avinash Kolluri original https://aws.amazon.com/blogs/big-data/how-amazon-devices-scaled-and-optimized-real-time-demand-and-supply-forecasts-using-serverless-analytics/

Every day, Amazon Devices teams process and analyze billions of transactions from global shipping, inventory, capacity, supply, sales, marketing, producers, and customer service teams. This data is used in procuring devices’ inventory to meet Amazon customers’ demands. With data volumes exhibiting a double-digit percentage growth rate year on year and the COVID pandemic disrupting global logistics in 2021, it became more critical to scale and generate near-real-time data.

This post shows you how we migrated to a serverless data lake built on AWS that consumes data automatically from multiple sources and different formats. The migration also created opportunities for our data scientists and engineers to use AI and machine learning (ML) services to continuously feed and analyze data.

Challenges and design concerns

Our legacy architecture primarily used Amazon Elastic Compute Cloud (Amazon EC2) to extract the data from various internal heterogeneous data sources and REST APIs with the combination of Amazon Simple Storage Service (Amazon S3) to load the data and Amazon Redshift for further analysis and generating the purchase orders.

We found this approach resulted in a few deficiencies and therefore drove improvements in the following areas:

  • Developer velocity – Due to the lack of unification and discovery of schema, which are primary reasons for runtime failures, developers often spent time dealing with operational and maintenance issues.
  • Scalability – Most of these datasets are shared across the globe. Therefore, we must meet the scaling limits while querying the data.
  • Minimal infrastructure maintenance – The current process spans multiple computes depending on the data source. Therefore, reducing infrastructure maintenance is critical.
  • Responsiveness to data source changes – Our current system gets data from various heterogeneous data stores and services. Any updates to those services take months of developer cycles. The response times for these data sources are critical to our key stakeholders. Therefore, we must take a data-driven approach to select a high-performance architecture.
  • Storage and redundancy – Due to the heterogeneous data stores and models, it was challenging to store the different datasets from various business stakeholder teams. Therefore, having versioning along with incremental and differential data to compare provides a remarkable ability to generate more optimized plans.
  • Flexibility and accessibility – Due to the volatile nature of logistics, a few business stakeholder teams need to analyze the data on demand and generate the near-real-time optimal plan for purchase orders. This introduces the need for both polling and pushing the data to access and analyze it in near-real time.

Implementation strategy

Based on these requirements, we changed strategies and started analyzing each issue to identify the solution. Architecturally, we chose a serverless model, and the data lake architecture addresses the architectural gaps and challenges we identified as areas for improvement. From an operational standpoint, we designed a new shared responsibility model for data ingestion using AWS Glue instead of internal services (REST APIs) running on Amazon EC2 to extract the data. We also used AWS Lambda for data processing. Then we chose Amazon Athena as our query service. To further optimize and improve the developer velocity for our data consumers, we added Amazon DynamoDB as a metadata store for different data sources landing in the data lake. These two decisions drove every design and implementation decision we made.

The following diagram illustrates the architecture.

arch

In the following sections, we look at each component in the architecture in more detail as we move through the process flow.

AWS Glue for ETL

To meet customer demand while supporting the scale of new businesses’ data sources, it was critical for us to have a high degree of agility, scalability, and responsiveness in querying various data sources.

AWS Glue is a serverless data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources. You can use it for analytics, ML, and application development. It also includes additional productivity and DataOps tooling for authoring, running jobs, and implementing business workflows.

With AWS Glue, you can discover and connect to more than 70 diverse data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor extract, transform, and load (ETL) pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Athena, Amazon EMR, and Amazon Redshift Spectrum.

AWS Glue made it easy for us to connect to the data in various data stores, edit and clean the data as needed, and load the data into an AWS-provisioned store for a unified view. AWS Glue jobs can be scheduled or called on demand to extract data from the client’s resource and from the data lake.

Some responsibilities of these jobs are as follows:

  • Extracting and converting a source entity to a data entity
  • Enriching the data with year, month, and day for better cataloging, and including a snapshot ID for better querying (see the sketch after this list)
  • Performing input validation and path generation for Amazon S3
  • Associating the accredited metadata based on the source system
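
The following is a simplified PySpark sketch of the enrichment step referenced in the list above. The bucket paths, dataset name, and column names are illustrative assumptions, not the actual job code.

import uuid

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical source read; in the real jobs this comes from JDBC or REST API extracts.
df = spark.read.json("s3://my-landing-bucket/ordersfeed/raw/")

snapshot_id = str(uuid.uuid4())  # one snapshot ID per job run

enriched = (
    df.withColumn("ingest_ts", F.current_timestamp())
      .withColumn("year", F.date_format("ingest_ts", "yyyy"))
      .withColumn("month", F.date_format("ingest_ts", "MM"))
      .withColumn("day", F.date_format("ingest_ts", "dd"))
      .withColumn("snapshot_id", F.lit(snapshot_id))
)

# Write partitioned Parquet so downstream Athena queries scan only one partition.
(enriched.write
    .mode("append")
    .partitionBy("year", "month", "day", "snapshot_id")
    .parquet("s3://my-datalake-bucket/ordersfeed/"))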

Querying REST APIs from internal services is one of our core challenges, and considering the minimal infrastructure, we wanted to use them in this project. AWS Glue connectors assisted us in adhering to the requirement and goal. To query data from REST APIs and other data sources, we used PySpark and JDBC modules.

AWS Glue supports a wide variety of connection types. For more details, refer to Connection Types and Options for ETL in AWS Glue.

S3 bucket as landing zone

We used an S3 bucket as the immediate landing zone of the extracted data, which is further processed and optimized.

Lambda as AWS Glue ETL Trigger

We enabled S3 event notifications on the S3 bucket to trigger Lambda, which further partitions our data. The data is partitioned on InputDataSetName, Year, Month, and Date. Any query processor running on top of this data will scan only a subset of data for better cost and performance optimization. Our data can be stored in various formats, such as CSV, JSON, and Parquet.
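
A minimal sketch of such a Lambda handler is shown below. The partition naming and the dataset-name helper are assumptions for illustration, not the production function.

import datetime
import urllib.parse

import boto3

s3 = boto3.client("s3")

def dataset_name_from_key(key: str) -> str:
    # Hypothetical convention: the first path element is the dataset name,
    # e.g. "ordersfeed/file1.csv" -> "ordersfeed".
    return key.split("/")[0]

def handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        now = datetime.datetime.utcnow()
        partitioned_key = (
            f"InputDataSetName={dataset_name_from_key(key)}/"
            f"Year={now:%Y}/Month={now:%m}/Date={now:%d}/"
            f"{key.split('/')[-1]}"
        )
        # Copy the raw object into the partitioned layout in the landing bucket.
        s3.copy_object(
            Bucket=bucket,
            Key=partitioned_key,
            CopySource={"Bucket": bucket, "Key": key},
        )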

The raw data isn’t ideal for most of our use cases to generate the optimal plan because it often has duplicates or incorrect data types. Most importantly, the data arrives in multiple formats; we converted it to the Parquet format and observed significant query performance gains. Here, we used one of the performance tips in Top 10 performance tuning tips for Amazon Athena.

AWS Glue jobs for ETL

We wanted better data segregation and accessibility, so we chose to have a different S3 bucket to improve performance further. We used the same AWS Glue jobs to further transform and load the data into the required S3 bucket and a portion of extracted metadata into DynamoDB.

DynamoDB as metadata store

Now that we have the data, various business stakeholders consume it. This leaves us with two questions: which source data resides in the data lake, and which version. We chose DynamoDB as our metadata store, which provides the latest details to consumers so they can query the data effectively. Every dataset in our system is uniquely identified by a snapshot ID, which we can look up in our metadata store. Clients access this data store through an API.
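
As an illustration, a consumer-side lookup against such a metadata store could look like the following boto3 sketch; the table name, key schema, and attribute names are hypothetical.

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")

# Hypothetical table and attribute names; the post doesn't publish the actual schema.
metadata_table = dynamodb.Table("datalake_metadata")

def get_latest_snapshot(dataset_name: str) -> dict:
    """Return the metadata item (snapshot ID, partition string, S3 link) for a dataset."""
    response = metadata_table.query(
        KeyConditionExpression=Key("dataset_name").eq(dataset_name),
        ScanIndexForward=False,  # newest snapshot first, assuming a timestamp sort key
        Limit=1,
    )
    return response["Items"][0] if response["Items"] else {}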

Amazon S3 as data lake

For better data quality, we extracted the enriched data into another S3 bucket with the same AWS Glue job.

AWS Glue Crawler

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the process, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue. With a crawler, we could maintain the agnostic changes happening to the schema. This helped us automatically crawl the data from Amazon S3 and generate the schema and tables.

AWS Glue Data Catalog

The Data Catalog helped us maintain the catalog as an index to the data’s location, schema, and runtime metrics in Amazon S3. Information in the Data Catalog is stored as metadata tables, where each table specifies a single data store.

Athena for SQL queries

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries you run. We considered operational stability and increasing developer velocity as our key improvement factors.

We further optimized the process to query Athena so that users can plug in the values and the queries to get data out of Athena by creating the following:

  • An AWS Cloud Development Kit (AWS CDK) template to create Athena infrastructure and AWS Identity and Access Management (IAM) roles to access data lake S3 buckets and the Data Catalog from any account
  • A library so that clients can provide an IAM role, query, data format, and output location to start an Athena query and get the status and result of the query run in the bucket of their choice.

Querying Athena is a two-step process:

  • StartQueryExecution – This starts the query run and gets the run ID. Users can provide the output location where the output of the query will be stored.
  • GetQueryExecution – This gets the query status because the run is asynchronous. When successful, you can query the output in an S3 file or via API.

The helper method for starting the query run and getting the result would be in the library.
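
A minimal sketch of such a helper, using the boto3 Athena client, could look like the following; the database and output location are supplied by the caller.

import time

import boto3

athena = boto3.client("athena")

def run_athena_query(query: str, database: str, output_location: str) -> str:
    """Start a query, poll until it finishes, and return the S3 path of the results."""
    query_id = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]
        state = execution["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)  # the run is asynchronous, so poll until it completes

    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} finished in state {state}")
    return execution["ResultConfiguration"]["OutputLocation"]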

Data lake metadata service

This service is custom developed and interacts with DynamoDB to get the metadata (dataset name, snapshot ID, partition string, timestamp, and S3 link of the data) in the form of a REST API. When the schema is discovered, clients use Athena as their query processor to query the data.

Because all datasets have a snapshot ID and are partitioned, a join query doesn’t result in a full table scan, only a partition scan on Amazon S3. We used Athena as our query processor because it removes the need to manage query infrastructure. Later, if we feel we need something more, we can use either Redshift Spectrum or Amazon EMR.

Conclusion

Amazon Devices teams discovered significant value by moving to a data lake architecture using AWS Glue, which enabled multiple global business stakeholders to ingest data in more productive ways. This enabled the teams to generate the optimal plan to place purchase orders for devices by analyzing the different datasets in near-real time with appropriate business logic to solve the problems of the supply chain, demand, and forecast.

From an operational perspective, the investment has already started to pay off:

  • It standardized our ingestion, storage, and retrieval mechanisms, saving onboarding time. Before the implementation of this system, one dataset took 1 month to onboard. Due to our new architecture, we were able to onboard 15 new datasets in less than 2 months, which improved our agility by 70%.
  • It removed scaling bottlenecks, creating a homogeneous system that can quickly scale to thousands of runs.
  • The solution added schema and data quality validation before accepting any inputs and rejecting them if data quality violations are discovered.
  • It made it easy to retrieve datasets while supporting future simulations and back tester use cases requiring versioned inputs. This will make launching and testing models simpler.
  • The solution created a common infrastructure that can be easily extended to other teams across DIAL having similar issues with data ingestion, storage, and retrieval use cases.
  • Our operating costs have fallen by almost 90%.
  • This data lake can be accessed efficiently by our data scientists and engineers to perform other analytics and have a predictive approach as a future opportunity to generate accurate plans for the purchase orders.

The steps in this post can help you plan to build a similar modern data strategy using AWS-managed services to ingest data from different sources, automatically create metadata catalogs, share data seamlessly between the data lake and data warehouse, and create alerts in the event of an orchestrated data workflow failure.


About the authors

Avinash Kolluri is a Senior Solutions Architect at AWS. He works across Amazon Alexa and Devices to architect and design modern distributed solutions. His passion is to build cost-effective and highly scalable solutions on AWS. In his spare time, he enjoys cooking fusion recipes and traveling.

Vipul Verma is a Sr. Software Engineer at Amazon.com. He has been with Amazon since 2015, solving real-world challenges through technology that directly impacts and improves the lives of Amazon customers. In his spare time, he enjoys hiking.

Handle UPSERT data operations using open-source Delta Lake and AWS Glue

Post Syndicated from Praveen Allam original https://aws.amazon.com/blogs/big-data/handle-upsert-data-operations-using-open-source-delta-lake-and-aws-glue/

Many customers need an ACID transaction (atomic, consistent, isolated, durable) data lake that can log change data capture (CDC) from operational data sources. There is also demand for merging real-time data into batch data. The Delta Lake framework provides both capabilities. In this post, we discuss how to handle UPSERTs (updates and inserts) of operational data using Delta Lake natively integrated with AWS Glue, and how to query the Delta Lake tables using Amazon Athena.

We examine a hypothetical insurance organization that issues commercial policies to small- and medium-scale businesses. The insurance prices vary based on several criteria, such as where the business is located, business type, earthquake or flood coverage, and so on. This organization is planning to build a data analytical platform, and the insurance policy data is one of the inputs to this platform. Because the business is growing, hundreds and thousands of new insurance policies are being enrolled and renewed every month. Therefore, all this operational data needs to be sent to Delta Lake in near-real time so that the organization can perform various analytics, and build machine learning (ML) models to serve their customers in a more efficient and cost-effective way.

Solution overview

The data can originate from any source, but typically customers want to bring operational data to data lakes to perform data analytics. One of the solutions is to bring the relational data by using AWS Database Migration Service (AWS DMS). AWS DMS tasks can be configured to copy the full load as well as ongoing changes (CDC). The full load and CDC load can be brought into the raw and curated (Delta Lake) storage layers in the data lake. To keep it simple, in this post we leave out the data sources and ingestion layer; the assumption is that the data has already been copied to the raw bucket in the form of CSV files. An AWS Glue ETL job does the necessary transformation and copies the data to the Delta Lake layer. The Delta Lake layer ensures ACID compliance of the source data.

The following diagram illustrates the solution architecture.
Architecture diagram

The use case we use in this post is about a commercial insurance company. We use a simple dataset that contains the following columns:

  • Policy – Policy number, entered as text
  • Expiry – Date that policy expires
  • Location – Location type (Urban or Rural)
  • State – Name of state where property is located
  • Region – Geographic region where property is located
  • Insured Value – Property value
  • Business Type – Business use type for property, such as Farming or Retail
  • Earthquake – Is earthquake coverage included (Y or N)
  • Flood – Is flood coverage included (Y or N)

The dataset contains a sample of 25 insurance policies. In the case of a production dataset, it may contain millions of records.

policy_id,expiry_date,location_name,state_code,region_name,insured_value,business_type,earthquake,flood
200242,2023-01-02,Urban,NY,East,1617630,Retail,N,N
200314,2023-01-02,Urban,NY,East,8678500,Apartment,Y,Y
200359,2023-01-02,Rural,WI,Midwest,2052660,Farming,N,N
200315,2023-01-02,Urban,NY,East,17580000,Apartment,Y,Y
200385,2023-01-02,Urban,NY,East,1925000,Hospitality,N,N
200388,2023-01-04,Urban,IL,Midwest,12934500,Apartment,Y,Y
200358,2023-01-05,Urban,WI,Midwest,928300,Office Bldg,N,N
200264,2023-01-07,Rural,NY,East,2219900,Farming,N,N
200265,2023-01-07,Urban,NY,East,14100000,Apartment,Y,Y
100582,2023-03-25,Urban,NJ,East,4651680,Apartment,Y,Y
100487,2023-03-25,Urban,NY,East,5990067,Apartment,N,N
100519,2023-03-25,Rural,NY,East,4102500,Farming,N,N
100462,2023-03-25,Urban,NY,East,3400000,Construction,Y,Y
100486,2023-03-26,Urban,NY,East,9973900,Apartment,Y,Y
100463,2023-03-27,Urban,NY,East,15480000,Office Bldg,Y,Y
100595,2023-03-27,Rural,NY,East,2446600,Farming,N,N
100617,2023-03-27,Urban,VT,Northeast,8861500,Office Bldg,N,N
100580,2023-03-30,Urban,NH,Northeast,97920,Office Bldg,Y,Y
100581,2023-03-30,Urban,NY,East,5150000,Apartment,Y,Y
100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,N
100503,2023-03-31,Urban,NJ,East,1761960,Office Bldg,N,N
100504,2023-03-31,Rural,NY,East,1649105,Farming,N,N
100616,2023-03-31,Urban,NY,East,2329500,Apartment,N,N
100611,2023-04-25,Urban,NJ,East,1595500,Office Bldg,Y,Y
100621,2023-04-25,Urban,MI,Central,394220,Retail,N,N

In the following sections, we walk through the steps to perform the Delta Lake UPSERT operations. We use the AWS Management Console to perform all the steps. However, you can also automate these steps using tools like AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), Terraforms, and so on.

Prerequisites

This post is intended for architects, engineers, developers, and data scientists who design and build analytical solutions on AWS. We expect a basic understanding of the console, AWS Glue, Amazon Simple Storage Service (Amazon S3), and Athena. Additionally, the persona is able to create AWS Identity and Access Management (IAM) policies and roles, create and run AWS Glue jobs and crawlers, and is able to work with the Athena query editor.

Use Athena query engine version 3 to query Delta Lake tables, as described later in the section “Query the full load using Athena”.

Athena QE V3

Set up an S3 bucket for full and CDC load data feeds

To set up your S3 bucket, complete the following steps:

  1. Log in to your AWS account and choose a Region nearest to you.
  2. On the Amazon S3 console, create a new bucket. Make sure the name is unique (for example, delta-lake-cdc-blog-<some random number>).
  3. Create the following folders:
    1. $bucket_name/fullload – This folder is used for a one-time full load from the upstream data source
    2. $bucket_name/cdcload – This folder is used for copying the upstream data changes
    3. $bucket_name/delta – This folder holds the Delta Lake data files
  4. Copy the sample dataset and save it in a file called full-load.csv to your local machine.
  5. Upload the file using the Amazon S3 console into the folder $bucket_name/fullload.

s3 folders

Set up an IAM policy and role

In this section, we create an IAM policy for the S3 bucket access and a role for AWS Glue jobs to run, and also use the same role for querying the Delta Lake using Athena.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Select the JSON tab and paste the following policy code. Replace {bucket_name} with the name of the bucket you created in the earlier step.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowListingOfFolders",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::{bucket_name}"
            ]
        },
        {
            "Sid": "ObjectAccessInBucket",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::{bucket_name}/*"
        }
    ]
}
  4. Name the policy delta-lake-cdc-blog-policy and select Create policy.
  5. On the IAM console, choose Roles in the navigation pane.
  6. Choose Create role.
  7. Select AWS Glue as your trusted entity and choose Next.
  8. Select the policy you just created, along with two additional AWS managed policies:
    1. delta-lake-cdc-blog-policy
    2. AWSGlueServiceRole
    3. CloudWatchFullAccess
  9. Choose Next.
  10. Give the role a name (for example, delta-lake-cdc-blog-role).

IAM role
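
If you prefer to create the policy and role programmatically instead of using the console, the following boto3 sketch performs the equivalent steps; the bucket name is a placeholder.

import json

import boto3

iam = boto3.client("iam")
bucket_name = "delta-lake-cdc-blog-123456789"  # replace with your bucket name

policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowListingOfFolders",
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": [f"arn:aws:s3:::{bucket_name}"],
        },
        {
            "Sid": "ObjectAccessInBucket",
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
            "Resource": f"arn:aws:s3:::{bucket_name}/*",
        },
    ],
}

policy_arn = iam.create_policy(
    PolicyName="delta-lake-cdc-blog-policy",
    PolicyDocument=json.dumps(policy_document),
)["Policy"]["Arn"]

# Trust policy that lets AWS Glue assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="delta-lake-cdc-blog-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

for arn in (
    policy_arn,
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
):
    iam.attach_role_policy(RoleName="delta-lake-cdc-blog-role", PolicyArn=arn)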

Set up AWS Glue jobs

In this section, we set up two AWS Glue jobs: one for full load and one for the CDC load. Let’s start with the full load job.

  1. On the AWS Glue console, under Data Integration and ETL in the navigation pane, choose Jobs. AWS Glue Studio opens in a new tab.
  2. Select Spark script editor and choose Create.

Glue Studio Editor

  3. In the script editor, replace the code with the following code snippet:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','s3_bucket'])

# Initialize Spark Session with Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

#Define the table schema
schema = StructType() \
      .add("policy_id",IntegerType(),True) \
      .add("expiry_date",DateType(),True) \
      .add("location_name",StringType(),True) \
      .add("state_code",StringType(),True) \
      .add("region_name",StringType(),True) \
      .add("insured_value",IntegerType(),True) \
      .add("business_type",StringType(),True) \
      .add("earthquake_coverage",StringType(),True) \
      .add("flood_coverage",StringType(),True) 

# Read the full load
sdf = spark.read.format("csv").option("header",True).schema(schema).load("s3://"+ args['s3_bucket']+"/fullload/")
sdf.printSchema()

# Write data as DELTA TABLE
sdf.write.format("delta").mode("overwrite").save("s3://"+ args['s3_bucket']+"/delta/insurance/")
  4. Navigate to the Job details tab.
  5. Provide a name for the job (for example, Full-Load-Job).
  6. For IAM Role, choose the role delta-lake-cdc-blog-role that you created earlier.
  7. For Worker type, choose G.2X.
  8. For Job bookmark, choose Disable.
  9. Set Number of retries to 0.
  10. Under Advanced properties, keep the default values, but provide the delta core JAR file path for Python library path and Dependent JARs path.
  11. Under Job parameters:
    1. Add the key --s3_bucket with the bucket name you created earlier as the value.
    2. Add the key --datalake-formats and give the value delta.
  12. Keep the remaining default values and choose Save.

Job details
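
You can also create the job programmatically. The following boto3 sketch approximates the console configuration above; the script location, Glue version, and worker count are assumptions that you should adjust for your environment.

import boto3

glue = boto3.client("glue")
bucket_name = "delta-lake-cdc-blog-123456789"  # replace with your bucket name

glue.create_job(
    Name="Full-Load-Job",
    Role="delta-lake-cdc-blog-role",
    Command={
        "Name": "glueetl",
        # Assumed path; upload the full load script to this location first
        "ScriptLocation": f"s3://{bucket_name}/scripts/full-load-job.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--s3_bucket": bucket_name,
        "--datalake-formats": "delta",
        "--job-bookmark-option": "job-bookmark-disable",
    },
    GlueVersion="4.0",       # assumed; Delta Lake is natively supported on recent Glue versions
    WorkerType="G.2X",
    NumberOfWorkers=10,
    MaxRetries=0,
)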

Now let’s create the CDC load job.

  1. Create a second job called CDC-Load-Job.
  2. Follow the steps on the Job details tab as with the previous job.
  3. Alternatively, you can choose the Clone job option from Full-Load-Job; this carries over all the job details from the full load job.
  4. In the script editor, enter the following code snippet for the CDC logic:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import expr

## For Delta lake
from delta.tables import DeltaTable


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','s3_bucket'])

# Initialize Spark Session with Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

# Read the CDC load
cdc_df = spark.read.csv("s3://"+ args['s3_bucket']+"/cdcload")
cdc_df.show(5,True)

# now read the full load (latest data) as delta table
delta_df = DeltaTable.forPath(spark, "s3://"+ args['s3_bucket']+"/delta/insurance/")
delta_df.toDF().show(5,True)

# UPSERT process: if the merge condition matches, update the row; otherwise insert it.
# If the feed has no operation flag, create datasets with insert, update, and delete flags and process them separately.
# Deletes have to run in a loop with a delete condition; this script does not handle deletes.
    
final_df = delta_df.alias("prev_df").merge( \
source = cdc_df.alias("append_df"), \
#matching on primarykey
condition = expr("prev_df.policy_id = append_df._c1"))\
.whenMatchedUpdate(set= {
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")} )\
.whenNotMatchedInsert(values =
#inserting a new row to Delta table
{   "prev_df.policy_id"             : col("append_df._c1"),
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")
})\
.execute()

Run the full load job

On the AWS Glue console, open Full-Load-Job and choose Run. The job takes about 2 minutes to complete, and the job run status changes to Succeeded. Go to $bucket_name and open the delta folder, which contains the insurance folder. You can see the Delta Lake files in it.
Delta location on S3

Create and run the AWS Glue crawler

In this step, we create an AWS Glue crawler with Delta Lake as the data source type. After successfully running the crawler, we inspect the data using Athena.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. Provide a name (for example, delta-lake-crawler) and choose Next.
  4. Choose Add a data source and choose Delta Lake as your data source.
  5. Copy your delta folder URI (for example, s3://delta-lake-cdc-blog-123456789/delta/insurance) and enter the Delta Lake table path location.
  6. Keep the default selection Create Native tables, and choose Add a Delta Lake data source.
  7. Choose Next.
  8. Choose the IAM role you created earlier, then choose Next.
  9. Select the default target database, and provide delta_ for the table name prefix. If no default database exists, you can create one.
  10. Choose Next.
  11. Choose Create crawler.
  12. Run the newly created crawler. After the crawler is complete, the delta_insurance table is available under Databases/Tables.
  13. Open the table to check the table overview.

You can observe nine columns and their data types. Glue table
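
Alternatively, you can create and start the crawler programmatically. The following boto3 sketch assumes a recent boto3 version that supports Delta Lake crawler targets (DeltaTargets); the bucket name is a placeholder.

import boto3

glue = boto3.client("glue")
bucket_name = "delta-lake-cdc-blog-123456789"  # replace with your bucket name

glue.create_crawler(
    Name="delta-lake-crawler",
    Role="delta-lake-cdc-blog-role",
    DatabaseName="default",
    TablePrefix="delta_",
    Targets={
        "DeltaTargets": [
            {
                "DeltaTables": [f"s3://{bucket_name}/delta/insurance/"],
                "WriteManifest": False,
                "CreateNativeDeltaTable": True,  # create a native Delta Lake table
            }
        ]
    },
)

glue.start_crawler(Name="delta-lake-crawler")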

Query the full load using Athena

In the earlier step, we created the delta_insurance table by running a crawler against the Delta Lake location. In this section, we query the delta_insurance table using Athena. Note that if you’re using Athena for the first time, set the query output folder to store the Athena query results (for example, s3://<your-s3-bucket>/query-output/).

  1. On the Athena console, open the query editor.
  2. Keep the default selections for Data source and Database.
  3. Run the query SELECT * FROM delta_insurance;. This query returns a total of 25 rows, the same as what was in the full load data feed.
  4. For the CDC comparison, run the following query and store the results in a location where you can compare these results later:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462,100463,100475,110001,110002)
order by policy_id;

The following screenshot shows the Athena query result.

Query results from full load

Upload the CDC data feed and run the CDC job

In this section, we update three insurance policies and insert two new policies.

  1. Copy the following insurance policy data and save it locally as cdc-load.csv:
U,100462,2024-12-31,Urban,NY,East,3400000,Construction,Y,Y
U,100463,2023-03-27,Urban,NY,East,1000000,Office Bldg,Y,Y
U,100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,Y
I,110001,2024-03-31,Urban,CA,WEST,210000,Office Bldg,N,N
I,110002,2024-03-31,Rural,FL,East,975000,Retail,N,Y

The first column in the CDC feed describes the UPSERT operations. U is for updating an existing record, and I is for inserting a new record.

  2. Upload the cdc-load.csv file to the $bucket_name/cdcload/ folder.
  3. On the AWS Glue console, run CDC-Load-Job. This job takes care of updating the Delta Lake accordingly.

The change details are as follows:

  • 100462 – Expiry date changes to 12/31/2024
  • 100463 – Insured value changes to 1 million
  • 100475 – This policy is now under a new flood zone
  • 110001 and 110002 – New policies added to the table
  4. Run the query again:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462, 100463,100475,110001,110002)
order by policy_id;

As shown in the following screenshot, the changes in the CDC data feed are reflected in the Athena query results.
Athena query results

Clean up

In this solution, we used all managed services, and there is no cost if AWS Glue jobs aren’t running. However, if you want to clean up the tasks, you can delete the two AWS Glue jobs, AWS Glue table, and S3 bucket.

Conclusion

Organizations are continuously looking at high performance, cost-effective, and scalable analytical solutions to extract the value of their operational data sources in near-real time. The analytical platform should be ready to receive changes in the operational data as soon as they occur. Typical data lake solutions face challenges in handling changes in source data; the Delta Lake framework can close this gap. This post demonstrated how to build data lakes for UPSERT operations using AWS Glue and native Delta Lake tables, and how to query AWS Glue tables from Athena. You can implement your large-scale UPSERT data operations using AWS Glue and Delta Lake, and perform analytics using Amazon Athena.

References


About the Authors

 Praveen Allam is a Solutions Architect at AWS. He helps customers design scalable, cost-performant, enterprise-grade applications using the AWS Cloud. He builds solutions to help organizations make data-driven decisions.

Vivek Singh is Senior Solutions Architect with the AWS Data Lab team. He helps customers unblock their data journey on the AWS ecosystem. His interest areas are data pipeline automation, data quality and data governance, data lakes, and lake house architectures.