Tag Archives: AWS Glue

How Amazon Devices scaled and optimized real-time demand and supply forecasts using serverless analytics

Post Syndicated from Avinash Kolluri original https://aws.amazon.com/blogs/big-data/how-amazon-devices-scaled-and-optimized-real-time-demand-and-supply-forecasts-using-serverless-analytics/

Every day, Amazon devices process and analyze billions of transactions from global shipping, inventory, capacity, supply, sales, marketing, producers, and customer service teams. This data is used in procuring devices’ inventory to meet Amazon customers’ demands. With data volumes exhibiting a double-digit percentage growth rate year on year and the COVID pandemic disrupting global logistics in 2021, it became more critical to scale and generate near-real-time data.

This post shows you how we migrated to a serverless data lake built on AWS that consumes data automatically from multiple sources and different formats. Furthermore, it created further opportunities for our data scientists and engineers to use AI and machine learning (ML) services to continuously feed and analyze data.

Challenges and design concerns

Our legacy architecture primarily used Amazon Elastic Compute Cloud (Amazon EC2) to extract the data from various internal heterogeneous data sources and REST APIs with the combination of Amazon Simple Storage Service (Amazon S3) to load the data and Amazon Redshift for further analysis and generating the purchase orders.

We found this approach resulted in a few deficiencies and therefore drove improvements in the following areas:

  • Developer velocity – Due to the lack of unification and discovery of schema, which are primary reasons for runtime failures, developers often spent time dealing with operational and maintenance issues.
  • Scalability – Most of these datasets are shared across the globe. Therefore, we must meet the scaling limits while querying the data.
  • Minimal infrastructure maintenance – The current process spans multiple computes depending on the data source. Therefore, reducing infrastructure maintenance is critical.
  • Responsiveness to data source changes – Our current system gets data from various heterogeneous data stores and services. Any updates to those services takes months of developer cycles. The response times for these data sources are critical to our key stakeholders. Therefore, we must take a data-driven approach to select a high-performance architecture.
  • Storage and redundancy – Due to the heterogeneous data stores and models, it was challenging to store the different datasets from various business stakeholder teams. Therefore, having versioning along with incremental and differential data to compare will provide a remarkable ability to generate more optimized plans
  • Fugitive and accessibility – Due to the volatile nature of logistics, a few business stakeholder teams have a requirement to analyze the data on demand and generate the near-real-time optimal plan for the purchase orders. This introduces the need for both polling and pushing the data to access and analyze in near-real time.

Implementation strategy

Based on these requirements, we changed strategies and started analyzing each issue to identify the solution. Architecturally, we chose a serverless model, and the data lake architecture action line refers to all the architectural gaps and challenging features we determined were part of the improvements. From an operational standpoint, we designed a new shared responsibility model for data ingestion using AWS Glue instead of internal services (REST APIs) designed on Amazon EC2 to extract the data. We also used AWS Lambda for data processing. Then we chose Amazon Athena as our query service. To further optimize and improve the developer velocity for our data consumers, we added Amazon DynamoDB as a metadata store for different data sources landing in the data lake. These two decisions drove every design and implementation decision we made.

The following diagram illustrates the architecture

arch

In the following sections, we look at each component in the architecture in more detail as we move through the process flow.

AWS Glue for ETL

To meet customer demand while supporting the scale of new businesses’ data sources, it was critical for us to have a high degree of agility, scalability, and responsiveness in querying various data sources.

AWS Glue is a serverless data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources. You can use it for analytics, ML, and application development. It also includes additional productivity and DataOps tooling for authoring, running jobs, and implementing business workflows.

With AWS Glue, you can discover and connect to more than 70 diverse data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor extract, transform, and load (ETL) pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Athena, Amazon EMR, and Amazon Redshift Spectrum.

AWS Glue made it easy for us to connect to the data in various data stores, edit and clean the data as needed, and load the data into an AWS-provisioned store for a unified view. AWS Glue jobs can be scheduled or called on demand to extract data from the client’s resource and from the data lake.

Some responsibilities of these jobs are as follows:

  • Extracting and converting a source entity to data entity
  • Enrich the data to contain year, month, and day for better cataloging and include a snapshot ID for better querying
  • Perform input validation and path generation for Amazon S3
  • Associate the accredited metadata based on the source system

Querying REST APIs from internal services is one of our core challenges, and considering the minimal infrastructure, we wanted to use them in this project. AWS Glue connectors assisted us in adhering to the requirement and goal. To query data from REST APIs and other data sources, we used PySpark and JDBC modules.

AWS Glue supports a wide variety of connection types. For more details, refer to Connection Types and Options for ETL in AWS Glue.

S3 bucket as landing zone

We used an S3 bucket as the immediate landing zone of the extracted data, which is further processed and optimized.

Lambda as AWS Glue ETL Trigger

We enabled S3 event notifications on the S3 bucket to trigger Lambda, which further partitions our data. The data is partitioned on InputDataSetName, Year, Month, and Date. Any query processor running on top of this data will scan only a subset of data for better cost and performance optimization. Our data can be stored in various formats, such as CSV, JSON, and Parquet.

The raw data isn’t ideal for most of our use cases to generate the optimal plan because it often has duplicates or incorrect data types. Most importantly, the data is in multiple formats, but we quickly modified the data and observed significant query performance gains from using the Parquet format. Here, we used one of the performance tips in Top 10 performance tuning tips for Amazon Athena.

AWS Glue jobs for ETL

We wanted better data segregation and accessibility, so we chose to have a different S3 bucket to improve performance further. We used the same AWS Glue jobs to further transform and load the data into the required S3 bucket and a portion of extracted metadata into DynamoDB.

DynamoDB as metadata store

Now that we have the data, various business stakeholders further consume it. This leaves us with two questions: which source data resides on the data lake and what version. We chose DynamoDB as our metadata store, which provides the latest details to the consumers to query the data effectively. Every dataset in our system is uniquely identified by snapshot ID, which we can search from our metadata store. Clients access this data store with an API’s.

Amazon S3 as data lake

For better data quality, we extracted the enriched data into another S3 bucket with the same AWS Glue job.

AWS Glue Crawler

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the process, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue. With a crawler, we could maintain the agnostic changes happening to the schema. This helped us automatically crawl the data from Amazon S3 and generate the schema and tables.

AWS Glue Data Catalog

The Data Catalog helped us maintain the catalog as an index to the data’s location, schema, and runtime metrics in Amazon S3. Information in the Data Catalog is stored as metadata tables, where each table specifies a single data store.

Athena for SQL queries

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries you run. We considered operational stability and increasing developer velocity as our key improvement factors.

We further optimized the process to query Athena so that users can plug in the values and the queries to get data out of Athena by creating the following:

  • An AWS Cloud Development Kit (AWS CDK) template to create Athena infrastructure and AWS Identity and Access Management (IAM) roles to access data lake S3 buckets and the Data Catalog from any account
  • A library so that client can provide an IAM role, query, data format, and output location to start an Athena query and get the status and result of the query run in the bucket of their choice.

To query Athena is a two-step process:

  • StartQueryExecution – This starts the query run and gets the run ID. Users can provide the output location where the output of the query will be stored.
  • GetQueryExecution – This gets the query status because the run is asynchronous. When successful, you can query the output in an S3 file or via API.

The helper method for starting the query run and getting the result would be in the library.

Data lake metadata service

This service is custom developed and interacts with DynamoDB to get the metadata (dataset name, snapshot ID, partition string, timestamp, and S3 link of the data) in the form of a REST API. When the schema is discovered, clients use Athena as their query processor to query the data.

Because all datasets have a snapshot ID are partitioned, the join query doesn’t result in a full table scan but only a partition scan on Amazon S3. We used Athena as our query processor because of its ease in not managing our query infrastructure. Later, if we feel we need something more, we can use either Redshift Spectrum or Amazon EMR.

Conclusion

Amazon Devices teams discovered significant value by moving to a data lake architecture using AWS Glue, which enabled multiple global business stakeholders to ingest data in more productive ways. This enabled the teams to generate the optimal plan to place purchase orders for devices by analyzing the different datasets in near-real time with appropriate business logic to solve the problems of the supply chain, demand, and forecast.

From an operational perspective, the investment has already started to pay off:

  • It standardized our ingestion, storage, and retrieval mechanisms, saving onboarding time. Before the implementation of this system, one dataset took 1 month to onboard. Due to our new architecture, we were able to onboard 15 new datasets in less than 2 months, which improved our agility by 70%.
  • It removed scaling bottlenecks, creating a homogeneous system that can quickly scale to thousands of runs.
  • The solution added schema and data quality validation before accepting any inputs and rejecting them if data quality violations are discovered.
  • It made it easy to retrieve datasets while supporting future simulations and back tester use cases requiring versioned inputs. This will make launching and testing models simpler.
  • The solution created a common infrastructure that can be easily extended to other teams across DIAL having similar issues with data ingestion, storage, and retrieval use cases.
  • Our operating costs have fallen by almost 90%.
  • This data lake can be accessed efficiently by our data scientists and engineers to perform other analytics and have a predictive approach as a future opportunity to generate accurate plans for the purchase orders.

The steps in this post can help you plan to build a similar modern data strategy using AWS-managed services to ingest data from different sources, automatically create metadata catalogs, share data seamlessly between the data lake and data warehouse, and create alerts in the event of an orchestrated data workflow failure.


About the authors

avinash_kolluriAvinash Kolluri is a Senior Solutions Architect at AWS. He works across Amazon Alexa and Devices to architect and design modern distributed solutions. His passion is to build cost-effective and highly scalable solutions on AWS. In his spare time, he enjoys cooking fusion recipes and traveling.

vipulVipul Verma is a Sr.Software Engineer at Amazon.com. He has been with Amazon since 2015,solving real-world challenges through technology that directly impact and improve the life of Amazon customers. In his spare time, he enjoys hiking.

Handle UPSERT data operations using open-source Delta Lake and AWS Glue

Post Syndicated from Praveen Allam original https://aws.amazon.com/blogs/big-data/handle-upsert-data-operations-using-open-source-delta-lake-and-aws-glue/

Many customers need an ACID transaction (atomic, consistent, isolated, durable) data lake that can log change data capture (CDC) from operational data sources. There is also demand for merging real-time data into batch data. Delta Lake framework provides these two capabilities. In this post, we discuss how to handle UPSERTs (updates and inserts) of the operational data using natively integrated Delta Lake with AWS Glue, and query the Delta Lake using Amazon Athena.

We examine a hypothetical insurance organization that issues commercial policies to small- and medium-scale businesses. The insurance prices vary based on several criteria, such as where the business is located, business type, earthquake or flood coverage, and so on. This organization is planning to build a data analytical platform, and the insurance policy data is one of the inputs to this platform. Because the business is growing, hundreds and thousands of new insurance policies are being enrolled and renewed every month. Therefore, all this operational data needs to be sent to Delta Lake in near-real time so that the organization can perform various analytics, and build machine learning (ML) models to serve their customers in a more efficient and cost-effective way.

Solution overview

The data can originate from any source, but typically customers want to bring operational data to data lakes to perform data analytics. One of the solutions is to bring the relational data by using AWS Database Migration Service (AWS DMS). AWS DMS tasks can be configured to copy the full load as well as ongoing changes (CDC). The full load and CDC load can be brought into the raw and curated (Delta Lake) storage layers in the data lake. To keep it simple, in this post we opt out of the data sources and ingestion layer; the assumption is that the data is already copied to the raw bucket in the form of CSV files. An AWS Glue ETL job does the necessary transformation and copies the data to the Delta Lake layer. The Delta Lake layer ensures ACID compliance of the source data.

The following diagram illustrates the solution architecture.
Architecture diagram

The use case we use in this post is about a commercial insurance company. We use a simple dataset that contains the following columns:

  • Policy – Policy number, entered as text
  • Expiry – Date that policy expires
  • Location – Location type (Urban or Rural)
  • State – Name of state where property is located
  • Region – Geographic region where property is located
  • Insured Value – Property value
  • Business Type – Business use type for property, such as Farming or Retail
  • Earthquake – Is earthquake coverage included (Y or N)
  • Flood – Is flood coverage included (Y or N)

The dataset contains a sample of 25 insurance policies. In the case of a production dataset, it may contain millions of records.

policy_id,expiry_date,location_name,state_code,region_name,insured_value,business_type,earthquake,flood
200242,2023-01-02,Urban,NY,East,1617630,Retail,N,N
200314,2023-01-02,Urban,NY,East,8678500,Apartment,Y,Y
200359,2023-01-02,Rural,WI,Midwest,2052660,Farming,N,N
200315,2023-01-02,Urban,NY,East,17580000,Apartment,Y,Y
200385,2023-01-02,Urban,NY,East,1925000,Hospitality,N,N
200388,2023-01-04,Urban,IL,Midwest,12934500,Apartment,Y,Y
200358,2023-01-05,Urban,WI,Midwest,928300,Office Bldg,N,N
200264,2023-01-07,Rural,NY,East,2219900,Farming,N,N
200265,2023-01-07,Urban,NY,East,14100000,Apartment,Y,Y
100582,2023-03-25,Urban,NJ,East,4651680,Apartment,Y,Y
100487,2023-03-25,Urban,NY,East,5990067,Apartment,N,N
100519,2023-03-25,Rural,NY,East,4102500,Farming,N,N
100462,2023-03-25,Urban,NY,East,3400000,Construction,Y,Y
100486,2023-03-26,Urban,NY,East,9973900,Apartment,Y,Y
100463,2023-03-27,Urban,NY,East,15480000,Office Bldg,Y,Y
100595,2023-03-27,Rural,NY,East,2446600,Farming,N,N
100617,2023-03-27,Urban,VT,Northeast,8861500,Office Bldg,N,N
100580,2023-03-30,Urban,NH,Northeast,97920,Office Bldg,Y,Y
100581,2023-03-30,Urban,NY,East,5150000,Apartment,Y,Y
100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,N
100503,2023-03-31,Urban,NJ,East,1761960,Office Bldg,N,N
100504,2023-03-31,Rural,NY,East,1649105,Farming,N,N
100616,2023-03-31,Urban,NY,East,2329500,Apartment,N,N
100611,2023-04-25,Urban,NJ,East,1595500,Office Bldg,Y,Y
100621,2023-04-25,Urban,MI,Central,394220,Retail,N,N

In the following sections, we walk through the steps to perform the Delta Lake UPSERT operations. We use the AWS Management Console to perform all the steps. However, you can also automate these steps using tools like AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), Terraforms, and so on.

Prerequisites

This post is focused towards architects, engineers, developers, and data scientists who build, design, and build analytical solutions on AWS. We expect a basic understanding of the console, AWS Glue, Amazon Simple Storage Service (Amazon S3), and Athena. Additionally, the persona is able to create AWS Identity and Access Management (IAM) policies and roles, create and run AWS Glue jobs and crawlers, and is able work with the Athena query editor.

Use Athena query engine version 3 to query delta lake tables, later in the section “Query the full load using Athena”.

Athena QE V3

Set up an S3 bucket for full and CDC load data feeds

To set up your S3 bucket, complete the following steps:

  1. Log in to your AWS account and choose a Region nearest to you.
  2. On the Amazon S3 console, create a new bucket. Make sure the name is unique (for example, delta-lake-cdc-blog-<some random number>).
  3. Create the following folders:
    1. $bucket_name/fullload – This folder is used for a one-time full load from the upstream data source
    2. $bucket_name/cdcload – This folder is used for copying the upstream data changes
    3. $bucket_name/delta – This folder holds the Delta Lake data files
  4. Copy the sample dataset and save it in a file called full-load.csv to your local machine.
  5. Upload the file using the Amazon S3 console into the folder $bucket_name/fullload.

s3 folders

Set up an IAM policy and role

In this section, we create an IAM policy for the S3 bucket access and a role for AWS Glue jobs to run, and also use the same role for querying the Delta Lake using Athena.

  1. On the IAM console, choose Polices in the navigation pane.
  2. Choose Create policy.
  3. Select JSON tab and paste the following policy code. Replace the {bucket_name} you created in the earlier step.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowListingOfFolders",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::{bucket_name}"
            ]
        },
        {
            "Sid": "ObjectAccessInBucket",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::{bucket_name}/*"
        }
    ]
}
  1. Name the policy delta-lake-cdc-blog-policy and select Create policy.
  2. On the IAM console, choose Roles in the navigation pane.
  3. Choose Create role.
  4. Select AWS Glue as your trusted entity and choose Next.
  5. Select the policy you just created, and with two additional AWS managed policies:
    1. delta-lake-cdc-blog-policy
    2. AWSGlueServiceRole
    3. CloudWatchFullAccess
  1. Choose Next.
  2. Give the role a name (for example, delta-lake-cdc-blog-role).

IAM role

Set up AWS Glue jobs

In this section, we set up two AWS Glue jobs: one for full load and one for the CDC load. Let’s start with the full load job.

  1. On the AWS Glue console, under Data Integration and ETL in the navigation pane, choose Jobs. AWS Glue Studio opens in a new tab.
  2. Select Spark script editor and choose Create.

Glue Studio Editor

  1. In the script editor, replace the code with the following code snippet
import sys
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','s3_bucket'])

# Initialize Spark Session with Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

#Define the table schema
schema = StructType() \
      .add("policy_id",IntegerType(),True) \
      .add("expiry_date",DateType(),True) \
      .add("location_name",StringType(),True) \
      .add("state_code",StringType(),True) \
      .add("region_name",StringType(),True) \
      .add("insured_value",IntegerType(),True) \
      .add("business_type",StringType(),True) \
      .add("earthquake_coverage",StringType(),True) \
      .add("flood_coverage",StringType(),True) 

# Read the full load
sdf = spark.read.format("csv").option("header",True).schema(schema).load("s3://"+ args['s3_bucket']+"/fullload/")
sdf.printSchema()

# Write data as DELTA TABLE
sdf.write.format("delta").mode("overwrite").save("s3://"+ args['s3_bucket']+"/delta/insurance/")
  1. Navigate to the Job details tab.
  2. Provide a name for the job (for example, Full-Load-Job).
  3. For IAM Role¸ choose the role delta-lake-cdc-blog-role that you created earlier.
  4. For Worker type¸ choose G 2X.
  5. For Job bookmark, choose Disable.
  6. Set Number of retries to 0.
  7. Under Advanced properties¸ keep the default values, but provide the delta core JAR file path for Python library path and Dependent JARs path.
  8. Under Job parameters:
    1. Add the key --s3_bucket with the bucket name you created earlier as the value.
    2. Add the key --datalake-formats  and give the value delta
  9. Keep the remaining default values and choose Save.

Job details

Now let’s create the CDC load job.

  1. Create a second job called CDC-Load-Job.
  2. Follow the steps on the Job details tab as with the previous job.
  3. Alternatively, you may choose “Clone job” option from the Full-Load-Job, this will carry all the job details from the full load job.
  4. In the script editor, enter the following code snippet for the CDC logic:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import expr

## For Delta lake
from delta.tables import DeltaTable


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','s3_bucket'])

# Initialize Spark Session with Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

# Read the CDC load
cdc_df = spark.read.csv("s3://"+ args['s3_bucket']+"/cdcload")
cdc_df.show(5,True)

# now read the full load (latest data) as delta table
delta_df = DeltaTable.forPath(spark, "s3://"+ args['s3_bucket']+"/delta/insurance/")
delta_df.toDF().show(5,True)

# UPSERT process if matches on the condition the update else insert
# if there is no keyword then create a data set with Insert, Update and Delete flag and do it separately.
# for delete it has to run in loop with delete condition, this script do not handle deletes.
    
final_df = delta_df.alias("prev_df").merge( \
source = cdc_df.alias("append_df"), \
#matching on primarykey
condition = expr("prev_df.policy_id = append_df._c1"))\
.whenMatchedUpdate(set= {
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")} )\
.whenNotMatchedInsert(values =
#inserting a new row to Delta table
{   "prev_df.policy_id"             : col("append_df._c1"),
    "prev_df.expiry_date"           : col("append_df._c2"), 
    "prev_df.location_name"         : col("append_df._c3"),
    "prev_df.state_code"            : col("append_df._c4"),
    "prev_df.region_name"           : col("append_df._c5"), 
    "prev_df.insured_value"         : col("append_df._c6"),
    "prev_df.business_type"         : col("append_df._c7"),
    "prev_df.earthquake_coverage"   : col("append_df._c8"), 
    "prev_df.flood_coverage"        : col("append_df._c9")
})\
.execute()

Run the full load job

On the AWS Glue console, open full-load-job and choose Run. The job takes about 2 minutes to complete, and the job run status changes to Succeeded. Go to $bucket_name and open the delta folder, which contains the insurance folder. You can note the Delta Lake files in it. Delta location on S3

Create and run the AWS Glue crawler

In this step, we create an AWS Glue crawler with Delta Lake as the data source type. After successfully running the crawler, we inspect the data using Athena.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. Provide a name (for example, delta-lake-crawler) and choose Next.
  4. Choose Add a data source and choose Delta Lake as your data source.
  5. Copy your delta folder URI (for example, s3://delta-lake-cdc-blog-123456789/delta/insurance) and enter the Delta Lake table path location.
  6. Keep the default selection Create Native tables, and choose Add a Delta Lake data source.
  7. Choose Next.
  8. Choose the IAM role you created earlier, then choose Next.
  9. Select the default target database, and provide delta_ for the table name prefix. If no default database exist, you may create one.
  10. Choose Next.
  11. Choose Create crawler.
  12. Run the newly created crawler. After the crawler is complete, the delta_insurance table is available under Databases/Tables.
  13. Open the table to check the table overview.

You can observe nine columns and their data types. Glue table

Query the full load using Athena

In the earlier step, we created the delta_insurance table by running a crawler against the Delta Lake location. In this section, we query the delta_insurance table using Athena. Note that if you’re using Athena for the first time, set the query output folder to store the Athena query results (for example, s3://<your-s3-bucket>/query-output/).

  1. On the Athena console, open the query editor.
  2. Keep the default selections for Data source and Database.
  3. Run the query SELECT * FROM delta_insurance;. This query returns a total of 25 rows, the same as what was in the full load data feed.
  4. For the CDC comparison, run the following query and store the results in a location where you can compare these results later:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462,100463,100475,110001,110002)
order by policy_id;

The following screenshot shows the Athena query result.

Query results from full load

Upload the CDC data feed and run the CDC job

In this section, we update three insurance policies and insert two new policies.

  1. Copy the following insurance policy data and save it locally as cdc-load.csv:
U,100462,2024-12-31,Urban,NY,East,3400000,Construction,Y,Y
U,100463,2023-03-27,Urban,NY,East,1000000,Office Bldg,Y,Y
U,100475,2023-03-31,Rural,WI,Midwest,1451662,Farming,N,Y
I,110001,2024-03-31,Urban,CA,WEST,210000,Office Bldg,N,N
I,110002,2024-03-31,Rural,FL,East,975000,Retail,N,Y

The first column in the CDC feed describes the UPSERT operations. U is for updating an existing record, and I is for inserting a new record.

  1. Upload the cdc-load.csv file to the $bucket_name/cdcload/ folder.
  2. On the AWS Glue console, run CDC-Load-Job. This job takes care of updating the Delta Lake accordingly.

The change details are as follows:

  • 100462 – Expiry date changes to 12/31/2024
  • 100463 – Insured value changes to 1 million
  • 100475 – This policy is now under a new flood zone
  • 110001 and 110002 – New policies added to the table
  1. Run the query again:
SELECT * FROM delta_insurance
WHERE policy_id IN (100462, 100463,100475,110001,110002)
order by policy_id;

As shown in the following screenshot, the changes in the CDC data feed are reflected in the Athena query results.
Athena query results

Clean up

In this solution, we used all managed services, and there is no cost if AWS Glue jobs aren’t running. However, if you want to clean up the tasks, you can delete the two AWS Glue jobs, AWS Glue table, and S3 bucket.

Conclusion

Organizations are continuously looking at high performance, cost-effective, and scalable analytical solutions to extract the value of their operational data sources in near-real time. The analytical platform should be ready to receive changes in the operational data as soon as they occur. Typical data lake solutions face challenges to handle the changes in source data; the Delta Lake framework can close this gap. This post demonstrated how to build data lakes for UPSERT operations using AWS Glue and native Delta Lake tables, and how to query AWS Glue tables from Athena. You can implement your large scale UPSERT data operations using AWS Glue, Delta Lake and perform analytics using Amazon Athena.

References


About the Authors

 Praveen Allam is a Solutions Architect at AWS. He helps customers design scalable, better cost-perfromant enterprise-grade applications using the AWS Cloud. He builds solutions to help organizations make data-driven decisions.

Vivek Singh is Senior Solutions Architect with the AWS Data Lab team. He helps customers unblock their data journey on the AWS ecosystem. His interest areas are data pipeline automation, data quality and data governance, data lakes, and lake house architectures.

Build a data lake with Apache Flink on Amazon EMR

Post Syndicated from Jianwei Li original https://aws.amazon.com/blogs/big-data/build-a-unified-data-lake-with-apache-flink-on-amazon-emr/

To build a data-driven business, it is important to democratize enterprise data assets in a data catalog. With a unified data catalog, you can quickly search datasets and figure out data schema, data format, and location. The AWS Glue Data Catalog provides a uniform repository where disparate systems can store and find metadata to keep track of data in data silos.

Apache Flink is a widely used data processing engine for scalable streaming ETL, analytics, and event-driven applications. It provides precise time and state management with fault tolerance. Flink can process bounded stream (batch) and unbounded stream (stream) with a unified API or application. After data is processed with Apache Flink, downstream applications can access the curated data with a unified data catalog. With unified metadata, both data processing and data consuming applications can access the tables using the same metadata.

This post shows you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog so that you can ingest streaming data in real time and access the data in near-real time for business analysis.

Apache Flink connector and catalog architecture

Apache Flink uses a connector and catalog to interact with data and metadata. The following diagram shows the architecture of the Apache Flink connector for data read/write, and catalog for metadata read/write.

Flink Glue Architecture

For data read/write, Flink has the interface DynamicTableSourceFactory for read and DynamicTableSinkFactory for write. A different Flink connector implements two interfaces to access data in different storage. For example, the Flink FileSystem connector has FileSystemTableFactory to read/write data in Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (Amazon S3), the Flink HBase connector has HBase2DynamicTableFactory to read/write data in HBase, and the Flink Kafka connector has KafkaDynamicTableFactory to read/write data in Kafka. You can refer to Table & SQL Connectors for more information.

For metadata read/write, Flink has the catalog interface. Flink has three built-in implementations for the catalog. GenericInMemoryCatalog stores the catalog data in memory. JdbcCatalog stores the catalog data in a JDBC-supported relational database. As of this writing, MySQL and PostgreSQL databases are supported in the JDBC catalog. HiveCatalog stores the catalog data in Hive Metastore. HiveCatalog uses HiveShim to provide different Hive version compatibility. We can configure different metastore clients to use Hive Metastore or the AWS Glue Data Catalog. In this post, we configure the Amazon EMR property hive.metastore.client.factory.class to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory (see Using the AWS Glue Data Catalog as the metastore for Hive) so that we can use the AWS Glue Data Catalog to store Flink catalog data. Refer to Catalogs for more information.

Most Flink built-in connectors, such as for Kafka, Amazon Kinesis, Amazon DynamoDB, Elasticsearch, or FileSystem, can use Flink HiveCatalog to store metadata in the AWS Glue Data Catalog. However, some connector implementations such as Apache Iceberg have their own catalog management mechanism. FlinkCatalog in Iceberg implements the catalog interface in Flink. FlinkCatalog in Iceberg has a wrapper to its own catalog implementation. The following diagram shows the relationship between Apache Flink, the Iceberg connector, and the catalog. For more information, refer to Creating catalogs and using catalogs and Catalogs.

Flink Iceberg Glue Architecture

Apache Hudi also has its own catalog management. Both HoodieCatalog and HoodieHiveCatalog implements a catalog interface in Flink. HoodieCatalog stores metadata in a file system such as HDFS. HoodieHiveCatalog stores metadata in Hive Metastore or the AWS Glue Data Catalog, depending on whether you configure hive.metastore.client.factory.class to use com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory. The following diagram shows relationship between Apache Flink, the Hudi connector, and the catalog. For more information, refer to Create Catalog.

Flink Hudi Glue Architecture

Because Iceberg and Hudi have different catalog management mechanisms, we show three scenarios of Flink integration with the AWS Glue Data Catalog in this post:

  • Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog
  • Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog
  • Read/Write to other storage format in Flink with metadata in Glue Data Catalog

Solution overview

The following diagram shows the overall architecture of the solution described in this post.

Flink Glue Integration

In this solution, we enable an Amazon RDS for MySQL binlog to extract transaction changes in real time. The Amazon EMR Flink CDC connector reads the binlog data and processes the data. Transformed data can be stored in Amazon S3. We use the AWS Glue Data Catalog to store the metadata such as table schema and table location. Downstream data consumer applications such as Amazon Athena or Amazon EMR Trino access the data for business analysis.

The following are the high-level steps to set up this solution:

  1. Enable binlog for Amazon RDS for MySQL and initialize the database.
  2. Create an EMR cluster with the AWS Glue Data Catalog.
  3. Ingest change data capture (CDC) data with Apache Flink CDC in Amazon EMR.
  4. Store the processed data in Amazon S3 with metadata in the AWS Glue Data Catalog.
  5. Verify all table metadata is stored in the AWS Glue Data Catalog.
  6. Consume data with Athena or Amazon EMR Trino for business analysis.
  7. Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables.

Prerequisites

This post uses an AWS Identity and Access Management (IAM) role with permissions for the following services:

  • Amazon RDS for MySQL (5.7.40)
  • Amazon EMR (6.9.0)
  • Amazon Athena
  • AWS Glue Data Catalog
  • Amazon S3

Enable binlog for Amazon RDS for MySQL and initialize the database

To enable CDC in Amazon RDS for MySQL, we need to configure binary logging for Amazon RDS for MySQL. Refer to Configuring MySQL binary logging for more information. We also create the database salesdb in MySQL and create the tables customer, order, and others to set up the data source.

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Create a new parameter group for MySQL.
  3. Edit the parameter group you just created to set binlog_format=ROW.

RDS-Binlog-Format

  1. Edit the parameter group you just created to set binlog_row_image=full.

RDS-Binlog-Row-Image

  1. Create an RDS for MySQL DB instance with the parameter group.
  2. Note down the values for hostname, username, and password, which we use later.
  3. Download the MySQL database initialization script from Amazon S3 by running the following command:
aws s3 cp s3://emr-workshops-us-west-2/glue_immersion_day/scripts/salesdb.sql ./salesdb.sql
  1. Connect to the RDS for MySQL database and run the salesdb.sql command to initialize the database, providing the host name and user name according to your RDS for MySQL database configuration:
mysql -h <hostname> -u <username> -p
mysql> source salesdb.sql

Create an EMR cluster with the AWS Glue Data Catalog

From Amazon EMR 6.9.0, the Flink table API/SQL can integrate with the AWS Glue Data Catalog. To use the Flink and AWS Glue integration, you must create an Amazon EMR 6.9.0 or later version.

  1. Create the file iceberg.properties for the Amazon EMR Trino integration with the Data Catalog. When the table format is Iceberg, your file should have following content:
iceberg.catalog.type=glue
connector.name=iceberg
  1. Upload iceberg.properties to an S3 bucket, for example DOC-EXAMPLE-BUCKET.

For more information on how to integrate Amazon EMR Trino with Iceberg, refer to Use an Iceberg cluster with Trino.

  1. Create the file trino-glue-catalog-setup.sh to configure the Trino integration with the Data Catalog. Use trino-glue-catalog-setup.sh as the bootstrap script. Your file should have the following content (replace DOC-EXAMPLE-BUCKET with your S3 bucket name):
set -ex 
sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties

  1. Upload trino-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Create bootstrap actions to install additional software to run a bootstrap script.

  1. Create the file flink-glue-catalog-setup.sh to configure the Flink integration with the Data Catalog.
  2. Use a script runner and run the flink-glue-catalog-setup.sh script as a step function.

Your file should have the following content (the JAR file name here is using Amazon EMR 6.9.0; a later version JAR name may change, so make sure to update according to your Amazon EMR version).

Note that here we use an Amazon EMR step, not a bootstrap, to run this script. An Amazon EMR step script is run after Amazon EMR Flink is provisioned.

set -ex

sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/hive-exec.jar /lib/flink/lib
sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib
sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar
sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar
sudo chmod 755 /usr/lib/flink/lib/hive-exec.jar
sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

sudo wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar -O /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
sudo chmod 755 /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar

sudo ln -s /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar /usr/lib/flink/lib/
sudo ln -s /usr/lib/hudi/hudi-flink-bundle.jar /usr/lib/flink/lib/

sudo mv /usr/lib/flink/opt/flink-table-planner_2.12-1.15.2.jar /usr/lib/flink/lib/
sudo mv /usr/lib/flink/lib/flink-table-planner-loader-1.15.2.jar /usr/lib/flink/opt/
  1. Upload flink-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Configuring Flink to Hive Metastore in Amazon EMR for more information on how to configure Flink and Hive Metastore. Refer to Run commands and scripts on an Amazon EMR cluster for more details on running the Amazon EMR step script.

  1. Create an EMR 6.9.0 cluster with the applications Hive, Flink, and Trino.

You can create an EMR cluster with the AWS Command Line Interface (AWS CLI) or the AWS Management Console. Refer to the appropriate subsection for instructions.

Create an EMR cluster with the AWS CLI

To use the AWS CLI, complete the following steps:

  1. Create the file emr-flink-trino-glue.json to configure Amazon EMR to use the Data Catalog. Your file should have the following content:
[
{
"Classification": "hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
},
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]
  1. Run the following command to create the EMR cluster. Provide your local emr-flink-trino-glue.json parent folder path, S3 bucket, EMR cluster Region, EC2 key name, and S3 bucket for EMR logs.
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive Name=Flink Name=Spark Name=Trino \
--region us-west-2 \
--name flink-trino-glue-emr69 \
--configurations "file:///<your configuration path>/emr-flink-trino-glue.json" \
--bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET/trino-glue-catalog-setup.sh","Name":"Add iceberg.properties for Trino"}]' \
--steps '[{"Args":["s3://DOC-EXAMPLE-BUCKET/flink-glue-catalog-setup.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Flink-glue-integration"}]' \
--instance-groups \
InstanceGroupType=MASTER,InstanceType=m6g.2xlarge,InstanceCount=1 \
InstanceGroupType=CORE,InstanceType=m6g.2xlarge,InstanceCount=2 \
--use-default-roles \
--ebs-root-volume-size 30 \
--ec2-attributes KeyName=<keyname> \
--log-uri s3://<s3-bucket-for-emr>/elasticmapreduce/

Create an EMR cluster on the console

To use the console, complete the following steps:

  1. On the Amazon EMR console, create an EMR cluster and select Use for Hive table metadata for AWS Glue Data Catalog settings.
  2. Add configuration settings with the following code:
[
{
"Classification": "trino-connector-hive",
"Properties": {
"hive.metastore": "glue"
}
}
]

EMR-6.9-Flink-Hive-Glue-1

  1. In the Steps section, add a step called Custom JAR.
  2. Set JAR location to s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar, where <region> is the region in which your EMR cluster resides.
  3. Set Arguments to the S3 path you uploaded earlier.

EMR-6.9-Flink-Hive-Glue-2

  1. In the Bootstrap Actions section, choose Custom Action.
  2. Set Script location to the S3 path you uploaded.

EMR-6.9-Flink-Hive-Glue-3

  1. Continue the subsequent steps to complete your EMR cluster creation.

Ingest CDC data with Apache Flink CDC in Amazon EMR

The Flink CDC connector supports reading database snapshots and captures updates in the configured tables. We have deployed the Flink CDC connector for MySQL by downloading flink-sql-connector-mysql-cdc-2.2.1.jar and putting it into the Flink library when we create our EMR cluster. The Flink CDC connector can use the Flink Hive catalog to store Flink CDC table schema into Hive Metastore or the AWS Glue Data Catalog. In this post, we use the Data Catalog to store our Flink CDC table.

Complete the following steps to ingest RDS for MySQL databases and tables with Flink CDC and store metadata in the Data Catalog:

  1. SSH to the EMR primary node.
  2. Start Flink on a YARN session by running the following command, providing your S3 bucket name:
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 \
-D state.backend=rocksdb \
-D state.backend.incremental=true \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=s3://<flink-glue-integration-bucket>/flink-checkponts/ \
-D state.checkpoints.num-retained=10 \
-D execution.checkpointing.interval=10s \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D execution.checkpointing.max-concurrent-checkpoints=1
  1. Start the Flink SQL client CLI by running the following command:
/usr/lib/flink/bin/sql-client.sh embedded
  1. Create the Flink Hive catalog by specifying the catalog type as hive and providing your S3 bucket name:
CREATE CATALOG glue_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri'= 's3://<flink-glue-integration-bucket>/flink-glue-for-hive/warehouse/')
use flink_cdc_db;

Because we’re configuring the EMR Hive catalog use the AWS Glue Data Catalog, all the databases and tables created in the Flink Hive catalog are stored in the Data Catalog.

  1. Create the Flink CDC table, providing the host name, user name, and password for the RDS for MySQL instance you created earlier.

Note that because the RDS for MySQL user name and password will be stored in the Data Catalog as table properties, you should be enable AWS Glue database/table authorization with AWS Lake Formation to protect your sensitive data.

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_cdc` (
`CUST_ID` double NOT NULL,
`NAME` STRING NOT NULL,
`MKTSEGMENT` STRING NOT NULL,
PRIMARY KEY (`CUST_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` (
`SITE_ID` double NOT NULL,
`CUST_ID` double NOT NULL,
`ADDRESS` STRING NOT NULL,
`CITY` STRING NOT NULL,
`STATE` STRING NOT NULL,
`COUNTRY` STRING NOT NULL,
`PHONE` STRING NOT NULL,
PRIMARY KEY (`SITE_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER_SITE'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` (
`ORDER_ID` int NOT NULL,
`SITE_ID` double NOT NULL,
`ORDER_DATE` TIMESTAMP NOT NULL,
`SHIP_MODE` STRING NOT NULL
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'SALES_ORDER_ALL',
'scan.incremental.snapshot.enabled' = 'FALSE'
);
  1. Query the table you just created:
SELECT count(O.ORDER_ID) AS ORDER_COUNT,
C.CUST_ID,
C.NAME,
C.MKTSEGMENT
FROM   customer_cdc C
JOIN customer_site_cdc CS
ON C.CUST_ID = CS.CUST_ID
JOIN sales_order_all_cdc O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT;

You will get a query result like following screenshot.

Flink-SQL-CDC-Test

Store processed data in Amazon S3 with metadata in the Data Catalog

As we’re ingesting the relational database data in Amazon RDS for MySQL, raw data may be updated or deleted. To support data update and delete, we can choose data lake technologies such as Apache Iceberg or Apache Hudi to store the processed data. As we mentioned earlier, Iceberg and Hudi have different catalog management. We show both scenarios to use Flink to read/write the Iceberg and Hudi tables with metadata in the AWS Glue Data Catalog.

For non-Iceberg and non-Hudi, we use a FileSystem Parquet file to show how the Flink built-in connector uses the Data Catalog.

Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Iceberg

  1. Create a Flink Iceberg catalog using the Data Catalog by specifying catalog-impl as org.apache.iceberg.aws.glue.GlueCatalog.

For more information about Flink and Data Catalog integration for Iceberg, refer to Glue Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_iceberg WITH (
'type'='iceberg',
'warehouse'='s3://<flink-glue-integration-bucket>/flink-glue-for-iceberg/warehouse/',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager',
'lock.table'='FlinkGlue4IcebergLockTable' );
  1. Create an Iceberg table to store processed data:
USE CATALOG glue_catalog_for_iceberg;
CREATE DATABASE IF NOT EXISTS flink_glue_iceberg_db;
USE flink_glue_iceberg_db;
CREATE TABLE `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'format-version'='2',
'write.upsert.enabled'='true');
  1. Insert the processed data into Iceberg:
INSERT INTO `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Hudi

Complete the following steps:

  1. Create a catalog for Hudi to use the Hive catalog by specifying mode as hms.

Because we already configured Amazon EMR to use the Data Catalog when we created the EMR cluster, this Hudi Hive catalog uses the Data Catalog under the hood. For more information about Flink and Data Catalog integration for Hudi, refer to Create Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_hudi WITH (
'type' = 'hudi',
'mode' = 'hms',
'table.external' = 'true',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf.dist',
'catalog.path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/'
);
  1. Create a Hudi table using the Data Catalog, and provide your S3 bucket name:
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
use flink_glue_hudi_db;
CREATE TABLE `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH (
'connector' = 'hudi',
'write.tasks' = '4',
'path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/customer_summary',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);
  1. Insert the processed data into Hudi:
INSERT INTO `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to other storage format in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Parquet

We already created the Flink Hive catalog in the previous step, so we’ll reuse that catalog.

  1. In the Flink SQL client CLI, run the following command:
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_hive_parquet_db;
use flink_hive_parquet_db;

We change the SQL dialect to Hive to create a table with Hive syntax.

  1. Create a table with the following SQL, and provide your S3 bucket name:
SET table.sql-dialect=hive;

CREATE TABLE `customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT
)
STORED AS parquet
LOCATION 's3://<flink-glue-integration-bucket>/flink-glue-for-hive-parquet/warehouse/customer_summary';

Because Parquet files don’t support updated rows, we can’t consume data from CDC data. However, we can consume data from Iceberg or Hudi.

  1. Use the following code to query the Iceberg table and insert data into the Parquet table:
SET table.sql-dialect=default;
SET execution.runtime-mode = batch;
INSERT INTO `glue_catalog`.`flink_hive_parquet_db`.`customer_summary`
SELECT * from `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`;

Verify all table metadata is stored in the Data Catalog

You can navigate to the AWS Glue console to verify all the tables are stored in the Data Catalog.

  1. On the AWS Glue console, choose Databases in the navigation pane to list all the databases we created.

Glue-Databases

  1. Open a database and verify that all the tables are in that database.

Glue-Tables

Consume data with Athena or Amazon EMR Trino for business analysis

You can use Athena or Amazon EMR Trino to access the result data.

Query the data with Athena

To access the data with Athena, complete the following steps:

  1. Open the Athena query editor.
  2. Choose flink_glue_iceberg_db for Database.

You should see the customer_summary table listed.

  1. Run the following SQL script to query the Iceberg result table:
select * from customer_summary order by order_count desc limit 10

The query result will look like the following screenshot.

Athena-Iceberg-Query

  1. For the Hudi table, change Database to flink_glue_hudi_db and run the same SQL query.

Athena-Hudi-Query

  1. For the Parquet table, change Database to flink_hive_parquet_db and run the same SQL query.

Athena-Parquet-Query

Query the data with Amazon EMR Trino

To access Iceberg with Amazon EMR Trino, SSH to the EMR primary node.

  1. Run the following command to start the Trino CLI:
trino-cli --catalog iceberg

Amazon EMR Trino can now query the tables in the AWS Glue Data Catalog.

  1. Run the following command to query the result table:
show schemas;
use flink_glue_iceberg_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

The query result looks like the following screenshot.

EMR-Trino-Iceberg-Query

  1. Exit the Trino CLI.
  2. Start the Trino CLI with the hive catalog to query the Hudi table:
trino-cli --catalog hive
  1. Run the following command to query the Hudi table:
show schemas;
use flink_glue_hudi_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables

We can update and delete some records in the RDS for MySQL database and then validate that the changes are reflected in the Iceberg and Hudi tables.

  1. Connect to the RDS for MySQL database and run the following SQL:
update CUSTOMER set NAME = 'updated_name' where CUST_ID=7;

delete from CUSTOMER where CUST_ID=11;
  1. Query the customer_summary table with Athena or Amazon EMR Trino.

The updated and deleted records are reflected in the Iceberg and Hudi tables.

Athena-Iceberg-Query-Updated

Clean up

When you’re done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. Delete the RDS for MySQL database.
  2. Delete the EMR cluster.
  3. Drop the databases and tables created in the Data Catalog.
  4. Remove files in Amazon S3.

Conclusion

This post showed you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog. You can use a Flink SQL connector to read/write data in a different store, such as Kafka, CDC, HBase, Amazon S3, Iceberg, or Hudi. You can also store the metadata in the Data Catalog. The Flink table API has the same connector and catalog implementation mechanism. In a single session, we can use multiple catalog instances pointing to different types, like IcebergCatalog and HiveCatalog, and use then interchangeably in your query. You can also write code with the Flink table API to develop the same solution to integrate Flink and the Data Catalog.

In our solution, we consumed the RDS for MySQL binary log directly with Flink CDC. You can also use Amazon MSK Connect to consume the binary log with MySQL Debezim and store the data in Amazon Managed Streaming for Apache Kafka (Amazon MSK). Refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi for more information.

With the Amazon EMR Flink unified batch and streaming data processing function, you can ingest and process data with one computing engine. With Apache Iceberg and Hudi integrated in Amazon EMR, you can build an evolvable and scalable data lake. With the AWS Glue Data Catalog, you can manage all enterprise data catalogs in a unified manner and consume data easily.

Follow the steps in this post to build your unified batch and streaming solution with Amazon EMR Flink and the AWS Glue Data Catalog. Please leave a comment if you have any questions.


About the Authors

Jianwei Li is Senior Analytics Specialist TAM. He provides consultant service for AWS enterprise support customers to design and build modern data platform.


Samrat Deb is Software Development Engineer at Amazon EMR. In his spare time, he love exploring new places, different culture and food.


Prabhu Josephraj is a Senior Software Development Engineer working for Amazon EMR. He is focused on leading the team that builds solutions in Apache Hadoop and Apache Flink. In his spare time, Prabhu enjoys spending time with his family.

Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started

Post Syndicated from Akira Ajisaka original https://aws.amazon.com/blogs/big-data/part-1-getting-started-introducing-native-support-for-apache-hudi-delta-lake-and-apache-iceberg-on-aws-glue-for-apache-spark/

AWS Glue is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. AWS Glue provides an extensible architecture that enables users with different data processing use cases.

A common use case is building data lakes on Amazon Simple Storage Service (Amazon S3) using AWS Glue extract, transform, and load (ETL) jobs. Data lakes free you from proprietary data formats defined by the business intelligence (BI) tools and limited capacity of proprietary storage. In addition, data lakes help you break down data silos to maximize end-to-end data insights. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data up to date by ensuring files are updated in a transactionally consistent manner.

AWS Glue customers can now use the following open-source data lake storage frameworks: Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg. These data lake frameworks help you store data and interface data with your applications and frameworks. Although popular data file formats such as Apache Parquet, CSV, and JSON can store big data, data lake frameworks bundle distributed big data files into tabular structures that are otherwise hard to manage. This makes data lake table frameworks the building constructs of databases on data lakes.

We announced general availability for native support for Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg on AWS Glue for Spark. This feature removes the need to install a separate connector or associated dependencies, manage versions, and simplifies the configuration steps required to use these frameworks in AWS Glue for Apache Spark. With these open-source data lake frameworks, you can simplify incremental data processing in data lakes built on Amazon S3 by using ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes.

This post demonstrates how AWS Glue for Apache Spark works with Hudi, Delta, and Iceberg dataset tables, and describes typical use cases on an AWS Glue Studio notebook.

Enable Hudi, Delta, Iceberg in Glue for Apache Spark

You can use Hudi, Delta, or Iceberg by specifying a new job parameter --datalake-formats. For example, if you want to use Hudi, you need to specify the key as --datalake-formats and the value as hudi. If the option is set, AWS Glue automatically adds the required JAR files into the runtime Java classpath, and that’s all you need. You don’t need to build and configure the required libraries or install a separate connector. You can use the following library versions with this option.

AWS Glue version Hudi Delta Lake Iceberg
AWS Glue 3.0 0.10.1 1.0.0 0.13.1
AWS Glue 4.0 0.12.1 2.1.0 1.0.0

If you want to use other versions of the preceding libraries, you can choose either of the following options:

If you choose either of the preceding options, you need to make sure the --datalake-formats job parameter is unspecified. For more information, see Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook.

Prerequisites

To continue this tutorial, you need to create the following AWS resources in advance:

Process Hudi, Delta, and Iceberg datasets on an AWS Glue Studio notebook

AWS Glue Studio notebooks provide serverless notebooks with minimal setup. It makes data engineers and developers quickly and interactively explore and process their datasets. You can start using Hudi, Delta, or Iceberg in an AWS Glue Studio notebook by specifying the parameter via %%configure magic and setting the AWS Glue version to 3.0 as follows:

# Use Glue version 3.0
%glue_version 3.0

# Configure '--datalake-formats' Job parameter
%%configure
{
  "--datalake-formats": "your_comma_separated_formats"
}

For more information, refer to the example notebooks available in the GitHub repository:

For this post, we use an Iceberg DataFrame as an example.

The following sections explain how to use an AWS Glue Studio notebook to create an Iceberg table and append records to the table.

Launch a Jupyter notebook to process Iceberg tables

Complete the following steps to launch an AWS Glue Studio notebook:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation plane.
  3. Under Create job, select Jupyter Notebook.

  1. Select Upload and edit an existing notebook.
  2. Upload native_iceberg_dataframe.ipynb through Choose file under File upload.

  1. Choose Create.
  2. For Job name, enter native_iceberg_dataframe.
  3. For IAM Role, choose your IAM role.
  4. Choose Start notebook job.

Prepare and configure SparkSession with Iceberg configuration

Complete the following steps to configure SparkSession to process Iceberg tables:

  1. Run the following cell.

You can see --datalake-formats iceberg is set by the %%configure Jupyter magic command. For more information about Jupyter magics, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

  1. Provide your S3 bucket name and bucket prefix for your Iceberg table location in the following cell, and run it.

  1. Run the following cells to initialize SparkSession.

  1. Optionally, if you previously ran the notebook, you need to run the following cell to clean up existing resources.

Now you’re ready to create Iceberg tables using the notebook.

Create an Iceberg table

Complete the following steps to create an Iceberg table using the notebook:

  1. Run the following cell to create a DataFrame (df_products) to write.

If successful, you can see the following table.

  1. Run the following cell to create an Iceberg table using the DataFrame.

  1. Now you can read data from the Iceberg table by running the following cell.

Append records to the Iceberg table

Complete the following steps to append records to the Iceberg table:

  1. Run the following cell to create a DataFrame (df_products_appends) to append.

  1. Run the following cell to append the records to the table.

  1. Run the following cell to confirm that the preceding records are successfully appended to the table.

Clean up

To avoid incurring ongoing charges, clean up your resources:

  1. Run step 4 in the Prepare and configure SparkSession with Iceberg configuration section in this post to delete the table and underlying S3 objects.
  2. On the AWS Glue console, choose Jobs in the navigation plane.
  3. Select your job and on the Actions menu, choose Delete job(s).
  4. Choose Delete to confirm.

Considerations

With this capability, you have three different options to access Hudi, Delta, and Iceberg tables:

  • Spark DataFrames, for example spark.read.format("hudi").load("s3://path_to_data")
  • SparkSQL, for example SELECT * FROM table
  • GlueContext, for example create_data_frame.from_catalog, write_data_frame.from_catalog, getDataFrame, and writeDataFrame

Learn more in Using the Hudi framework in AWS Glue, Using the Delta Lake framework in AWS Glue, and Using the Iceberg framework in AWS Glue.

Delta Lake native integration works with the catalog tables created from native Delta Lake tables by AWS Glue crawlers. This integration does not depend on manifest files. For more information, refer to Introducing native Delta Lake table support with AWS Glue crawlers.

Conclusion

This post demonstrated how to process Apache Hudi, Delta Lake, Apache Iceberg datasets using AWS Glue for Apache Spark. You can integrate your data using those data lake formats easily without struggling with library dependency management.

In subsequent posts in this series, we’ll show you how you can use AWS Glue Studio to visually author your ETL jobs with simpler configuration and setup for these data lake formats, and how to use AWS Glue workflows to orchestrate data pipelines and automate ingestion into your data lakes on Amazon S3 with AWS Glue jobs. Stay tuned!

If you have comments or feedback, please leave them in the comments.


About the authors

Akira Ajisaka is a Senior Software Development Engineer on the AWS Glue team. He likes open-source software and distributed systems. In his spare time, he enjoys playing both arcade and console games.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His teams work on building and innovating in distributed compute systems and frameworks, namely on Apache Spark.

How SikSin improved customer engagement with AWS Data Lab and Amazon Personalize

Post Syndicated from Byungjun Choi original https://aws.amazon.com/blogs/big-data/how-siksin-improved-customer-engagement-with-aws-data-lab-and-amazon-personalize/

This post is co-written with Byungjun Choi and Sangha Yang from SikSin.

SikSin is a technology platform connecting customers with restaurant partners serving their multiple needs. Customers use the SikSin platform to search and discover restaurants, read and write reviews, and view photos. From the restaurateurs’ perspective, SikSin enables restaurant partners to engage and acquire customers in order to grow their business. SikSin has a partnership with 850 corporate companies and more than 50,000 restaurants. They issue restaurant e-vouchers to more than 220,000 members, including individuals as well as corporate members. The SikSin platform receives more than 3 million users in a month. SikSin was listed in the top 100 of the Financial Times’s Asia-Pacific region’s high-growth companies in 2022.

SikSin was looking to deliver improved customer experiences and increase customer engagement. SikSin confronted two business challenges:

  • Customer engagement – SikSin maintains data on more than 750,000 restaurants and has more than 4,000 restaurant articles (and growing). SikSin was looking for a personalized and customized approach to provide restaurant recommendations for their customers and get them engaged with the content, thereby providing a personalized customer experience.
  • Data analysis activities – The SikSin Food Service team experienced difficulties in regards to report generation due to scattered data across multiple systems. The team previously had to submit a request to the IT team and then wait for answers that might be outdated. For the IT team, they needed to manually pull data out of files, databases, and applications, and then combine them upon every request, which is a time-consuming activity. The SikSin Food Service team wanted to view web analytics log data by multiple dimensions, such as customer profiles and places. Examples include page view, conversion rate, and channels.

To overcome these two challenges, SikSin participated in the AWS Data Lab program to assist them in building a prototype solution. The AWS Data Lab offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Build Lab is a 2–5-day intensive build with a technical customer team.

In this post, we share how SikSin built the basis for accelerating their data project with the help of the Data Lab and Amazon Personalize.

Use cases

The Data Lab team and SikSin team had three consecutive meetings to discuss business and technical requirements, and decided to work on two uses cases to resolve their two business challenges:

  • Build personalized recommendations – SikSin wanted to deploy a machine learning (ML) model to produce personalized content on the landing page of the platform, particularly restaurants and restaurant articles. The success criteria was to increase the number of page views per session and membership subscription, reduce their bounce rate, and ultimately engage more visitors and members in SikSin’s contents.
  • Establish self-service analytics – SikSin’s business users wanted to reduce time to insight by making data more accessible while removing the reliance on the IT team by giving business users the ability to query data. The key was to consolidate web logs from BigQuery and operational business data from Amazon Relational Data Service (Amazon RDS) into a single place and analyze data whenever they need.

Solution overview

The following architecture depicts what the SikSin team built in the 4-day Build Lab. There are two parts in the solution to address SikSin’s business and technical requirements. The first part (1–8) is for building personalized recommendations, and the second part (A–D) is for establishing self-service analytics.

SikSin Solution Architecture

SikSin deployed an ML model to produce personalized content recommendations by using the following AWS services:

  1. AWS Database Migration Service (AWS DMS) helps migrate databases to AWS quickly and securely with minimal downtime. The SikSin team used AWS DMS to perform full load to bring data from the database tables into Amazon Simple Storage Service (Amazon S3) as a target. Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance. An AWS Glue crawler populates the AWS Glue Data Catalog with the data schema definitions (in a landing folder).
  2. An AWS Lambda function checks if any previous files still exist in the landing folder and archives the files into a backup folder, if any.
  3. AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, ML, and application development. The SikSin team created AWS Glue Spark extract, transform, and load (ETL) jobs to prepare input datasets for ML models. These datasets are used to train ML models in bulk mode. There are a total of five datasets for training and two datasets for batch inference jobs.
  4. Amazon Personalize allows developers to quickly build and deploy curated recommendations and intelligent user segmentation at scale using ML. Because Amazon Personalize can be tailored to your individual needs, you can deliver the right customer experience at the right time and in the right place. Also, users will select existing ML models (also known as recipes), train models, and run batch inference to make recommendations.
  5. An Amazon Personalize job predicts for each line of input data (restaurants and restaurant articles) and produces ML-generated recommendations in the designated S3 output folder. The recommendation records are surfaced using interaction data, product data, and predictive models. An AWS Glue crawler populates the AWS Glue Data Catalog with the data schema definitions (in an output folder).
  6. The SikSin team applied business logics and filters in an AWS Glue job to prepare the final datasets for recommendations.
  7. AWS Step Functions enables you to build scalable, distributed applications using state machines. The SikSin team used AWS Step Functions Workflow Studio to visually create, run, and debug workflow runs. This workflow is triggered based on a schedule. The process includes data ingestion, cleansing, processing, and all steps defined in Amazon Personalize. This also involves managing run dependencies, scheduling, error-catching, and concurrency in accordance with the logical flow of the pipeline.
  8. Amazon Simple Notification Service (Amazon SNS) sends notifications. The SikSin team used Amazon SNS to send a notification via email and Google Hangouts with a Lambda function as a target.

To establish a self-service analytics environment to enable business users to perform data analysis, SikSin used the following services:

  1. The Google BigQuery Connector for AWS Glue simplifies the process of connecting AWS Glue jobs to extract data from BigQuery. The SikSin team used the connector to extract web analytics logs from BigQuery and load them to an S3 bucket.
  2. AWS Glue DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to clean and normalize data to prepare it for analytics and ML. You can choose from over 250 pre-built transformations to automate data preparation tasks, all without the need to write any code. The SikSin Food Service team used it to visually inspect large datasets and shape the data for their data analysis activities. An S3 bucket (in the intermediate folder) contains business operational data such as customers, places, articles, and products, and reference data loaded from AWS DMS and web analytics logs and data by AWS Glue jobs.
  3. An AWS Glue Python shell runs a job to cleanse and join data, and apply business rules to prepare the data for queries. The SikSin team used AWS SDK Pandas, an AWS Professional Service open-source Python initiative, which extends the power of the Pandas library to AWS, connecting DataFrames and AWS data related services. The output files are stored in an Apache Parquet format in a single folder. An AWS Glue crawler populates the data schema definitions (in an output folder) into the AWS Glue Data Catalog.
  4. The SikSin Food Service team used Amazon Athena and Amazon Quicksight to query and visualize the data analysis. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. QuickSight is an ML-powered business intelligence service built for the cloud.

Business outcomes

The SikSin Food Service team is now able to access the available data for performing data analysis and manipulation operations efficiently, as well as for getting insights on their own. This immediately allows the team as well as other lines of business to understand how customers are interacting with SikSin’s contents and services on the platform and make decisions sooner. For example, with the data output, the Food Service team was able to provide insights and data points for their external stakeholder and customer to initiate a new business idea. Moreover, the team shared, “We anticipate the recommendations and personalized content will increase conversion rates and customer engagement.”

The AWS Data Lab enabled SikSin to review and assess thoroughly what data is actually usable and available. With SikSin’s objective to successfully build a data pipeline for data analytics purposes, the SikSin team came to realize the importance of data cleansing, categorization, and standardization. “Only fruitful analysis and recommendation are possible when data is intact and properly cleansed,” said Byungjun Choi (the Head of SikSin’s Food Service Team). After completing the Data Lab, SikSin completed and set up an internal process that can streamline the data cleansing pipeline.

SikSin was stuck in the research phase of looking for a solution to solve their personalization challenges. The AWS Data Lab enabled the SikSin IT Team to get hands-on with the technology and build a minimum viable product (MVP) to explore how Amazon Personalize would work in their environment with their data. They achieved this via the Data Lab by adopting AWS DMS, AWS Glue, Amazon Personalize, and Step Functions. “Though it is still the early stage of building a prototype, I am very confident with the right enablement provided from AWS that an effective recommendation system can be adopted on production level very soon,” commented Sangha Yang (the Head of SikSin IT Team).

Conclusion

As a result of the 4-day Build Lab, the SikSin team left with a working prototype that is custom fit to their needs, gaining a clear path forward for enabling end-users to gain valuable insights into its data. The Data Lab allowed the SikSin team to accelerate the architectural design and prototype build of this solution by months. Based on the lessons and learnings obtained from Data Lab, SikSin is planning to launch a Global News Content Platform equipped with a recommendation feature in FY23.

As demonstrated by SikSin’s achievements, Amazon Personalize allows developers to quickly build and deploy curated recommendations and intelligent user segmentation at scale using ML. Because Amazon Personalize can be tailored to your individual needs, you can deliver the right customer experience at the right time and in the right place. Whether you want to optimize recommendations, target customers more accurately, maximize your data’s value, or promote items using business rules.

To accelerate your digital transformation with ML, the Data Lab program is available to support you by providing prescriptive architectural guidance on a particular use case, sharing best practices, and removing technical roadblocks. You’ll leave the engagement with an architecture or working prototype that is custom fit to your needs, a path to production, and deeper knowledge of AWS services.

Please contact your AWS Account Manager or Solutions Architect to get started. If you don’t have an AWS Account Manager, please contact Sales.


About the Authors

bdb-2857-BJByungjun Choi is the Head of SikSin Food Service at SikSin.

bdb-2857-SHSangha Yang is the Head of IT team at SinSin.

bdb-2857-youngguYounggu Yun is a Senior Data Lab Architect at AWS. He works with customers around the APAC region to help them achieve business goals and solve technical problems by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

Junwoo Lee is an Account Manager at AWS. He provides technical and business support to help customer resolve their problems and enrich customer journey by introducing local and global programs for his customers.

bdb-2857-jinwooJinwoo Park is a Senior Solutions Architect at AWS. He provides technical support for AWS customers to succeed with their cloud journey. He helps customers build more secure, efficient, and cost-optimized architectures and solutions, and delivers best practices and workshops.

Text analytics on AWS: implementing a data lake architecture with OpenSearch

Post Syndicated from Francisco Losada original https://aws.amazon.com/blogs/architecture/text-analytics-on-aws-implementing-a-data-lake-architecture-with-opensearch/

Text data is a common type of unstructured data found in analytics. It is often stored without a predefined format and can be hard to obtain and process.

For example, web pages contain text data that data analysts collect through web scraping and pre-process using lowercasing, stemming, and lemmatization. After pre-processing, the cleaned text is analyzed by data scientists and analysts to extract relevant insights.

This blog post covers how to effectively handle text data using a data lake architecture on Amazon Web Services (AWS). We explain how data teams can independently extract insights from text documents using OpenSearch as the central search and analytics service. We also discuss how to index and update text data in OpenSearch and evolve the architecture towards automation.

Architecture overview

This architecture outlines the use of AWS services to create an end-to-end text analytics solution, starting from the data collection and ingestion up to the data consumption in OpenSearch (Figure 1).

Data lake architecture with OpenSearch

Figure 1. Data lake architecture with OpenSearch

  1. Collect data from various sources, such as SaaS applications, edge devices, logs, streaming media, and social networks.
  2. Use tools like AWS Database Migration Service (AWS DMS), AWS DataSync, Amazon Kinesis, Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS IoT Core, and Amazon AppFlow to ingest the data into the AWS data lake, depending on the data source type.
  3. Store the ingested data in the raw zone of the Amazon Simple Storage Service (Amazon S3) data lake—a temporary area where data is kept in its original form.
  4. Validate, clean, normalize, transform, and enrich the data through a series of pre-processing steps using AWS Glue or Amazon EMR.
  5. Place the data that is ready to be indexed in the indexing zone.
  6. Use AWS Lambda to index the documents into OpenSearch and store them back in the data lake with a unique identifier.
  7. Use the clean zone as the source of truth for teams to consume the data and calculate additional metrics.
  8. Develop, train, and generate new metrics using machine learning (ML) models with Amazon SageMaker or artificial intelligence (AI) services like Amazon Comprehend.
  9. Store the new metrics in the enriching zone along with the identifier of the OpenSearch document.
  10. Use the identifier column from the initial indexing phase to identify the correct documents and update them in OpenSearch with the newly calculated metrics using AWS Lambda.
  11. Use OpenSearch to search through the documents and visualize them with metrics using OpenSearch Dashboards.

Considerations

Data lake orchestration among teams

This architecture allows data teams to work independently on text documents at different stages of their lifecycles. The data engineering team manages the raw and indexing zones, who also handle data ingestion and preprocessing for indexing in OpenSearch.

The cleaned data is stored in the clean zone, where data analysts and data scientists generate insights and calculate new metrics. These metrics are stored in the enrich zone and indexed as new fields in the OpenSearch documents by the data engineering team (Figure 2).

Data lake orchestration among teams

Figure 2. Data lake orchestration among teams

Let’s explore an example. Consider a company that periodically retrieves blog site comments and performs sentiment analysis using Amazon Comprehend. In this case:

  1. The comments are ingested into the raw zone of the data lake.
  2. The data engineering team processes the comments and stores them in the indexing zone.
  3. A Lambda function indexes the comments into OpenSearch, enriches the comments with the OpenSearch document ID, and saves it in the clean zone.
  4. The data science team consumes the comments and performs sentiment analysis using Amazon Comprehend.
  5. The sentiment analysis metrics are stored in the metrics zone of the data lake. A second Lambda function updates the comments in OpenSearch with the new metrics.

If the raw data does not require any preprocessing steps, the indexing and clean zones can be combined. You can explore this specific example, along with code implementation, in the AWS samples repository.

Schema evolution

As your data progresses through data lake stages, the schema changes and gets enriched accordingly. Continuing with our previous example, Figure 3 explains how the schema evolves.

Schema evolution through the data lake stages

Figure 3. Schema evolution through the data lake stages

  1. In the raw zone, there is a raw text field received directly from the ingestion phase. It’s best practice to keep a raw version of the data as a backup, or in case the processing steps need to be repeated later.
  2. In the indexing zone, the clean text field replaces the raw text field after being processed.
  3. In the clean zone, we add a new ID field that is generated during indexing and identifies the OpenSearch document of the text field.
  4. In the enrich zone, the ID field is required. Other fields with metric names are optional and represent new metrics calculated by other teams that will be added to OpenSearch.

Consumption layer with OpenSearch

In OpenSearch, data is organized into indices, which can be thought of as tables in a relational database. Each index consists of documents—similar to table rows—and multiple fields, similar to table columns. You can add documents to an index by indexing and updating them using various client APIs for popular programming languages.

Now, let’s explore how our architecture integrates with OpenSearch in the indexing and updating stage.

Indexing and updating documents using Python

The index document API operation allows you to index a document with a custom ID, or assigns one if none is provided. To speed up indexing, we can use the bulk index API to index multiple documents in one call.

We need to store the IDs back from the index operation to later identify the documents we’ll update with new metrics. Let’s explore two ways of doing this:

  • Use the requests library to call the REST Bulk Index API (preferred): the response returns the auto-generated IDs we need.
  • Use the Python Low-Level Client for OpenSearch: The IDs are not returned and need to be pre-assigned to later store them. We can use an atomic counter in Amazon DynamoDB to do so. This allows multiple Lambda functions to index documents in parallel without ID collisions.

As in Figure 4, the Lambda function:

  1. Increases the atomic counter by the number of documents that will index into OpenSearch.
  2. Gets the value of the counter back from the API call.
  3. Indexes the documents using the range that goes between [current counter value, current counter value – number of documents].
Storing the IDs back from the bulk index operation using the Python Low-Level Client for OpenSearch

Figure 4. Storing the IDs back from the bulk index operation using the Python Low-Level Client for OpenSearch

Data flow automation

As architectures evolve towards automation, the data flow between data lake stages becomes event-driven. Following our previous example, we can automate the processing steps of the data when moving from the raw to the indexing zone (Figure 5).

Event-driven automation for data flow

Figure 5. Event-driven automation for data flow

With Amazon EventBridge and AWS Step Functions, we can automatically trigger our pre-processing AWS Glue jobs so our data gets pre-processed without manual intervention.

The same approach can be applied to the other data lake stages to achieve a fully automated architecture. Explore this implementation for an automated language use case.

Conclusion

In this blog post, we covered designing an architecture to effectively handle text data using a data lake on AWS. We explained how different data teams can work independently to extract insights from text documents at different lifecycle stages using OpenSearch as the search and analytics service.

How BookMyShow saved 80% in costs by migrating to an AWS modern data architecture

Post Syndicated from Mahesh Vandi Chalil original https://aws.amazon.com/blogs/big-data/how-bookmyshow-saved-80-in-costs-by-migrating-to-an-aws-modern-data-architecture/

This is a guest post co-authored by Mahesh Vandi Chalil, Chief Technology Officer of BookMyShow.

BookMyShow (BMS), a leading entertainment company in India, provides an online ticketing platform for movies, plays, concerts, and sporting events. Selling up to 200 million tickets on an annual run rate basis (pre-COVID) to customers in India, Sri Lanka, Singapore, Indonesia, and the Middle East, BookMyShow also offers an online media streaming service and end-to-end management for virtual and on-ground entertainment experiences across all genres.

The pandemic gave BMS the opportunity to migrate and modernize our 15-year-old analytics solution to a modern data architecture on AWS. This architecture is modern, secure, governed, and cost-optimized architecture, with the ability to scale to petabytes. BMS migrated and modernized from on-premises and other cloud platforms to AWS in just four months. This project was run in parallel with our application migration project and achieved 90% cost savings in storage and 80% cost savings in analytics spend.

The BMS analytics platform caters to business needs for sales and marketing, finance, and business partners (e.g., cinemas and event owners), and provides application functionality for audience, personalization, pricing, and data science teams. The prior analytics solution had multiple copies of data, for a total of over 40 TB, with approximately 80 TB of data in other cloud storage. Data was stored on‑premises and in the cloud in various data stores. Growing organically, the teams had the freedom to choose their technology stack for individual projects, which led to the proliferation of various tools, technology, and practices. Individual teams for personalization, audience, data engineering, data science, and analytics used a variety of products for ingestion, data processing, and visualization.

This post discusses BMS’s migration and modernization journey, and how BMS, AWS, and AWS Partner Minfy Technologies team worked together to successfully complete the migration in four months and saving costs. The migration tenets using the AWS modern data architecture made the project a huge success.

Challenges in the prior analytics platform

  • Varied Technology: Multiple teams used various products, languages, and versions of software.
  • Larger Migration Project: Because the analytics modernization was a parallel project with application migration, planning was crucial in order to consider the changes in core applications and project timelines.
  • Resources: Experienced resource churn from the application migration project, and had very little documentation of current systems.
  • Data : Had multiple copies of data and no single source of truth; each data store provided a view for the business unit.
  • Ingestion Pipelines: Complex data pipelines moved data across various data stores at varied frequencies. We had multiple approaches in place to ingest data to Cloudera, via over 100 Kafka consumers from transaction systems and MQTT(Message Queue Telemetry Transport messaging protocol) for clickstreams, stored procedures, and Spark jobs. We had approximately 100 jobs for data ingestion across Spark, Alteryx, Beam, NiFi, and more.
  • Hadoop Clusters: Large dedicated hardware on which the Hadoop clusters were configured incurring fixed costs. On-premises Cloudera setup catered to most of the data engineering, audience, and personalization batch processing workloads. Teams had their implementation of HBase and Hive for our audience and personalization applications.
  • Data warehouse: The data engineering team used TiDB as their on-premises data warehouse. However, each consumer team had their own perspective of data needed for analysis. As this siloed architecture evolved, it resulted in expensive storage and operational costs to maintain these separate environments.
  • Analytics Database: The analytics team used data sourced from other transactional systems and denormalized data. The team had their own extract, transform, and load (ETL) pipeline, using Alteryx with a visualization tool.

Migration tenets followed which led to project success:

  • Prioritize by business functionality.
  • Apply best practices when building a modern data architecture from Day 1.
  • Move only required data, canonicalize the data, and store it in the most optimal format in the target. Remove data redundancy as much possible. Mark scope for optimization for the future when changes are intrusive.
  • Build the data architecture while keeping data formats, volumes, governance, and security in mind.
  • Simplify ELT and processing jobs by categorizing the jobs as rehosted, rewritten, and retired. Finalize canonical data format, transformation, enrichment, compression, and storage format as Parquet.
  • Rehost machine learning (ML) jobs that were critical for business.
  • Work backward to achieve our goals, and clear roadblocks and alter decisions to move forward.
  • Use serverless options as a first option and pay per use. Assess the cost and effort for rearchitecting to select the right approach. Execute a proof of concept to validate this for each component and service.

Strategies applied to succeed in this migration:

  • Team – We created a unified team with people from data engineering, analytics, and data science as part of the analytics migration project. Site reliability engineering (SRE) and application teams were involved when critical decisions were needed regarding data or timeline for alignment. The analytics, data engineering, and data science teams spent considerable time planning, understanding the code, and iteratively looking at the existing data sources, data pipelines, and processing jobs. AWS team with partner team from Minfy Technologies helped BMS arrive at a migration plan after a proof of concept for each of the components in data ingestion, data processing, data warehouse, ML, and analytics dashboards.
  • Workshops – The AWS team conducted a series of workshops and immersion days, and coached the BMS team on the technology and best practices to deploy the analytics services. The AWS team helped BMS explore the configuration and benefits of the migration approach for each scenario (data migration, data pipeline, data processing, visualization, and machine learning) via proof-of-concepts (POCs). The team captured the changes required in the existing code for migration. BMS team also got acquainted with the following AWS services:
  • Proof of concept – The BMS team, with help from the partner and AWS team, implemented multiple proofs of concept to validate the migration approach:
    • Performed batch processing of Spark jobs in Amazon EMR, in which we checked the runtime, required code changes, and cost.
    • Ran clickstream analysis jobs in Amazon EMR, testing the end-to-end pipeline. Team conducted proofs of concept on AWS IoT Core for MQTT protocol and streaming to Amazon S3.
    • Migrated ML models to Amazon SageMaker and orchestrated with Amazon MWAA.
    • Created sample QuickSight reports and dashboards, in which features and time to build were assessed.
    • Configured for key scenarios for Amazon Redshift, in which time for loading data, query performance, and cost were assessed.
  • Effort vs. cost analysis – Team performed the following assessments:
    • Compared the ingestion pipelines, the difference in data structure in each store, the basis of the current business need for the data source, the activity for preprocessing the data before migration, data migration to Amazon S3, and change data capture (CDC) from the migrated applications in AWS.
    • Assessed the effort to migrate approximately 200 jobs, determined which jobs were redundant or need improvement from a functional perspective, and completed a migration list for the target state. The modernization of the MQTT workflow code to serverless was time-consuming, decided to rehost on Amazon Elastic Compute Cloud (Amazon EC2) and modernization to Amazon Kinesis in to the next phase.
    • Reviewed over 400 reports and dashboards, prioritized development in phases, and reassessed business user needs.

AWS cloud services chosen for proposed architecture:

  • Data lake – We used Amazon S3 as the data lake to store the single truth of information for all raw and processed data, thereby reducing the copies of data storage and storage costs.
  • Ingestion – Because we had multiple sources of truth in the current architecture, we arrived at a common structure before migration to Amazon S3, and existing pipelines were modified to do preprocessing. These one-time preprocessing jobs were run in Cloudera, because the source data was on-premises, and on Amazon EMR for data in the cloud. We designed new data pipelines for ingestion from transactional systems on the AWS cloud using AWS Glue ETL.
  • Processing – Processing jobs were segregated based on runtime into two categories: batch and near-real time. Batch processes were further divided into transient Amazon EMR clusters with varying runtimes and Hadoop application requirements like HBase. Near-real-time jobs were provisioned in an Amazon EMR permanent cluster for clickstream analytics, and a data pipeline from transactional systems. We adopted a serverless approach using AWS Glue ETL for new data pipelines from transactional systems on the AWS cloud.
  • Data warehouse – We chose Amazon Redshift as our data warehouse, and planned on how the data would be distributed based on query patterns.
  • Visualization – We built the reports in Amazon QuickSight in phases and prioritized them based on business demand. We discussed with business users their current needs and identified the immediate reports required. We defined the phases of report and dashboard creation and built the reports in Amazon QuickSight. We plan to use embedded reports for external users in the future.
  • Machine learning – Custom ML models were deployed on Amazon SageMaker. Existing Airflow DAGs were migrated to Amazon MWAA.
  • Governance, security, and compliance – Governance with Amazon Lake Formation was adopted from Day 1. We configured the AWS Glue Data Catalog to reference data used as sources and targets. We had to comply to Payment Card Industry (PCI) guidelines because payment information was in the data lake, so we ensured the necessary security policies.

Solution overview

BMS modern data architecture

The following diagram illustrates our modern data architecture.

The architecture includes the following components:

  1. Source systems – These include the following:
    • Data from transactional systems stored in MariaDB (booking and transactions).
    • User interaction clickstream data via Kafka consumers to DataOps MariaDB.
    • Members and seat allocation information from MongoDB.
    • SQL Server for specific offers and payment information.
  2. Data pipeline – Spark jobs on an Amazon EMR permanent cluster process the clickstream data from Kafka clusters.
  3. Data lake – Data from source systems was stored in their respective Amazon S3 buckets, with prefixes for optimized data querying. For Amazon S3, we followed a hierarchy to store raw, summarized, and team or service-related data in different parent folders as per the source and type of data. Lifecycle polices were added to logs and temp folders of different services as per teams’ requirements.
  4. Data processing – Transient Amazon EMR clusters are used for processing data into a curated format for the audience, personalization, and analytics teams. Small file merger jobs merge the clickstream data to a larger file size, which saved costs for one-time queries.
  5. Governance – AWS Lake Formation enables the usage of AWS Glue crawlers to capture the schema of data stored in the data lake and version changes in the schema. The Data Catalog and security policy in AWS Lake Formation enable access to data for roles and users in Amazon Redshift, Amazon Athena, Amazon QuickSight, and data science jobs. AWS Glue ETL jobs load the processed data to Amazon Redshift at scheduled intervals.
  6. Queries – The analytics team used Amazon Athena to perform one-time queries raised from business teams on the data lake. Because report development is in phases, Amazon Athena was used for exporting data.
  7. Data warehouse – Amazon Redshift was used as the data warehouse, where the reports for the sales teams, management, and third parties (i.e., theaters and events) are processed and stored for quick retrieval. Views to analyze the total sales, movie sale trends, member behavior, and payment modes are configured here. We use materialized views for denormalized tables, different schemas for metadata, and transactional and behavior data.
  8. Reports – We used Amazon QuickSight reports for various business, marketing, and product use cases.
  9. Machine learning – Some of the models deployed on Amazon SageMaker are as follows:
    • Content popularity – Decides the recommended content for users.
    • Live event popularity – Calculates the popularity of live entertainment events in different regions.
    • Trending searches – Identifies trending searches across regions.

Walkthrough

Migration execution steps

We standardized tools, services, and processes for data engineering, analytics, and data science:

  • Data lake
    • Identified the source data to be migrated from Archival DB, BigQuery, TiDB, and the analytics database.
    • Built a canonical data model that catered to multiple business teams and reduced the copies of data, and therefore storage and operational costs. Modified existing jobs to facilitate migration to a canonical format.
    • Identified the source systems, capacity required, anticipated growth, owners, and access requirements.
    • Ran the bulk data migration to Amazon S3 from various sources.
  • Ingestion
    • Transaction systems – Retained the existing Kafka queues and consumers.
    • Clickstream data – Successfully conducted a proof of concept to use AWS IoT Core for MQTT protocol. But because we needed to make changes in the application to publish to AWS IoT Core, we decided to implement it as part of mobile application modernization at a later time. We decided to rehost the MQTT server on Amazon EC2.
  • Processing
  • Listed the data pipelines relevant to business and migrated them with minimal modification.
  • Categorized workloads into critical jobs, redundant jobs, or jobs that can be optimized:
    • Spark jobs were migrated to Amazon EMR.
    • HBase jobs were migrated to Amazon EMR with HBase.
    • Metadata stored in Hive-based jobs were modified to use the AWS Glue Data Catalog.
    • NiFi jobs were simplified and rewritten in Spark run in Amazon EMR.
  • Amazon EMR clusters were configured one persistent cluster for streaming the clickstream and personalization workloads. We used multiple transient clusters for running all other Spark ETL or processing jobs. We used Spot Instances for task nodes to save costs. We optimized data storage with specific jobs to merge small files and compressed file format conversions.
  • AWS Glue crawlers identified new data in Amazon S3. AWS Glue ETL jobs transformed and uploaded processed data to the Amazon Redshift data warehouse.
  • Datawarehouse
    • Defined the data warehouse schema by categorizing the critical reports required by the business, keeping in mind the workload and reports required in future.
    • Defined the staging area for incremental data loaded into Amazon Redshift, materialized views, and tuning the queries based on usage. The transaction and primary metadata are stored in Amazon Redshift to cater to all data analysis and reporting requirements. We created materialized views and denormalized tables in Amazon Redshift to use as data sources for Amazon QuickSight dashboards and segmentation jobs, respectively.
    • Optimally used the Amazon Redshift cluster by loading last two years data in Amazon Redshift, and used Amazon Redshift Spectrum to query historical data through external tables. This helped balance the usage and cost of the Amazon Redshift cluster.
  • Visualization
    • Amazon QuickSight dashboards were created for the sales and marketing team in Phase 1:
      • Sales summary report – An executive summary dashboard to get an overview of sales across the country by region, city, movie, theatre, genre, and more.
      • Live entertainment – A dedicated report for live entertainment vertical events.
      • Coupons – A report for coupons purchased and redeemed.
      • BookASmile – A dashboard to analyze the data for BookASmile, a charity initiative.
  • Machine learning
    • Listed the ML workloads to be migrated based on current business needs.
    • Priority ML processing jobs were deployed on Amazon EMR. Models were modified to use Amazon S3 as source and target, and new APIs were exposed to use the functionality. ML models were deployed on Amazon SageMaker for movies, live event clickstream analysis, and personalization.
    • Existing artifacts in Airflow orchestration were migrated to Amazon MWAA.
  • Security
    • AWS Lake Formation was the foundation of the data lake, with the AWS Glue Data Catalog as the foundation for the central catalog for the data stored in Amazon S3. This provided access to the data by various functionalities, including the audience, personalization, analytics, and data science teams.
    • Personally identifiable information (PII) and payment data was stored in the data lake and data warehouse, so we had to comply to PCI guidelines. Encryption of data at rest and in transit was considered and configured in each service level (Amazon S3, AWS Glue Data Catalog, Amazon EMR, AWS Glue, Amazon Redshift, and QuickSight). Clear roles, responsibilities, and access permissions for different user groups and privileges were listed and configured in AWS Identity and Access Management (IAM) and individual services.
    • Existing single sign-on (SSO) integration with Microsoft Active Directory was used for Amazon QuickSight user access.
  • Automation
    • We used AWS CloudFormation for the creation and modification of all the core and analytics services.
    • AWS Step Functions was used to orchestrate Spark jobs on Amazon EMR.
    • Scheduled jobs were configured in AWS Glue for uploading data in Amazon Redshift based on business needs.
    • Monitoring of the analytics services was done using Amazon CloudWatch metrics, and right-sizing of instances and configuration was achieved. Spark job performance on Amazon EMR was analyzed using the native Spark logs and Spark user interface (UI).
    • Lifecycle policies were applied to the data lake to optimize the data storage costs over time.

Benefits of a modern data architecture

A modern data architecture offered us the following benefits:

  • Scalability – We moved from a fixed infrastructure to the minimal infrastructure required, with configuration to scale on demand. Services like Amazon EMR and Amazon Redshift enable us to do this with just a few clicks.
  • Agility – We use purpose-built managed services instead of reinventing the wheel. Automation and monitoring were key considerations, which enable us to make changes quickly.
  • Serverless – Adoption of serverless services like Amazon S3, AWS Glue, Amazon Athena, AWS Step Functions, and AWS Lambda support us when our business has sudden spikes with new movies or events launched.
  • Cost savings – Our storage size was reduced by 90%. Our overall spend on analytics and ML was reduced by 80%.

Conclusion

In this post, we showed you how a modern data architecture on AWS helped BMS to easily share data across organizational boundaries. This allowed BMS to make decisions with speed and agility at scale; ensure compliance via unified data access, security, and governance; and to scale systems at a low cost without compromising performance. Working with the AWS and Minfy Technologies teams helped BMS choose the correct technology services and complete the migration in four months. BMS achieved the scalability and cost-optimization goals with this updated architecture, which has set the stage for innovation using graph databases and enhanced our ML projects to improve customer experience.


About the Authors

Mahesh Vandi Chalil is Chief Technology Officer at BookMyShow, India’s leading entertainment destination. Mahesh has over two decades of global experience, passionate about building scalable products that delight customers while keeping innovation as the top goal motivating his team to constantly aspire for these. Mahesh invests his energies in creating and nurturing the next generation of technology leaders and entrepreneurs, both within the organization and outside of it. A proud husband and father of two daughters and plays cricket during his leisure time.

Priya Jathar is a Solutions Architect working in Digital Native Business segment at AWS. She has more two decades of IT experience, with expertise in Application Development, Database, and Analytics. She is a builder who enjoys innovating with new technologies to achieve business goals. Currently helping customers Migrate, Modernise, and Innovate in Cloud. In her free time she likes to paint, and hone her gardening and cooking skills.

Vatsal Shah is a Senior Solutions Architect at AWS based out of Mumbai, India. He has more than nine years of industry experience, including leadership roles in product engineering, SRE, and cloud architecture. He currently focuses on enabling large startups to streamline their cloud operations and help them scale on the cloud. He also specializes in AI and Machine Learning use cases.

How Novo Nordisk built a modern data architecture on AWS

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-a-modern-data-architecture-on-aws/

Novo Nordisk is a leading global pharmaceutical company, responsible for producing life-saving medicines that reach more than 34 million patients each day. They do this following their triple bottom line—that they must strive to be environmentally sustainable, socially sustainable, and financially sustainable. The combination of using AWS and data supports all these targets.

Data is pervasive throughout the entire value chain of Novo Nordisk. From foundational research, manufacturing lines, sales and marketing, clinical trials, pharmacovigilance, through patient-facing data-driven applications. Therefore, getting the foundation around how data is stored, safeguarded, and used in a way that provides the most value is one of the central drivers of improved business outcomes.

Together with AWS Professional Services, we’re building a data and analytics solution using a modern data architecture. The collaboration between Novo Nordisk and AWS Professional Services is a strategic and long-term close engagement, where developers from both organizations have worked together closely for years. The data and analytics environments are built around of the core tenets of the data mesh—decentralized domain ownership of data, data as a product, self-service data infrastructure, and federated computational governance. This enables the users of the environment to work with data in the way that drives the best business outcomes. We have combined this with elements from evolutionary architectures that will allow us to adapt different functionalities as AWS continuously develops new services and capabilities.

In this series of posts, you will learn how Novo Nordisk and AWS Professional Services built a data and analytics ecosystem to speed up innovation at petabyte scale:

  • In this first post, you will learn how the overall design has enabled the individual components to come together in a modular way. We dive deep into how we built a data management solution based on the data mesh architecture.
  • The second post discusses how we built a trust network between the systems that comprise the entire solution. We show how we use event-driven architectures, coupled with the use of attribute-based access controls, to ensure permission boundaries are respected at scale.
  • In the third post, we show how end-users can consume data from their tool of choice, without compromising data governance. This includes how to configure Okta, AWS Lake Formation, and Microsoft Power BI to enable SAML-based federated use of Amazon Athena for an enterprise business intelligence (BI) activity.

Pharma-compliant environment

As a pharmaceutical industry, GxP compliance is a mandate for Novo Nordisk. GxP is a general abbreviation for the “Good x Practice” quality guidelines and regulations defined by regulators such as European Medicines Agency, U.S. Food and Drug Administration, and others. These guidelines are designed to ensure that medicinal products are safe and effective for their intended use. In the context of a data environment, GxP compliance involves implementing integrity controls for data used to in decision making and processes and is used to guide how change management processes are implemented to continuously ensure compliance over time.

Because this data environment supports teams across the whole organization, each individual data owner must retain accountability on their data. Features were designed to provide data owners autonomy and transparency when managing their data, enabling them to take this responsibility. This includes the capability to handle personally identifiable information (PII) data and other sensitive workloads. To provide traceability on the environment, audit capabilities were added, which we describe more in this post.

Solution overview

The full solution is a sprawling landscape of independent services that work together to enable data and analytics with a decentralized data governance model at petabyte scale. Schematically, it can be represented as in the following figure.

Novo Nordisk Modern Data Architecture on AWS

The architecture is split into three independent layers: data management, virtualization, and consumption. The end-user sits in the consumption layer and works with their tool of choice. It’s meant to abstract as much of the AWS-native resources to application primitives. The consumption layer is integrated into the virtualization layer, which abstracts the access to data. The purpose of the virtualization layer is to translate between data consumption and data management solutions. The access to data is managed by what we refer to as data management solutions. We discuss one of our versatile data management solutions later in this post. Each layer in this architecture is independent of each other and instead only relies on well-defined interfaces.

Central to this architecture is that access is encapsulated in an AWS Identity and Access Management (IAM) role session. The data management layer focuses on providing the IAM role with the right permissions and governance, the virtualization layer provides access to the role, and the consumption layer abstracts the use of the roles in the tools of choice.

Technical architecture

Each of the three layers in the overall architecture has a distinct responsibility, but no singular implementation. Think of them as abstract classes. They can be implemented in concrete classes, and in our case they rely on foundational AWS services and capabilities. Let’s go through each of the three layers.

Data management layer

The data management layer is responsible for providing access to and governance of data. As illustrated in the following diagram, a minimal construct in the data management layer is the combination of an Amazon Simple Storage Service (Amazon S3) bucket and an IAM role that gives access to the S3 bucket. This construct can be expanded to include granular permission with Lake Formation, auditing with AWS CloudTrail, and security response capabilities from AWS Security Hub. The following diagram also shows that a single data management solution has no singular span. It can cross many AWS accounts and be comprised of any number of IAM role combinations.Data Mamangement Architecture

We have purposely not illustrated the trust policy of these roles in this figure, because those are a collaborative responsibility between the virtualization layer and the data management layer. We go into detail of how that works in the next post in this series. Data engineering professionals often interface directly with the data management layer, where they curate and prepare data for consumption.

Virtualization layer

The purpose of the virtualization layer is to keep track of who can do what. It doesn’t have any capabilities in itself, but translates the requirements from the data management ecosystems to the consumption layers and vice versa. It enables end-users on the consumption layer to access and manipulate data on one or more data management ecosystems, according to their permissions. This layer abstracts from end-users the technical details on data access, such as permission model, role assumptions, and storage location. It owns the interfaces to the other layers and enforces the logic of the abstraction. In the context of hexagonal architectures (see Developing evolutionary architecture with AWS Lambda), the interface layer plays the role of the domain logic, ports, and adapters. The other two layers are actors. The data management layer communicates the state of the layer to the virtualization layer and conversely receives information about the service landscape to trust. The virtualization layer architecture is shown in the following diagram.

Virtualization Layer Architecture

Consumption layer

The consumption layer is where the end-users of the data products are sitting. This can be data scientists, business intelligence analysts, or any third party that generates value from consuming the data. It’s important for this type of architecture that the consumption layer has a hook-based sign-in flow, where the authorization into the application can be modified at sign-in time. This is to translate the AWS-specific requirement into the target applications. After the session in the client-side application has successfully been started, it’s up to the application itself to instrument for data layer abstraction, because this will be application specific. And this is an additional important decoupling, where some responsibility is pushed to the decentralized units. Many modern software as a service (SaaS) applications support these built-in mechanisms, such as Databricks or Domino Data Lab, whereas more traditional client-side applications like RStudio Server have more limited native support for this. In the case where native support is missing, a translation down to the OS user session can be done to enable the abstraction. The consumption layer is shown schematically in the following diagram.

Consumption Layer Architecture

When using the consumption layer as intended, the users don’t know that the virtualization layer exists. The following diagram illustrates the data access patterns.

Data Access Patterns

Modularity

One of the main advantages of adopting the hexagonal architecture pattern, and delegating both the consuming layer and the data management layer to primary and secondary actors, means that they can be changed or replaced as new functionalities are released that require new solutions. This gives a hub-and-spoke type pattern, where many different types of producer/consumer type systems can be connected and work simultaneously in union. An example of this is that the current solution running in Novo Nordisk supports multiple, simultaneous data management solutions and are exposed in a homogenous way in the consuming layer. This includes both a data lake, the data mesh solution presented in this post, and several independent data management solutions. And these are exposed to multiple types of consuming applications, from custom managed, self-hosted applications, to SaaS offerings.

Data management ecosystem

To scale the usage of the data and increase the freedom, Novo Nordisk, jointly with AWS Professional Services, built a data management and governance environment, named Novo Nordisk Enterprise DataHub (NNEDH). NNEDH implements a decentralized distributed data architecture, and data management capabilities such as an enterprise business data catalog and data sharing workflow. NNEDH is an example of a data management ecosystem in the conceptual framework introduced earlier.

Decentralized architecture: From a centralized data lake to a distributed architecture

Novo Nordisk’s centralized data lake consists of 2.3 PB of data from more than 30 business data domains worldwide serving over 2000+ internal users throughout the value chain. It has been running successfully for several years. It is one of the data management ecosystems currently supported.

Within the centralized data architecture, data from each data domain is copied, stored, and processed in one central location: a central data lake hosted in one data storage. This pattern has challenges at scale because it retains the data ownership with the central team. At scale, this model slows down the journey toward a data-driven organization, because ownership of the data isn’t sufficiently anchored with the professionals closest to the domain.

The monolithic data lake architecture is shown in the following diagram.Monolithic Data Lake Architecture

Within the decentralized distributed data architecture, the data from each domain is kept within the domain on its own data storage and compute account. In this case, the data is kept close to domain experts, because they’re the ones who know their own data best and are ultimately the owner of any data products built around their data. They often work closely with business analysts to build the data product and therefore know what good data means to consumers of their data products. In this case, the data responsibility is also decentralized, where each domain has its own data owner, putting the accountability onto the true owners of the data. Nevertheless, this model might not work at small scale, for example an organization with only one business unit and tens of users, because it would introduce more overhead on the IT team to manage the organization data. It better suits large organizations, or small and medium ones that would like to grow and scale.

The Novo Nordisk data mesh architecture is shown in the following diagram.

Novo Nordisk Data Mesh Architecture

Data domains and data assets

To enable the scalability of data domains across the organization, it’s mandatory to have a standard permission model and data access pattern. This standard must not be too restrictive in such a way that it may be a blocker for specific use cases, but it should be standardized in such a way to use the same interface between the data management and virtualization layers.

The data domains on NNEDH are implemented by a construct called an environment. An environment is composed of at least one AWS account and one AWS Region. It’s a workplace where data domain teams can work and collaborate to build data products. It links the NNEDH control plane to the AWS accounts where the data and compute of the domain reside. The data access permissions are also defined at the environment level, managed by the owner of the data domain. The environments have three main components: a data management and governance layer, data assets, and optional blueprints for data processing.

For data management and governance, the data domains rely on Lake Formation, AWS Glue, and CloudTrail. The deployment method and setup of these components is standardized across data domains. This way, the NNEDH control plane can provide connectivity and management to data domains in a standardized way.

The data assets of each domain residing in an environment are organized in a dataset, which is a collection of related data used for building a data product. It includes technical metadata such as data format, size, and creation time, and business metadata such as the producer, data classification, and business definition. A data product can use one or several datasets. It is implemented through managed S3 buckets and the AWS Glue Data Catalog.

Data processing can be implemented in different ways. NNEDH provides blueprints for data pipelines with predefined connectivity to data assets to speed up the delivery of data products. Data domain users have the freedom to use any other compute capability on their domain, for example using AWS services not predefined on the blueprints or accessing the datasets from other analytics tools implemented in the consumption layer, as mentioned earlier in this post.

Data domain personas and roles

On NNEDH, the permission levels on data domains are managed through predefined personas, for example data owner, data stewards, developers, and readers. Each persona is associated with an IAM role that has a predefined permission level. These permissions are based on the typical needs of users on these roles. Nevertheless, to give more flexibility to data domains, these permissions can be customized and extended as needed.

The permissions associated with each persona are related only to actions allowed on the AWS account of the data domain. For the accountability on data assets, the data access to the assets is managed by specific resource policies instead of IAM roles. Only the owner of each dataset, or data stewards delegated by the owner, can grant or revoke data access.

On the dataset level, a required persona is the data owner. Typically, they work closely with one or many data stewards as data products managers. The data steward is the data subject matter expert of the data product domain, responsible for interpreting collected data and metadata to derive deep business insights and build the product. The data steward bridges between business users and technical teams on each data domain.

Enterprise business data catalog

To enable freedom and make the organization data assets discoverable, a web-based portal data catalog is implemented. It indexes in a single repository metadata from datasets built on data domains, breaking data silos across the organization. The data catalog enables data search and discovery across different domains, as well as automation and governance on data sharing.

The business data catalog implements data governance processes within the organization. It ensures the data ownership—someone in the organization is responsible for the data origin, definition, business attributes, relationships, and dependencies.

The central construct of a business data catalog is a dataset. It’s the search unit within the business catalog, having both technical and business metadata. To collect technical metadata from structured data, it relies on AWS Glue crawlers to recognize and extract data structures from the most popular data formats, including CSV, JSON, Avro, and Apache Parquet. It provides information such as data type, creation date, and format. The metadata can be enriched by business users by adding a description of the business context, tags, and data classification.

The dataset definition and related metadata are stored in an Amazon Aurora Serverless database and Amazon OpenSearch Service, enabling you to run textual queries on the data catalog.

Data sharing

NNEDH implements a data sharing workflow, enabling peer-to-peer data sharing across AWS accounts using Lake Formation. The workflow is as follows:

  1. A data consumer requests access to the dataset.
  2. The data owner grants access by approving the access request. They can delegate the approval of access requests to the data steward.
  3. Upon the approval of an access request, a new permission is added to the specific dataset in Lake Formation of the producer account.

The data sharing workflow is shown schematically in the following figure.

Data Sharing Workflow

Security and audit

The data in the Novo Nordisk data mesh lies in AWS accounts owned by Novo Nordisk business accounts. The configuration and the states of the data mesh are stored in Amazon Relational Database Service (Amazon RDS). The Novo Nordisk security architecture is shown in the following figure.

Novo Nordisk Distributed Security and Audit Architecture

Access and edits to the data in NNEDH needs to be logged for audit purposes. We need to be able to tell who modified data, when the modification happened, and what modifications were applied. In addition, we need to be able to answer why the modification was allowed by that person at that time.

To meet these requirements, we use the following components:

  • CloudTrail to log API calls. We specifically enable CloudTrail data event logging for S3 buckets and objects. By activating the logging, we can trace back any modification to any files in the data lake to the person who made the modification. We enforce usage of source identity for IAM role sessions to ensure user traceability.
  • We use Amazon RDS to store the configuration of the data mesh. We log queries against the RDS database. Together with CloudTrail, this log allows us to answer the question of why a modification to a file in Amazon S3 at a specific time by a specific person is possible.
  • Amazon CloudWatch to log activities across the mesh.

In addition to those logging mechanisms, the S3 buckets are created using the following properties:

  • The bucket is encrypted using server-side encryption with AWS Key Management Service (AWS KMS) and customer managed keys
  • Amazon S3 versioning is activated by default

Access to the data in NNEDH is controlled at the group level instead of individual users. The group corresponds to the group defined in the Novo Nordisk directory group. To keep track of the person who modified the data in the data lakes, we use the source identity mechanism explained in the post How to relate IAM role activity to corporate identity.

Conclusion

In this post, we showed how Novo Nordisk built a modern data architecture to speed up the delivery of data-driven use cases. It includes a distributed data architecture, to scale the usage to petabyte scale for over 2,000 internal users throughout the value chain, as well as a distributed security and audit architecture handling data accountability and traceability on the environment to meet their compliance requirements.

The next post in this series describes the implementation of distributed data governance and control at scale of Novo Nordisk’s modern data architecture.


About the Authors

Jonatan Selsing is former research scientist with a PhD in astrophysics that has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving full benefit of the advantages of cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Alessandro FiorAlessandro Fior is a Sr. Data Architect at AWS Professional Services. With over 10 years of experience delivering data and analytics solutions, he is passionate about designing and building modern and scalable data platforms that accelerate companies to get value from their data.

Kumari RamarKumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio has recently added the possibility of adding custom transforms that you can use to build visual jobs to use them in combination with the AWS Glue Studio components provided out of the box. You can now define custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which defines the component and the processing logic, respectively.

Custom visual transform lets you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is how the component definition would look like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice the “year” is an optional parameter, so it has assigned a default function on call, which the function detects and replaces with the previous year. The function returns the transformed DynamicFrame. In this case, it’s not derived from the source one, which is the common case, but replaced by a new one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.

Deploy and using the generator transform

Now that we have both files ready, all we have to do is upload them on Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.

Once you have uploaded both files, the next time we open (or refresh) the page on AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node by selecting it in the Data source properties tab and selecting source type S3 location.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and search it on the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This would be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more information about bucket creation on the Amazon S3 documentation 
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to but not more than 1,460 (depending on the year used and assuming you are using 2 G.1X workers and AWS Glue version 3.0)

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data was saved as a table with years of history, generating small files has a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows representing all days of the year, so when the sink needs to split the dates into output directories to group the files, each memory partition needs to create one file for each day present, so you can have 4 * 365 (not in a leap year) is 1,460.

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that on your team, you have the policy of generating S3 date partition separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, let’s call it repartition_date.json, where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned when the file partitioning on s3
    # So each partition has the data for a combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files onto the S3 transforms folder like you did for the previous transform.

Deploy and use the generator transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department ” as “Partition columns” in the transform and then add it in the sink partition column list. Or you can enter a “Number of partitions expected” and see how it affects the runtime (it no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving his job using custom visual transforms, AWS Glue Studio will generate the job script and update the Python library path (also referred as —extra-py-files job parameters) with the list of transform Python file S3 paths, separated by comma.
  3. Before running your script, AWS Glue will add all file paths stored in the —extra-py-files job parameters to the Python path, allowing your script to run all custom visual transform functions you defined.

Cleanup

In order to avoid running costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Introducing native Delta Lake table support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-native-delta-lake-table-support-with-aws-glue-crawlers/

Delta Lake is an open-source project that helps implement modern data lake architectures commonly built on Amazon S3 or other cloud storages. With Delta Lake, you can achieve ACID transactions, time travel queries, CDC, and other common use cases on the cloud. Delta Lake is available with multiple AWS services, such as AWS Glue Spark jobs, Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum.

AWS Glue includes Delta crawler, a capability that makes discovering datasets simpler by scanning Delta Lake transaction logs in Amazon Simple Storage Service (Amazon S3), extracting their schema, creating manifest files in Amazon S3, and automatically populating the AWS Glue Data Catalog, which keeps the metadata current.  The newly created AWS Glue Data Catalog table has format SymlinkTextInputFormat. Delta crawler creates a manifest file, which is a text file containing the list of data files that query engines such as Presto, Trino, or Athena can use to query the table rather than finding the files with the directory listing. A previous blog post demonstrated how it works. Manifest files needed to be regenerated on a periodic basis to include newer transactions in the original Delta Lake tables which resulted in expensive I/O operations, longer processing times, and increased storage footprint.

With today’s launch, Glue crawler is adding support for creating AWS Glue Data Catalog tables for native Delta Lake tables and does not require generating manifest files. This improves customer experience because now you don’t have to regenerate manifest files whenever a new partition becomes available or a table’s metadata changes. With the native Delta Lake tables and automatic schema evolution with no additional manual intervention, this reduces the time to insight by making newly ingested data quickly available for analysis with your preferred analytics and machine learning (ML) tools.

Amazon Athena SQL engine version 3 started supporting Delta Lake native connector. AWS Glue for Apache Spark also started supporting Delta Lake native connector in Glue version 3.0 and later. Amazon EMR started supporting Delta Lake in EMR release version 6.9.0 and later. It means that you can query the Delta transaction log directly in Amazon Athena, AWS Glue for Apache Spark, and Amazon EMR. It makes the experience of working with native Delta Lake tables seamless across the platforms.

This post demonstrates how AWS Glue crawlers work with native Delta Lake tables and describes typical use cases to query native Delta Lake tables.

How AWS Glue crawler works with native Delta Lake tables

Now AWS Glue crawler has two different options:

  • Native table: Create a native Delta Lake table definition on AWS Glue Data Catalog.
  • Symlink table: Create a symlink-based manifest table definition on AWS Glue Data Catalog from a Delta Lake table, and generate its symlink files on Amazon S3.

Native table

Native Delta Lake tables are accessible from Amazon Athena (engine version 3), AWS Glue for Apache Spark (Glue version 3.0 and later), Amazon EMR (release version 6.9.0 and later), and other platforms that support Delta Lake tables. With the native Delta Lake tables, you have the capabilities such as ACID transactions, all while needing to maintain just a single source of truth.

Symlink table

Symlink tables are a consistent snapshot of a native Delta Lake table, represented using the SymlinkTextInputFormat using parquet files. The symlink tables are accessible from Amazon Athena and Amazon Redshift Spectrum.

Since the symlink tables are a snapshot of the original native Delta Lake tables, you need to maintain both the original native Delta Lake tables and the symlink tables. When the data or schema in an original Delta Lake table is updated, the symlink tables in the AWS Glue Data Catalog may become out of sync. It means that you can still query the symlink table and get a consistent result, but the result of the table is at the previous point in time.

Crawl native Delta Lake tables using AWS Glue crawler

In this section, let’s go through how to crawl native Delta Lake tables using AWS Glue crawler.

Prerequisite

Here’s the prerequisite for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue crawler if you do not have it.
  4. Run the following command to copy the sample Delta Lake table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/delta-lake-crawler/sample_delta_table/ s3://your_s3_bucket/data/sample_delta_table

Create a Delta Lake crawler

A Delta Lake crawler can be created through the AWS Glue console, AWS Glue SDK, or AWS CLI. Specify a DeltaTarget with the following configurations:

  • DeltaTables – A list of S3 DeltaPaths where the Delta Lake tables are located. (Note that each path must be the parent of a _delta_log folder. If the Delta transaction log is located at s3://bucket/sample_delta_table/_delta_log, then the path s3://bucket/sample_delta_table/ should be provided.
  • WriteManifest – A Boolean value indicating whether or not the crawler should write the manifest files for each DeltaPath. This parameter is only applicable for Delta Lake tables created via manifest files
  • CreateNativeDeltaTable – A Boolean value indicating whether the crawler should create a native Delta Lake table. If set to False, the crawler would create a symlink table instead. Note that both WriteManifest and CreateNativeDeltaTable options can’t be set to True.
  • ConnectionName – An optional connection name stored in the Data Catalog that the crawler should use to access Delta Lake tables backed by a VPC.

In this instruction, create the crawler through the console. Complete the following steps to create a Delta Lake crawler:

  1. Open the AWS Glue console.
  2. Choose Crawlers.
  3. Choose Create crawler.
  4. For Name, enter delta-lake-native-crawler, and choose Next.
  5. Under Data sources, choose Add a data source.
  6. For Data source, select Delta Lake.
  7. For Include delta lake table path(s), enter s3://your_s3_bucket/data/sample_delta_table/.
  8. For Create tables for querying, choose Create Native tables,
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. For Existing IAM role, choose your IAM role, then choose Next.
  12. For Target database, choose Add database, then Add database dialog appears. For Database name, enter delta_lake_native, then choose Create. Choose Next.
  13. Choose Create crawler.
  14. The Delta Lake crawler can be triggered to run through the console or through the SDK or AWS CLI using the StartCrawl API. It could also be scheduled through the console to trigger the crawlers at specific times. In this instruction, run the crawler through the console.
  15. Select delta-lake-native-crawler, and choose Run.
  16. Wait for the crawler to complete.

After the crawler has run, you can see the Delta Lake table definition in the AWS Glue console:

You can also verify an AWS Glue table definition through the following AWS CLI command:

$ aws glue get-table --database delta_lake_native --name sample_delta_table
{
    "Table": {
        "Name": "sample_delta_table",
        "DatabaseName": "delta_lake_native",
        "Owner": "owner",
        "CreateTime": "2022-11-08T12:11:20+09:00",
        "UpdateTime": "2022-11-08T13:19:06+09:00",
        "LastAccessTime": "2022-11-08T13:19:06+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "product_id",
                    "Type": "string"
                },
                {
                    "Name": "product_name",
                    "Type": "string"
                },
                {
                    "Name": "price",
                    "Type": "bigint"
                },
                {
                    "Name": "currency",
                    "Type": "string"
                },
                {
                    "Name": "category",
                    "Type": "string"
                },
                {
                    "Name": "updated_at",
                    "Type": "double"
                }
            ],
            "Location": "s3://your_s3_bucket/data/sample_delta_table/",
            "AdditionalLocations": [],
            "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
            "Compressed": false,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "serialization.format": "1",
                    "path": "s3://your_s3_bucket/data/sample_delta_table/"
                }
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "EXTERNAL": "true",
                "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
                "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
                "CrawlerSchemaSerializerVersion": "1.0",
                "CrawlerSchemaDeserializerVersion": "1.0",
                "spark.sql.partitionProvider": "catalog",
                "classification": "delta",
                "spark.sql.sources.schema.numParts": "1",
                "spark.sql.sources.provider": "delta",
                "delta.lastCommitTimestamp": "1653462383292",
                "delta.lastUpdateVersion": "6",
                "table_type": "delta"
            },
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "EXTERNAL": "true",
            "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
            "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
            "CrawlerSchemaSerializerVersion": "1.0",
            "CrawlerSchemaDeserializerVersion": "1.0",
            "spark.sql.partitionProvider": "catalog",
            "classification": "delta",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "delta",
            "delta.lastCommitTimestamp": "1653462383292",
            "delta.lastUpdateVersion": "6",
            "table_type": "delta"
        },
        "CreatedBy": "arn:aws:sts::012345678901:assumed-role/AWSGlueServiceRole/AWS-Crawler",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "012345678901",
        "IsRowFilteringEnabled": false,
        "VersionId": "1",
        "DatabaseId": "0bd458e335a2402c828108f267bc770c"
    }
}

After you create the table definition on AWS Glue Data Catalog, AWS analytics services such as Athena and AWS Glue Spark jobs are able to query the Delta Lake table.

Query Delta Lake tables using Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale. You can use Athena to query your S3 data lake for use cases such as data exploration for machine learning (ML) and AI, business intelligence (BI) reporting, and ad hoc querying.

There are now two ways to use Delta Lake tables in Athena:

  • For native table: Use Athena’s newly launched native support for Delta Lake tables. You can learn more in Querying Delta Lake tables. This method no longer requires regenerating manifest files after every transaction. Data updates are available for queries in Athena as soon as they are performed in the original Delta Lake tables, and you get up to 40 percent improvement in query performance over querying manifest files. Since Athena optimizes data scans in native Delta Lake queries using statistics in Delta Lake files, you get the advantage of reduced cost for Athena queries. This post focuses on this approach.
  • For symlink table: Use SymlinkTextInputFormat to query symlink tables through manifest files generated from Delta Lake tables. This was previously the only manner in which Delta Lake table querying was supported via Athena and is no longer recommended when you use only Athena to query the Delta Lake tables.

To use the native Delta Lake connector in Athena, you need to use Athena engine version 3. If you are using an older engine version, change the engine version.

Complete following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "delta_lake_native"."sample_delta_table" limit 10;

The following screenshot shows our output:

Query Delta Lake tables using AWS Glue for Apache Spark

AWS Glue for Apache Spark natively supports Delta Lake. AWS Glue version 3.0 (Apache Spark 3.1.1) supports Delta Lake 1.0.0, and AWS Glue version 4.0 (Apache Spark 3.3.0) supports Delta Lake 2.1.0. With this native support for Delta Lake, what you need for configuring Delta Lake is to provide a single job parameter --datalake-formats delta. There is no need to configure a separate connector for Delta Lake in AWS Marketplace. It reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark.

AWS Glue also provides a serverless notebook interface called AWS Glue Studio notebook to query and process data interactively. Complete the following steps to launch AWS Glue Studio notebook and query a Delta Lake table:

  1. On the AWS Glue console, choose Jobs in the navigation plane.
  2. Under Create job, select Jupyter Notebook.
  3. Choose Create a new notebook from scratch, and choose Create.
  4. For Job name, enter delta-sql.
  5. For IAM role,  choose your IAM role. If you don’t have your own role for the AWS Glue job, create it by following the steps documented in the AWS Glue Developer Guide.
  6. Choose Start notebook job.
  7. Copy and paste the following code to the first cell and run the cell.
    %glue_version 3.0
    %%configure
    {
      "--datalake-formats": "delta"
    }

  8. Run the existing cell containing the following code.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

  9. Copy and paste the following code to the third cell and run the cell.
    %%sql
    SELECT * FROM `delta_lake_native`.`sample_delta_table` limit 10

The following screenshot shows our output:

Clean up

Now for the final step, cleaning up the resources:

  • Delete your data under your S3 path: s3://your_s3_bucket/data/sample_delta_table/.
  • Delete the AWS Glue crawler delta-lake-native-crawler.
  • Delete the AWS Glue database delta_lake_native.
  • Delete the AWS Glue notebook job delta-sql.

Conclusion

This post demonstrated how to crawl native Delta Lake tables using an AWS Glue crawler and how to query the crawled tables from Athena and Glue Spark jobs. Start using AWS Glue crawlers for your own native Delta Lake tables.

If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems. In his free time, he enjoys cycling or playing basketball.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Getting started with AWS Glue Data Quality for ETL Pipelines

Post Syndicated from Deenbandhu Prasad original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/

Today, hundreds of thousands of customers use data lakes for analytics and machine learning. However, data engineers have to cleanse and prepare this data before it can be used. The underlying data has to be accurate and recent for customer to make confident business decisions. Otherwise, data consumers lose trust in the data and make suboptimal or incorrect decisions. It is a common task for data engineers to evaluate whether the data is accurate and recent or not. Today there are various data quality tools. However, common data quality tools usually require manual processes to monitor data quality.

AWS Glue Data Quality is a preview feature of AWS Glue that measures and monitors the data quality of Amazon Simple Storage Service (Amazon S3) data lakes and in AWS Glue extract, transform, and load (ETL) jobs. This is an open preview feature so it is already enabled in your account in the available Regions. You can easily define and measure the data quality checks in AWS Glue Studio console without writing codes. It simplifies your experience of managing data quality.

This post is Part 2 of a four-post series to explain how AWS Glue Data Quality works. Check out the previous post in this series:

Getting started with AWS Glue Data Quality

In this post, we show how to create an AWS Glue job that measures and monitors the data quality of a data pipeline. We also show how to take action based on the data quality results.

Solution overview

Let’s consider an example use case in which a data engineer needs to build a data pipeline to ingest the data from a raw zone to a curated zone in a data lake. As a data engineer, one of your key responsibilities—along with extracting, transforming, and loading data—is validating the quality of data. Identifying data quality issues upfront helps you prevent placing bad data in the curated zone and avoid arduous data corruption incidents.

In this post, you’ll learn how to easily set up built-in and custom data validation checks in your AWS Glue job to prevent bad data from corrupting the downstream high-quality data.

The dataset used for this post is synthetically generated; the following screenshot shows an example of the data.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket (gluedataqualitystudio-*).
  • The following prefixes and objects in the S3 bucket:
    • datalake/raw/customer/customer.csv
    • datalake/curated/customer/
    • scripts/
    • sparkHistoryLogs/
    • temporary/
  • AWS Identity and Access Management (IAM) users, roles, and policies. The IAM role (GlueDataQualityStudio-*) has permission to read and write from the S3 bucket.
  • AWS Lambda functions and IAM policies required by those functions to create and delete this stack.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:

  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack and wait for the stack creation step to complete.

Implement the solution

To start configuring your solution, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with a blank canvas and choose Create.
  3. Choose the Job Details tab to configure the job.
  4. For Name, enter GlueDataQualityStudio.
  5. For IAM Role, choose the role starting with GlueDataQualityStudio-*.
  6. For Glue version, choose Glue 3.0.
  7. For Job bookmark, choose Disable. This allows you to run this job multiple times with the same input dataset.
  8. For Number of retries, enter 0.
  9. In the Advanced properties section, provide the S3 bucket created by the CloudFormation template (starting with gluedataqualitystudio-*).
  10. Choose Save.
  11. After the job is saved, choose the Visual tab and on the Source menu, choose Amazon S3.
  12. On the Data source properties – S3 tab, for S3 source type, select S3 location.
  13. Choose Browse S3 and navigate to prefix /datalake/raw/customer/ in the S3 bucket starting with gluedataqualitystudio-* .
  14. Choose Infer schema.
  15. On the Action menu, choose Evaluate Data Quality.
  16. Choose the Evaluate Data Quality node.

    On the Transform tab, you can now start building data quality rules. The first rule you create is to check if Customer_ID is unique and not null using the isPrimaryKey rule.
  17. On the Rule types tab of the DQDL rule builder, search for isprimarykey and choose the plus sign.
  18. On the Schema tab of the DQDL rule builder, choose the plus sign next to Customer_ID.
  19. In the rule editor, delete id.

    The next rule we add checks that the First_Name column value is present for all the rows.
  20. You can also enter the data quality rules directly in the rule editor. Add a comma (,) and enter IsComplete "First_Name", after the first rule.

    Next, you add a custom rule to validate that no row exists without Telephone or Email.
  21. Enter the following custom rule in the rule editor:
    CustomSql "select count(*) from primary where Telephone is null and Email is null" = 0


    The Evaluate Data Quality feature provides actions to manage the outcome of a job based on the job quality results.

  22. For this post, select Fail job when data quality fails and choose Fail job without loading target data actions. In the Data quality output setting section, choose Browse S3 and navigate to prefix dqresults in the S3 bucket starting with gluedataqualitystudio-*.
  23. On the Target menu, choose Amazon S3.
  24. Choose the Data target – S3 bucket node.
  25. On the Data target properties – S3 tab, for Format, choose Parquet, and for Compression Type, choose Snappy.
  26. For S3 Target Location, choose Browse S3 and navigate to the prefix /datalake/curated/customer/ in the S3 bucket starting with gluedataqualitystudio-*.
  27. Choose Save, then choose Run.
    You can view the job run details on the Runs tab. In our example, the job fails with the error message “AssertionError: The job failed due to failing DQ rules for node: <node>.”
    You can review the data quality result on the Data quality tab. In our example, the custom data quality validation failed because one of the rows in the dataset had no Telephone or Email value.Evaluate Data Quality results is also written to the S3 bucket in JSON format based on the data quality result location parameter of the node.
  28. Navigate to dqresults prefix under the S3 bucket starting gluedataqualitystudio-*. You will see that the data quality result is partitioned by date.

The following is the output of the JSON file. You can use this file output to build custom data quality visualization dashboards.

You can also monitor the Evaluate Data Quality node through Amazon CloudWatch metrics and set alarms to send notifications about data quality results. To learn more on how to set up CloudWatch alarms, refer to Using Amazon CloudWatch alarms.

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created:

  1. Delete the GlueDataQualityStudio job you created as part of this post.
  2. On the AWS CloudFormation console, delete the GlueDataQualityStudio stack.

Conclusion

AWS Glue Data Quality offers an easy way to measure and monitor the data quality of your ETL pipeline. In this post, you learned how to take necessary actions based on the data quality results, which helps you maintain high data standards and make confident business decisions.

To learn more about AWS Glue Data Quality, check out the documentation:


About the Authors

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architecture on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Yannis Mentekidis is a Senior Software Development Engineer on the AWS Glue team.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and machine learning to make data-driven business decisions. Data consumers lose trust in data if it is not accurate and recent, making data quality essential for undertaking optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, there are various tools available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

We are pleased to announce the public preview launch of AWS Glue Data Quality. You can access this feature today without requesting any additional access in the available Regions. AWS Glue Data Quality is a new preview feature of AWS Glue that measures and monitors the data quality of Amazon S3-based data lakes and in AWS Glue ETL jobs. It does not require any expertise in data engineering or coding. It simplifies your experience of monitoring and evaluating the quality of your data.

This is Part 1 of a four-part series of posts to explain how AWS Glue Data Quality works. Check out the next posts in the series:

Getting started with AWS Glue Data Quality

In this post, we will go over the simplicity of using the AWS Glue Data Quality feature by:

  1. Starting data quality recommendations and runs on your data in AWS Glue Data Catalog.
  2. Creating an Amazon CloudWatch alarm for getting notifications when data quality results are below a certain threshold.
  3. Analyzing your AWS Glue Data Quality run results through Amazon Athena.

Set up resources with AWS CloudFormation

The provided CloudFormation script creates the following resources for you:

  1. The IAM role required to run AWS Glue Data Quality runs
  2. An Amazon Simple Storage Service (Amazon S3) bucket to store the NYC Taxi dataset
  3. An S3 bucket to store and analyze the results of AWS Glue Data Quality runs
  4. An AWS Glue database and table created from the NYC Taxi dataset

Steps:

  1. Open the AWS CloudFormation console.
  2. Choose Create stack and then select With new resources (standard).
  3. For Template source, choose Upload a template File, and provide the above attached template file. Then choose Next.
  4. For Stack name, DataQualityDatabase, and DataQualityTable, leave as default. For DataQualityS3BucketName, enter the name of your S3 bucket. Then choose Next.
  5. On the final screen, make sure to acknowledge that this stack would create IAM resources for you, and choose Submit.
  6. Once the stack is successfully created, navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Start an AWS Glue Data Quality run on your data in AWS Glue Data Catalog

In this first section, we will generate data quality rule recommendations from the AWS Glue Data Quality service. Using these recommendations, we will then run a data quality task against our dataset to obtain an analysis of our data.

To get started, complete the following steps:

  1. Open AWS Glue console.
  2. Choose Tables under Data Catalog.
  3. Select the DataQualityTable table created via the CloudFormation stack.
  4. Select the Data quality tab.
  5. Choose Recommend ruleset.
  6. On the Recommend data quality rules page, check Save recommended rules as a ruleset. This will allow us to save the recommended rules in a ruleset automatically, for use in the next steps.
  7. For IAM Role, choose the IAM role that was created from the CloudFormation stack.
  8. For Additional configurations -optional, leave the default number of workers and timeout.
  9. Choose Recommend ruleset. This will start a data quality recommendation run, with the given number of workers.
  10. Wait for the ruleset to be completed.
  11. Once completed, navigate back to the Rulesets tab. You should see a successful recommendation run and a ruleset created.

Understand AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service and are based on the shape of your data. These recommendations automatically take into account aspects like RowCounts, Mean, Standard Deviation etc. of your data, and generate a set of rules, for you to use as a starting point.

The dataset used here was the NYC Taxi dataset. Based on this, the columns in this dataset, and the values of those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service automatically took into consideration all the columns of the dataset, and recommended 55 rules.

Some of these rules are:

  • “RowCount between <> and <> ” → Expect a count of number of rows based on the data it saw
  • “ColumnValues “VendorID” in [ ] → Expect the ”VendorID“ column to be within a specific set of values
  • IsComplete “VendorID” → Expect the “VendorID” to be a non-null value

How do I use the recommended AWS Glue Data Quality rules?

  1. From the Rulesets section, you should see your generated ruleset. Select the generated ruleset, and choose Evaluate ruleset.
    • If you didn’t check the box to Save recommended rules as a ruleset when you ran the recommendation, you can still click on the recommendation task run and copy the rules to create a new ruleset
  2. For Data quality actions under Data quality properties, select Publish metrics to Amazon CloudWatch. If this box isn’t checked, the data quality run will not publish metrics to Amazon CloudWatch.
  3. For IAM role, select the GlueDataQualityBlogRole created in the AWS CloudFormation stack.
  4. For Requested number of workers under Advanced properties, leave as default. 
  5. For Data quality results location, select the value of the GlueDataQualityResultsS3Bucket location that was created via the AWS CloudFormation stack
  6. Choose Evaluate ruleset.
  7. Once the run begins, you can see the status of the run on the Data quality results tab.
  8. After the run reaches a successful stage, select the completed data quality task run, and view the data quality results shown in Run results.

Our recommendation service suggested that we enforce 55 rules, based on the column values and the data within our NYC Taxi dataset. We then converted the collection of 55 rules into a RuleSet. Then, we ran a Data Quality Evaluation task run using our RuleSet against our dataset. In our results above, we see the status of each within the RuleSet.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.

Get Amazon SNS notifications for my failing data quality runs through Amazon CloudWatch alarms

Each AWS Glue Data Quality evaluation run from the Data Catalog, emits a pair of metrics named glue.data.quality.rules.passed (indicating a number of rules that passed) and glue.data.quality.rules.failed (indicating the number of failed rules) per data quality run. This emitted metric can be used to create alarms to alert users if a given data quality run falls below a threshold.

To get started with setting up an alarm that would send an email via an Amazon SNS notification, follow the steps below:

  1. Open Amazon CloudWatch console.
  2. Choose All metrics under Metrics. You will see an additional namespace under Custom namespaces titled Glue Data Quality.

Note: When starting an AWS Glue Data Quality run, make sure the Publish metrics to Amazon CloudWatch checkbox is enabled, as shown below. Otherwise, metrics for that particular run will not be published to Amazon CloudWatch.

  1. Under the Glue Data Quality namespace, you should be able to see metrics being emitted per table, per ruleset. For the purpose of our blog, we shall be using the glue.data.quality.rules.failed rule and alarm, if this value goes over 1 (indicating that, if we see a number of failed rule evaluations greater than 1, we would like to be notified).
  2. In order to create the alarm, choose All alarms under Alarms.
  3. Choose Create alarm.
  4. Choose Select metric.
  5. Select the glue.data.quality.rules.failed metric corresponding to the table you’ve created, then choose Select metric.
  6. Under the Specify metric and conditions tab, under the Metrics section:
    1. For Statistic, select Sum.
    2. For Period, select 1 minute.
  7. Under the Conditions section:
    1. For Threshold type, choose Static.
    2. For Whenever glue.data.quality.rules.failed is…, select Greater/Equal.
    3. For than…, enter 1 as the threshold value.
    4. Expand the Additional configurations dropdown and select Treat missing data as good

These selections imply that if the glue.data.quality.rules.failed metric emits a value greater than or equal to 1, we will trigger an alarm. However, if there is no data, we will treat it as acceptable.

  1. Choose Next.
  2. On Configure actions:
    1. For the Alarm state trigger section, select In alarm .
    2. For Send a notification to the following SNS topic, choose Create a new topic to send a notification via a new SNS topic.
    3. For Email endpoints that will receive the notification…, enter your email address. Choose Next.
  3. For Alarm name, enter myFirstDQAlarm, then choose Next.
  4. Finally, you should see a summary of all the selections on the Preview and create screen. Choose Create alarm at the bottom.
  5. You should now be able to see the alarm being created from the Amazon CloudWatch alarms dashboard.

In order to demonstrate AWS Glue Data Quality alarms, we are going to go over a real-world scenario where we have corrupted data being ingested, and how we could use the AWS Glue Data Quality service to get notified of this, using the alarm we created in the previous steps. For this purpose, we will use the provided file malformed_yellow_taxi.parquet that contains data that has been tweaked on purpose.

  1. Navigate to the S3 location DataQualityS3BucketName mentioned in the CloudFormation template supplied at the beginning of the blog post.
  2. Upload the malformed_yellow_tripdata.parquet file to this location. This will help us simulate a flow where we have a file with poor data quality coming into our data lakes via our ETL processes.
  3. Navigate to the AWS Glue Data Catalog console, select the demo_nyc_taxi_data_input that was created via the provided AWS CloudFormation template and then navigate to the Data quality tab.
  4. Select the RuleSet we had created in the first section. Then select Evaluate ruleset.
  5. From the Evaluate data quality screen:
    1. Check the box to Publish metrics to Amazon CloudWatch. This checkbox is needed to ensure that the failure metrics are emitted to Amazon CloudWatch.
    2. Select the IAM role created via the AWS CloudFormation template.
    3. Optionally, select an S3 location to publish your AWS Glue Data Quality results.
    4. Select Evaluate ruleset.
  6.  Navigate to the Data Quality results tab. You should now see two runs, one from the previous steps of this blog and one that we currently triggered. Wait for the current run to complete.
  7. As you see, we have a failed AWS Glue Data Quality run result, with only 52 of our original 55 rules passing. These failures are attributed to the new file we uploaded to S3.
  8. Navigate to the Amazon CloudWatch console and select the alarm we created at the beginning of this section.
  9. As you can see, we configured the alarm to fire every time the glue.data.quality.rules.failed metric crosses a threshold of 1. After the above AWS Glue Data Quality run, we see 3 rules failing, which triggered the alarm. Further, you also should have gotten an email detailing the alarm’s firing.

We have thus demonstrated an example where incoming malformed data, coming into our data lakes can be identified via the AWS Glue Data Quality rules, and subsequent alerting mechanisms can be created to notify appropriate personas.

Analyze your AWS Glue Data Quality run results through Amazon Athena

In scenarios where you have multiple AWS Glue Data Quality run results against a dataset, over a period of time, you might want to track the trends of the dataset’s quality over a period of time. To achieve this, we can export our AWS Glue Data Quality run results to S3, and use Amazon Athena to run analytical queries against the exported run. The results can then be further used in Amazon QuickSight to build dashboards to have a graphical representation of your data quality trends

In the third part of this post, we will see the steps needed to start tracking data on your dataset’s quality:

  1. For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the AWS CloudFormation stack.
  2. After each successful run, you should see a single JSONL file being exported to your selected S3 location, corresponding to that particular run.
  3. Open the Amazon Athena console.
  4. In the query editor, run the following CREATE TABLE statement (replace the <my_table_name> with a relevant value, and <GlueDataQualityResultsS3Bucket_from_cfn> section with the GlueDataQualityResultsS3Bucket value from the provided AWS CloudFormation template):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file')
    
    MSCK REPAIR TABLE `<my_table_name>`

  5. Once the above table is created, you should be able to run queries to analyze your data quality results.

For example, consider the following query that shows me the failed AWS Glue Data Quality runs against my table demo_nyc_taxi_data_input within a time window:

SELECT * from "<my_table_name>"
WHERE "outcome" = 'Failed'
AND "tablename" = '<my_source_table>'
AND "evaluationcompletedon" between
parse_datetime('2022-12-05 17:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2022-12-05 20:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the above query shows me details about all the runs with “outcome” = ‘Failed’ that ran against my NYC Taxi dataset table ( “tablename” = ‘demo_nyc_taxi_data_input’ ). The output also gives me information about the failure reason ( failurereason ) and the values it was evaluated against ( evaluatedmetrics ).

As you can see, we are able to get detailed information about our AWS Glue Data Quality runs, via the run results uploaded to S3, perform more detailed analysis and build dashboards on top of the data.

Clean up

  • Navigate to the Amazon Athena console and delete the table created for data quality analysis.
  • Navigate to the Amazon CloudWatch console and delete the alarms created.
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  • If you have enabled your AWS Glue Data Quality runs to output to S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules using the AWS Glue Data Quality feature, into your AWS Glue Data Catalog tables. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Amazon Athena, and the process for setting up alarms via Amazon CloudWatch in order to notify users of failed data quality.

To dive into the AWS Glue Data Quality APIs, take a look at the AWS Glue Data Quality API documentation
To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge watching tv shows.

Build an AWS Lake Formation permissions inventory dashboard using AWS Glue and Amazon QuickSight

Post Syndicated from Srividya Parthasarathy original https://aws.amazon.com/blogs/big-data/build-an-aws-lake-formation-permissions-inventory-dashboard-using-aws-glue-and-amazon-quicksight/

AWS Lake Formation is an integrated data lake service that makes it easy for you to ingest, clean, catalog, transform, and secure your data and make it available for analysis and machine learning (ML). Lake Formation provides a single place to define fine-grained access control on catalog resources. These permissions are granted to the principals by a data lake admin, and integrated engines like Amazon Athena, AWS Glue, Amazon EMR, and Amazon Redshift Spectrum enforce the access controls defined in Lake Formation. It also allows principals to securely share data catalog resources across multiple AWS accounts and AWS organizations through a centralized approach.

As organizations are adopting Lake Formation for scaling their permissions, there is steady increase in the access policies established and managed within the enterprise. However, it becomes more difficult to analyze and understand the permissions for auditing. Therefore, customers are looking for a simple way to collect, analyze, and visualize permissions data so that they can inspect and validate the policies. It also enables organizations to take actions that help them with compliance requirements.

This solution offers the ability to consolidate and create a central inventory of Lake Formation permissions that are registered in the given AWS account and Region. It provides a high-level view of various permissions that Lake Formation manages and aims at answering questions like:

  • Who has select access on given table
  • Which tables have delete permission granted
  • Which databases or tables does the given principal have select access to

In this post, we walk through how to set up and collect the permissions granted on resources in a given account using the Lake Formation API. AWS Glue makes it straightforward to set up and run jobs for collecting the permission data and creating an external table on the collected data. We use Amazon QuickSight to create a permissions dashboard using an Athena data source and dataset.

Overview of solution

The following diagram illustrates the architecture of this solution.

In this solution, we walk through the following tasks:

  1. Create an AWS Glue job to collect and store permissions data, and create external tables using Boto3.
  2. Verify the external tables created using Athena.
  3. Sign up for a QuickSight Enterprise account and enable Athena access.
  4. Create a dataset using an Athena data source.
  5. Use the datasets for analysis.
  6. Publish the analyses as a QuickSight dashboard.

The collected JSON data is flattened and written into an Amazon Simple Storage Service (Amazon S3) bucket as Parquet files partitioned by account ID, date, and resource type. After the data is stored in Amazon S3, external tables are created on them and filters are added for different types of resource permissions. These datasets can be imported into SPICE, an in-memory query engine that is part of QuickSight, or queried directly from QuickSight to create analyses. Later, you can publish these analyses as a dashboard and share it with other users.

Dashboards are created for the following use cases:

  • Database permissions
  • Table permissions
  • Principal permissions

Prerequisites

You should have the following prerequisites:

  • An S3 bucket to store the permissions inventory data
  • An AWS Glue database for permissions inventory metadata
  • An AWS Identity and Access Management (IAM) role for the AWS Glue job with access to the inventory AWS Glue database and S3 bucket and added as a data lake admin
  • A QuickSight account with access to Athena
  • An IAM role for QuickSight with access to the inventory AWS Glue database and S3 bucket

Set up and run the AWS Glue job

We create an AWS Glue job to collect Lake Formation permissions data for the given account and Region that is provided as job parameters, and the collected data is flattened before storage. Data is partitioned by account ID, date, and permissions type, and is stored as Parquet in an S3 bucket using Boto3. We create external tables on the data and add filters for different types of resource permissions.

To create the AWS Glue job, complete the following steps:

  1. Download the Python script file to local.
  2. On the AWS Glue console, under Data Integration and ETL in the navigation pane, choose Jobs.
  3. Under Create job, select Python Shell script editor.
  4. For Options, select Upload and edit an existing script.
  5. For File upload, choose Choose file.
  6. Choose the downloaded file (lf-permissions-inventory.py).
  7. Choose Create.

GlueJob

  1. After the job is created, enter a name for the job (for this post, lf-inventory-builder) and choose Save.

Glue Job save

  1. Choose the Job details tab.
  2. For Name, enter a name for the job.
  3. For IAM Role¸ choose an IAM role that has access to the inventory S3 bucket and inventory schema and registered as data lake admin.
  4. For Type, choose Python Shell.
  5. For Python version, choose Python 3.9.

Glue Job Details

  1. You can leave other property values at their default.
  2. Under Advanced properties¸ configure the following job parameters and values:
    1. catalog-id: with the value as the current AWS account ID whose permissions data are collected.
    2. databasename: with the value as the AWS Glue database where the inventory-related schema objects are created.
    3. region: with the value as the current Region where the job is configured and whose permissions data is collected.
    4. s3bucket: with the value as the S3 bucket where the collected permissions data is written.
    5. createtable: with the value yes, which enables external table creation on the data.

Job Parameters

  1. Choose Save to save the job settings.

Glue Job Save

  1. Choose Run to start the job.

When the job is complete, the run status changes to Succeeded. You can view the log messages in Amazon CloudWatch Logs.

Job Run

Permissions data is collected and stored in the S3 bucket (under lfpermissions-data) that you provided in the job parameters.

S3 Structure

The following external tables are created on the permissions data and can be queried using Athena:

  • lfpermissions – A summary of resource permissions
  • lfpermissionswithgrant – A summary of grantable resource permissions

For both tables, the schema structure is the same and the lftype column indicates what type of permissions the row applies to.

Athena Table Schema

Verify the tables using Athena

You can use Athena to verify the data using the following queries.

For more information, refer to Running SQL queries using Amazon Athena

  • List the database permissions:
Select * from lfpermissions where lftype=’DATABASE’
  • List the table permissions:
Select * from lfpermissions where lftype= ‘TABLE’
  • List the data lake permissions:
Select * from lfpermissions where lftype= ‘DATA_LOCATION’
  • List the grantable database permissions:
Select * from lfpermissionswithgrant where lftype=’DATABASE’
  • List the grantable table permissions:
Select * from lfpermissionswithgrant where lftype= ‘TABLE’
  • List grantable data lake permissions:
Select * from lfpermissionswithgrant where lftype= ‘DATA_LOCATION’

As the next step, we create a QuickSight dashboard with three sheets, each focused on different sets of permissions (database, table, principal) to slice and dice the data.

Sign up for a QuickSight account

If you haven’t signed up for QuickSight, complete the following steps:

  1. Sign in to the AWS Management Console as Admin, search for QuickSight and choose Sign up for QuickSight.

QuickSight signup

  1. For Edition, select Enterprise.
  2. Choose Continue.
  3. For Authentication method, select Use IAM federated identities & QuickSight-managed users.
  4. Under QuickSight Region, choose the same Region as your inventory S3 bucket.
  5. Under Account info, enter a QuickSight account name and email address for notification.

QuickSight Form

  1. In the Quick access to AWS services section, for IAM Role, select Use QuickSight-managed role (default).
  2. Allow access to IAM, Athena, and Amazon S3.
  3. Specify the S3 bucket that contains the permissions data.
  4. Choose Finish to complete the signup process.

QuickSight configuration

Note: If the inventory bucket and database is managed by Lake Formation, grant database and table access to the created QuickSight IAM role. For instructions, refer to Granting and revoking permissions on Data Catalog resources.

Configure your dataset in QuickSight

QuickSight is configured with an Athena data source the same Region as the S3 bucket. To set up your dataset, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.

Quicksight DataSet

  1. Choose Athena as your data source.

QuickSight Datasource

  1. Enter LF_DASHBOARD_DS as the name of your data source.
  2. Choose Create data source.
  3. For Catalog, leave it as AwsDataCatalog.
  4. For Database, choose database name provided as parameter to the Job.
  5. For Tables, select lfpermissions.
  6. Choose Select.

QuickSight Catalog Info

  1. Select Directly query your data and choose Visualize to take you to the analysis.

Quicksight data mode

Create analyses

We create three sheets for our dashboard to view different levels of permissions.

Sheet 1: Database permission view

To view database permissions, complete the following steps:

  1. On the QuickSight console, choose the plus sign to create a new sheet.
  2. Choose Add, then choose Add title.

QuickSight Title

  1. Name the sheet Database Permissions.
  2. Repeat steps (5-7) to add the following parameters:
    • catalogid
    • databasename
    • permission
    • tablename
  3. On the Add menu, choose Add parameter.
  4. Enter a name for the parameter.
  5. Leave the other values as default and choose Create.
  6. Choose Insights in the navigation pane, then choose Add control.

QuickSight Control

  1. Add a control for each parameter:
    1. For each parameter, for Style¸ choose List, and for Values, select Link to a dataset field.
      QuickSight Dependency
    2. Provide additional information for each parameter according to the following table.
Parameter Display Name Dataset Field
catalogid AccountID lfpermissions catalog_id
databasename DatabaseName lfpermissions databasename
permission Permission lfpermissions permission
  1. Add a control dependency and for Database, choose the options menu and choose Edit.

QuickSight Dependency

  1. Under Format control, choose Control options.
  2. Change the relevant values, choose AccountID, and choose Update.
  3. Similarly, under Permission control, choose Control options.
  4. Change the relevant values, choose AccountID, and choose Update.

We create two visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id and databasename into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop principal into Values and change the aggregation to Count distinct.

QuickSight Database View1

  1. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  2. Add a filter on catalog_id the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  3. Add a filter on databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  4. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.
  5. Choose Actions in the navigation pane.
  6. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check principal.

Now we add our second visual.

  1. Add a second visual and choose the table visual type.
  2. Drag and drop principal to Group by.
  3. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  4. Add a filter on catalog_id the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  5. Add a filter on databasename the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  6. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.

Now the Database and Permission drop-down menus are populated based on the relevant attributes and changes dynamically.

QuickSight Database View2

Sheet 2: Table permission view

Now that we have created the database permissions sheet, we can add a table permissions sheet.

  1. Choose the plus sign to add a new sheet.
  2. On the QuickSight console, choose Add, then choose Add title.
  3. Name the sheet Table Permissions.
  4. Choose Insights in the navigation pane, then choose Add control.
  5. Add a control for each parameter:
    1. For each parameter, for Style¸ choose List, and for Values, select Link to a dataset field.
    2. Provide the additional information for each parameter according to the following table.
Parameter Display Name Dataset Field
catalogid AccountID lfpermissions catalog_id
databasename DatabaseName lfpermissions databasename
permission Permission lfpermissions permission
tablename TableName lfpermissions tablename

We create two visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id, databasename, and tablename into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop principal into Values and change the aggregation to Count distinct.
  5. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  6. Add a filter on catalog_id the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  7. Add a filter on the databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  8. Add a filter on tablename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose tablename.
  9. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.
  10. Choose Actions in the navigation pane.
  11. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check principal.

Now we add our second visual.

  1. Add a second visual and choose the table visual type.
  2. Drag and drop principal to Group by.
  3. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  4. Add a filter on catalog_id the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  5. Add a filter on the databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  6. Add a filter on tablename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose tablename.
  7. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.

Now the Databasename, Tablename, and Permission drop-down menus are populated based on the relevant attributes.

QuickSight Table Permissions

Sheet 3: Principal permission view

Now we add a third sheet for principal permissions.

  1. Choose the plus sign to add a new sheet.
  2. On the QuickSight console, choose Add, then choose Add title.
  3. Name the sheet Principal Permissions.
  4. Choose Insights in the navigation pane, then choose Add control.
  5. Add a control for the catalogid parameter:
    1. For Style¸ choose List, and for Values, select Link to a dataset field.
    2. Provide the additional information for the parameter according to the following table.
Parameter Display Name Dataset Field
catalogid AccountID lfpermissions catalog_id

We create four visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id and principal into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop databasename into Values and change the aggregation to Count distinct.
  5. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  6. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  7. Choose Actions in the navigation pane.
  8. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check Databasename.
  9. For the second visual, choose Visualize and choose table as the visual type.
  10. Drag and drop databasename into Group by.
  11. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  12. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  13. For the third visual, choose Visualize and choose pivot table as the visual type.
  14. Drag and drop catalog_id and principal into Rows.
  15. Drag and drop permission into Column.
  16. Drag and drop tablename into Values and change the aggregation to Count distinct.
  17. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  18. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  19. Choose Actions in the navigation pane.
  20. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check Tablename.
  21. For the final visual, choose Visualize and choose table as the visual type.
  22. Drag and drop tablename into Group by.
  23. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  24. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.

The following screenshot shows our sheet.

QuickSight Prinicipal View

Create a dashboard

Now that the analysis is ready, you can publish it as a dashboard and share it with other users. For instructions, refer to the tutorial Create an Amazon QuickSight dashboard.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the AWS Glue job lf-inventory-builder.
  2. Delete the data stored under the bucket provided as the value of the s3bucket job parameter.
  3. Drop the external tables created under the schema provided as the value of the databasename job parameter.
  4. If you signed up for QuickSight to follow along with this post, you can delete the account.
  5. For an existing QuickSight account, delete the following resources:
    1. lfpermissions dataset
    2. lfpermissions analysis
    3. lfpermissions dashboard

Conclusion

In this post, we provided a design and implementation steps for a solution to collect Lake Formation permissions in a given Region of an account and consolidate them for analysis. We also walked through the steps to create a dashboard using Amazon QuickSight. You can utilize other QuickSight visuals to create more sophisticated dashboards based on your requirements.

You can also expand this solution to consolidate permissions for a multi-account setup. You can use a shared bucket across organizations and accounts and configure an AWS Glue job in each account or organization to write their permission data. With this solution, you can maintain a unified dashboard view of all the Lake Formation permissions within your organization, thereby providing a central audit mechanism to comply with business requirements.

Thanks for reading this post! If you have any comments or questions, please don’t hesitate to leave them in the comments section.


About the Author

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building analytics and data mesh solutions on AWS and sharing them with the community.

How to use Amazon Macie to preview sensitive data in S3 buckets

Post Syndicated from Koulick Ghosh original https://aws.amazon.com/blogs/security/how-to-use-amazon-macie-to-preview-sensitive-data-in-s3-buckets/

Security teams use Amazon Macie to discover and protect sensitive data, such as names, payment card data, and AWS credentials, in Amazon Simple Storage Service (Amazon S3). When Macie discovers sensitive data, these teams will want to see examples of the actual sensitive data found. Reviewing a sampling of the discovered data helps them quickly confirm that the object is truly sensitive according to their data protection and privacy policies.

In this post, we walk you through how your data security teams are able to use a new capability in Amazon Macie to retrieve up to 10 examples of sensitive data found in your S3 objects, so that you are able to confirm the nature of the data at a glance. Additionally, we will discuss how you are able to control who is able to use this capability, so that only authorized personnel have permissions to view these examples.

The challenge customers face

After a Macie sensitive data discovery job is run, security teams start their work. The security team will review the Macie findings to investigate the discovered sensitive data and decide what actions to take to protect such data. The findings provide details that include the severity of the finding, information on the affected S3 object, and a summary of the type, location, and amount of sensitive data found. However, Macie findings only contain pointers to data that Macie found in the object. In order to complete their investigation, customers in the past had to do additional work to extract the contents of a sensitive object, such as navigating to a different AWS account where the object is located, downloading and manually searching for keywords in a file editor, or writing and refining SQL queries by using Amazon S3 Select. The investigations are further slowed down when the object type is one that is not easily readable without additional tooling, such as big-data file types like Avro and Parquet. By using the Macie capability to retrieve sensitive data samples, you are able to review the discovered data and make decisions concerning the finding remediation.

Prerequisites

To implement the ability to retrieve and reveal samples of sensitive data, you’ll need the following prerequisites:

  • Enable Amazon Macie in your AWS account. For instructions, see Getting started with Amazon Macie.
  • Set your account as the delegated Macie administrator account and enable Macie in at least one member account by using AWS Organizations. In this post, we will refer to the delegated administrator account as Account A and the member account as Account B.
  • Configure Macie detailed classification results in Account A.

    Note: The detailed classification results contain a record for each Amazon S3 object that you configure the job to analyze, and include the location of up to 1,000 occurrences of each type of sensitive data that Macie found in an object. Macie uses the location information in the detailed classification results to retrieve the examples of sensitive data. The detailed classification results are stored in an S3 bucket of your choice. In this post, we will refer to this bucket as DOC-EXAMPLE-BUCKET1.

  • Create an S3 bucket that contains sensitive data in Account B. In this post, we will refer to this bucket as DOC-EXAMPLE-BUCKET2.

    Note: You should enable server-side encryption on this bucket by using customer managed AWS Key Management Service (AWS KMS) keys (a type of encryption known as SSE-KMS).

  • (Optional) Add sensitive data to DOC-EXAMPLE-BUCKET2. This post uses a sample dataset that contains fake sensitive data. You are able to download this sample dataset, unarchive the .zip folder, and follow these steps to upload the objects to S3. This is a synthetic dataset generated by AWS that we will use for the examples in this post. All data in this blog post has been artificially created by AWS for demonstration purposes and has not been collected from any individual person. Similarly, such data does not relate back to any individual person, nor is it intended to.
  • Create and run a sensitive data discovery job from Account A to analyze the contents of DOC-EXAMPLE-BUCKET2.
  • (Optional) Set up the AWS Command Line Interface (AWS CLI).

Configure Macie to retrieve and reveal examples of sensitive data

In this section, we’ll describe how to configure Macie so that you are able to retrieve and view examples of sensitive data from Macie findings.

To configure Macie (console)

  • In the AWS Management Console, in the Macie delegated administrator account (Account A), follow these steps from the Amazon Macie User Guide.

To configure Macie (AWS CLI)

  1. Confirm that you have Macie enabled.
    	$ aws macie2 get-macie-session --query 'status'
    	// The expected response is "ENABLED"

  2. Confirm that you have configured the detailed classification results bucket.
    	$ aws macie2 get-classification-export-configuration
    
    	// The expected response is:
    	{
       	 "configuration": {
       		 	    "s3Destination": {
            		    "bucketName": " DOC-EXAMPLE-BUCKET1 ",
               			"kmsKeyArn": "arn:aws:kms:<YOUR-REGION>:<YOUR-ACCOUNT-ID>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET1>"
         		  	 }
    		}	
    	} 

  3. Create a new KMS key to encrypt the retrieved examples of sensitive data. Make sure that the key is created in the same AWS Region where you are operating Macie.
    $ aws kms create-key
    {
        "KeyMetadata": {
            "Origin": "AWS_KMS",
            "KeyId": "<YOUR-KEY-ID>",
            "Description": "",
            "KeyManager": "CUSTOMER",
            "Enabled": true,
            "KeySpec": "SYMMETRIC_DEFAULT",
            "CustomerMasterKeySpec": "SYMMETRIC_DEFAULT",
            "KeyUsage": "ENCRYPT_DECRYPT",
            "KeyState": "Enabled",
            "CreationDate": 1502910355.475,
            "Arn": "arn:aws:kms: <YOUR-AWS-REGION>:<AWS-ACCOUNT-A>:key/<YOUR-KEY-ID>",
            "AWSAccountId": "<AWS-ACCOUNT-A>",
            "MultiRegion": false
            "EncryptionAlgorithms": [
                "SYMMETRIC_DEFAULT"
            ],
        }
    }

  4. Give this key the alias REVEAL-KMS-KEY.
    $ aws kms CreateAlias
    {
       "AliasName": " <REVEAL-KMS-KEY> ",
       "TargetKeyId": "<YOUR-KEY-ID>"
    }

  5. Enable the feature in Macie and configure it to encrypt the data by using REVEAL-KMS-KEY. You do not specify a key policy for your new KMS key in this step. The key policy will be discussed later in the post.
    $ aws macie2 update-reveal-configuration --configuration '{"status":"ENABLED","kmsKeyId":"alias/ <REVEAL-KMS-KEY> "}'
    
    // The expected response is:
    {
        "configuration": {
            "kmsKeyId": "arn:aws:kms:<YOUR-REGION>: <YOUR ACCOUNT ID>:key/<REVEAL-KMS-KEY>.",
            "status": "ENABLED"
        }
    }

Control access to read sensitive data and protect data displayed in Macie

This new Macie capability uses the AWS Identity and Access Management (IAM) policies, S3 bucket policies, and AWS KMS key policies that you have defined in your accounts. This means that in order to see examples through the Macie console or by invoking the Macie API, the IAM principal needs to have read access to the S3 object and to decrypt the object if it is server-side encrypted. It’s important to note that Macie uses the IAM permissions of the AWS principal to locate, retrieve, and reveal the samples and does not use the Macie service-linked role to perform these tasks.

Using the setup discussed in the previous section, you will walk through how to control access to the ability to retrieve and reveal sensitive data examples. To recap, you created and ran a discovery job from the Amazon Macie delegated administrator account (Account A) to analyze the contents of DOC-EXAMPLE-BUCKET2 in a member account (Account B). You configured Macie to retrieve examples and to encrypt the examples of sensitive data with the REVEAL-KMS-KEY.

The next step is to create and use an IAM role that will be assumed by other users in Account A to retrieve and reveal examples of sensitive data discovered by Macie. In this post, we’ll refer to this role as MACIE-REVEAL-ROLE.

To apply the principle of least privilege and allow only authorized personnel to view the sensitive data samples, grant the following permissions so that Macie users who assume MACIE-REVEAL-ROLE will be able to successfully retrieve and reveal examples of sensitive data:

  • Step 1 – Update the IAM policy for MACIE-REVEAL-ROLE.
  • Step 2 – Update the KMS key policy for REVEAL-KMS-KEY.
  • Step 3 – Update the S3 bucket policy for DOC-EXAMPLE-BUCKET2 and the KMS key policy used for its server-side encryption in Account B.

After you grant these permissions, MACIE-REVEAL-ROLE is succcesfully able to retrieve and reveal examples of sensitive data in DOC-EXAMPLE-BUCKET2, as shown in Figure 1.

Figure 1: Macie runs the discovery job from the delegated administrator account in a member account, and MACIE-REVEAL-ROLE retrieves examples of sensitive data

Figure 1: Macie runs the discovery job from the delegated administrator account in a member account, and MACIE-REVEAL-ROLE retrieves examples of sensitive data

Step 1: Update the IAM policy

Provide the following required permissions to MACIE-REVEAL-ROLE:

  1. Allow GetObject from DOC-EXAMPLE-BUCKET2 in Account B.
  2. Allow decryption of DOC-EXAMPLE-BUCKET2 if it is server-side encrypted with a customer managed key (SSE-KMS).
  3. Allow GetObject from DOC-EXAMPLE-BUCKET1.
  4. Allow decryption of the Macie discovery results.
  5. Allow the necessary Macie actions to retrieve and reveal sensitive data examples.

To set up the required permissions

  • Use the following commands to provide the permissions. Make sure to replace the placeholders with your own data.
    {
        "Version": "2012-10-17",
        "Statement": [
    	{
                "Sid": "AllowGetFromCompanyDataBucket",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET2>/*"
            },
            {
                "Sid": "AllowKMSDecryptForCompanyDataBucket",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt"
                ],
                "Resource": "arn:aws:kms:<AWS-Region>:<AWS-Account-B>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET2>"
            },
            {
                "Sid": "AllowGetObjectfromMacieResultsBucket",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET1>/*"
            },
    	{
                "Sid": "AllowKMSDecryptForMacieRoleDiscoveryBucket",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt"
                ],
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-A>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET1>"
            },
    	{
                "Sid": "AllowActionsRetrieveAndReveal",
                "Effect": "Allow",
                "Action": [
                    "macie2:GetMacieSession",
                    "macie2:GetFindings",
                    "macie2:GetSensitiveDataOccurrencesAvailability",
                    "macie2:GetSensitiveDataOccurrences",
                    "macie2:ListFindingsFilters",
                    "macie2:GetBucketStatistics",
                    "macie2:ListMembers",
                    "macie2:ListFindings",
                    "macie2:GetFindingStatistics",
                    "macie2:GetAdministratorAccount",
                    "macie2:GetClassificationExportConfiguration",
                    "macie2:GetRevealConfiguration",
                    "macie2:DescribeBuckets"
                ],
                "Resource": "*” 
            }
        ]
    }

Step 2: Update the KMS key policy

Next, update the KMS key policy that is used to encrypt sensitive data samples that you retrieve and reveal in your delegated administrator account.

To update the key policy

  • Allow the MACIE-REVEAL-ROLE access to the KMS key that you created for protecting the retrieved sensitive data, using the following commands. Make sure to replace the placeholders with your own data.
    	{
                "Sid": "AllowMacieRoleDecrypt",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam:<AWS-REGION>:<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": [
                    "kms:Decrypt",
                    "kms:DescribeKey",
                    "kms:GenerateDataKey"
                ],
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-A>:key/<REVEAL-KMS-KEY>"
            }

Step 3: Update the bucket policy of the S3 bucket

Finally, update the bucket policy of the S3 bucket in member accounts, and update the key policy of the key used for SSE-KMS.

To update the S3 bucket policy and KMS key policy

  1. Use the following commands to update key policy for the KMS key used for server-side encryption of the DOC-EXAMPLE-BUCKET2 bucket in Account B.
    	{
                "Sid": "AllowMacieRoleDecrypt”
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam:<AWS-REGION>:<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": "kms:Decrypt",
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-B>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET2>"
      }

  2. Use the following commands to update the bucket policy of DOC-EXAMPLE-BUCKET2 to allow cross-account access for MACIE-REVEAL-ROLE to get objects from this bucket.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowMacieRoleGet",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET2>/*"
            }
        ]
    }

Retrieve and reveal sensitive data samples

Now that you’ve put in place the necessary permissions, users who assume MACIE-REVEAL-ROLE will be able to conveniently retrieve and reveal sensitive data samples.

To retrieve and reveal sensitive data samples

  1. In the Macie console, in the left navigation pane, choose Findings, and select a specific finding. Under Sensitive Data, choose Review.
    Figure 2: The finding details panel

    Figure 2: The finding details panel

  2. On the Reveal sensitive data page, choose Reveal samples.
    Figure 3: The Reveal sensitive data page

    Figure 3: The Reveal sensitive data page

  3. Under Sensitive data, you will be able to view up to 10 examples of the sensitive data found by Amazon Macie.
    Figure 4: Examples of sensitive data revealed in the Amazon Macie console

    Figure 4: Examples of sensitive data revealed in the Amazon Macie console

You are able to find additional information on setting up the Macie Reveal function in the Amazon Macie User Guide.

Conclusion

In this post, we showed how you are to retrieve and review examples of sensitive data that were found in Amazon S3 using Amazon Macie. This capability will make it easier for your data protection teams to review the sensitive contents found in S3 buckets across the accounts in your AWS environment. With this information, security teams are able to quickly take remediation actions, such as updating the configuration of sensitive buckets, quarantining files with sensitive information, or sending a notification to the owner of the account where the sensitive data resides. In certain cases, you are able to add the examples to an allow list in Macie if you don’t want Macie to report those as sensitive data (for example, corporate addresses or sample data that is used for testing).

The following are links to additional resources that you will be able to use to expand your knowledge of Amazon Macie capabilities and features:

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on Amazon Macie re:Post.

Want more AWS Security news? Follow us on Twitter.

Koulick Ghosh

Koulick Ghosh

Koulick is a Senior Product Manager in AWS Security based in Seattle, WA. He loves speaking with customers on how AWS Security services can help make them more secure. In his free-time, he enjoys playing the guitar, reading, and exploring the Pacific Northwest.

Author

Michael Ingoldby

Michael is a Senior Security Solutions Architect at AWS based in Frisco, Texas. He provides guidance and helps customers to implement AWS native security services. Michael has been working in the security domain since 2006. When he is not working, he enjoys spending time outdoors.

Robert Wu

Robert Wu

Robert is the Software Development Engineer for AWS Macie, working on enabling customers with more sensitive data discovery capabilities. In his free time, he enjoys exploring and contributing to various open-source projects to widen his domain knowledge.

Introducing the Cloud Shuffle Storage Plugin for Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, an open-source, distributed processing system for your data integration tasks and big data workloads.

Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there isn’t enough disk space left on the executor and there is no recovery. Such Spark jobs can’t typically succeed without adding additional compute and attached storage, wherein compute is often idle, and results in additional cost.

In 2021, we launched Amazon S3 shuffle for AWS Glue 2.0 with Spark 2.4. This feature disaggregated Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle files. Using Amazon S3 for Spark shuffle storage enabled you to run data-intensive workloads more reliably. After the launch, we continued investing in this area, and collected customer feedback.

Today, we’re pleased to release Cloud Shuffle Storage Plugin for Apache Spark. It supports the latest Apache Spark 3.x distribution so you can take advantage of the plugin in AWS Glue or any other Spark environments. It’s now also natively available to use in AWS Glue Spark jobs on AWS Glue 3.0 and the latest AWS Glue version 4.0 without requiring any extra setup or bringing external libraries. Like the Amazon S3 shuffle for AWS Glue 2.0, the Cloud Shuffle Storage Plugin helps you solve constrained disk space errors during shuffle in serverless Spark environments.

We’re also excited to announce the release of software binaries for the Cloud Shuffle Storage Plugin for Apache Spark under the Apache 2.0 license. You can download the binaries and run them on any Spark environment. The new plugin is open-cloud, comes with out-of-the box support for Amazon S3, and can be easily configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage.

Understanding a shuffle operation in Apache Spark

In Apache Spark, there are two types of transformations:

  • Narrow transformation – This includes map, filter, union, and mapPartition, where each input partition contributes to only one output partition.
  • Wide transformation – This includes join, groupBykey, reduceByKey, and repartition, where each input partition contributes to many output partitions. Spark SQL queries including JOIN, ORDER BY, GROUP BY require wide transformations.

A wide transformation triggers a shuffle, which occurs whenever data is reorganized into new partitions with each key assigned to one of them. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. The volume of data shuffled is visible in the Spark UI. When shuffle writes take up more space than the local available disk capacity, it causes a No space left on device error.

To illustrate one of the typical scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join and group by.

Let’s run the following query on AWS Glue 3.0 job with 10 G1.X workers where a total of 640GB of local disk space is available:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the Executor tab in the Spark UI.
Spark UI Executor Tab

The following screenshot shows the status of Spark jobs included in the AWS Glue job run.
Spark UI Jobs
In the failed Spark job (job ID=7), we can see the failed Spark stage in the Spark UI.
Spark UI Failed stage
There was 167.8GiB shuffle write during the stage, and 14 tasks failed due to the error java.io.IOException: No space left on device because the host 172.34.97.212 ran out of local disk.
Spark UI Tasks

Cloud Shuffle Storage for Apache Spark

Cloud Shuffle Storage for Apache Spark allows you to store Spark shuffle files on Amazon S3 or other cloud storage services. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data intensive workloads reliably. The following figure illustrates how Spark map tasks write the shuffle files to the Cloud Shuffle Storage. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle storage.

This architecture enables your serverless Spark jobs to use Amazon S3 without the overhead of running, operating, and maintaining additional storage or compute nodes.
Chopper diagram
The following Glue job parameters enable and tune Spark to use S3 buckets for storing shuffle data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

Key Value Explanation
--write-shuffle-files-to-s3 TRUE This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--conf spark.shuffle.storage.path=s3://<shuffle-bucket> This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, we use –TempDir/shuffle-data.

The shuffle files are written to the location and create files such as following:

s3://<shuffle-storage-path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data

With the Cloud Shuffle Storage plugin enabled and using the same AWS Glue job setup, the TPC-DS query now succeeded without any job or stage failures.
Spark UI Jobs with Chopper plugin

Software binaries for the Cloud Shuffle Storage Plugin

You can now also download and use the plugin in your own Spark environments and with other cloud storage services. The plugin binaries are available for use under the Apache 2.0 license.

Bundle the plugin with your Spark applications

You can bundle the plugin with your Spark applications by adding it as a dependency in your Maven pom.xml as you develop your Spark applications, as shown in the follwoing code. For more details on the plugin and Spark versions, refer to Plugin versions.

<repositories>
   ...
    <repository>
        <id>aws-glue-etl-artifacts</id>
        <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
    </repository>
</repositories>
...
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version>3.1-amzn-LATEST</version>
</dependency>

You can alternatively download the binaries from AWS Glue Maven artifacts directly and include them in your Spark application as follows:

#!/bin/bash
sudo wget -v https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/3.1-amzn-LATEST/chopper-plugin-3.1-amzn-LATEST.jar -P /usr/lib/spark/jars

Submit the Spark application by including the JAR files on the classpath and specifying the two Spark configs for the plugin:

spark-submit --deploy-mode cluster \
--conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
--conf spark.shuffle.storage.path=s3://<s3 bucket>/<shuffle-dir> \
 --class <your class> <your application jar> 

The following Spark parameters enable and configure Spark to use an external storage URI such as Amazon S3 for storing shuffle files; the URI protocol determines which storage system to use.

Key Value Explanation
spark.shuffle.storage.path s3://<shuffle-storage-path> It specifies an URI where the shuffle files are stored, which much be a valid Hadoop FileSystem and be configured as needed
spark.shuffle.sort.io.plugin.class com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin The entry class in the plugin

Other cloud storage integration

This plugin comes with out-of-the box support for Amazon S3 and can also be configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage. To enable other Hadoop FileSystem compatible cloud storage services, you can simply add a storage URI for the corresponding service scheme, such as gs:// for Google Cloud Storage instead of s3:// for Amazon S3, add the FileSystem JAR files for the service, and set the appropriate authentication configurations.

For more information about how to integrate the plugin with Google Cloud Storage and Microsoft Azure Blob Storage, refer to Using AWS Glue Cloud Shuffle Plugin for Apache Spark with Other Cloud Storage Services.

Best practices and considerations

Note the following considerations:

  • This feature replaces local shuffle storage with Amazon S3. You can use it to address common failures with price/performance benefits for your serverless analytics jobs and pipelines. We recommend enabling this feature when you want to ensure reliable runs of your data-intensive workloads that create a large amount of shuffle data or when you’re getting No space left on device error. You can also use this plugin if your job encounters fetch failures org.apache.spark.shuffle.MetadataFetchFailedException or if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.storage.s3.path) in order to clean up old shuffle data automatically.
  • The shuffle data on Amazon S3 is encrypted by default. You can also encrypt the data with your own AWS Key Management Service (AWS KMS) keys.

Conclusion

This post introduced the new Cloud Shuffle Storage Plugin for Apache Spark and described its benefits to independently scale storage in your Spark jobs without adding additional workers. With this plugin, you can expect jobs processing terabytes of data to run much more reliably.

The plugin is available in AWS Glue 3.0 and 4.0 Spark jobs in all AWS Glue supported Regions. We’re also releasing the plugin’s software binaries under the Apache 2.0 license. You can use the plugin in AWS Glue or other Spark environments. We look forward to hearing your feedback.


About the Authors

Noritaka Sekiyama s a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Chuhan Liu is a Software Development Engineer on the AWS Glue team.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with data integration and connectivity to a variety of sources, efficiently manage data lakes on Amazon S3, and optimizes Apache Spark for fault-tolerance with ETL workloads.

New — Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-amazon-sagemaker-data-wrangler-supports-saas-applications-as-data-sources/

Data fuels machine learning. In machine learning, data preparation is the process of transforming raw data into a format that is suitable for further processing and analysis. The common process for data preparation starts with collecting data, then cleaning it, labeling it, and finally validating and visualizing it. Getting the data right with high quality can often be a complex and time-consuming process.

This is why customers who build machine learning (ML) workloads on AWS appreciate the ability of Amazon SageMaker Data Wrangler. With SageMaker Data Wrangler, customers can simplify the process of data preparation and complete the required processes of the data preparation workflow on a single visual interface. Amazon SageMaker Data Wrangler helps to reduce the time it takes to aggregate and prepare data for ML.

However, due to the proliferation of data, customers generally have data spread out into multiple systems, including external software-as-a-service (SaaS) applications like SAP OData for manufacturing data, Salesforce for customer pipeline, and Google Analytics for web application data. To solve business problems using ML, customers have to bring all of these data sources together. They currently have to build their own solution or use third-party solutions to ingest data into Amazon S3 or Amazon Redshift. These solutions can be complex to set up and not cost-effective.

Introducing Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources
I’m happy to share that starting today, you can aggregate external SaaS application data for ML in Amazon SageMaker Data Wrangler to prepare data for ML. With this feature, you can use more than 40 SaaS applications as data sources via Amazon AppFlow and have these data available on Amazon SageMaker Data Wrangler. Once the data sources are registered in AWS Glue Data Catalog by AppFlow, you can browse tables and schemas from these data sources using Data Wrangler SQL explorer. This feature provides seamless data integration between SaaS applications and SageMaker Data Wrangler using Amazon AppFlow.

Here is a quick preview of this new feature:

This new feature of Amazon SageMaker Data Wrangler works by using integration with Amazon AppFlow, a fully managed integration service that enables you to securely exchange data between SaaS applications and AWS services. With Amazon AppFlow, you can establish bidirectional data integration between SaaS applications, such as Salesforce, SAP, and Amplitude and all supported services, into your Amazon S3 or Amazon Redshift.

Then, with Amazon AppFlow, you can catalog the data in AWS Glue Data Catalog. This is a new feature where with Amazon AppFlow, you can create an integration with AWS Glue Data Catalog for Amazon S3 destination connector. With this new integration, customers can catalog SaaS data applications into AWS Glue Data Catalog with a few clicks, directly from the Amazon AppFlow Flow configuration, without the need to run any crawlers.

Once you’ve established a flow and inserted it into the AWS Glue Data Catalog, you can use this data inside the Amazon SageMaker Data Wrangler. Then, you can do the data preparation as you usually do. You can write Amazon Athena queries to preview data, join data from multiple sources, or import data to prepare for ML model training.

With this feature, you need to do a few simple steps to perform seamless data integration between SaaS applications into Amazon SageMaker Data Wrangler via Amazon AppFlow. This integration supports more than 40 SaaS applications, and for a complete list of supported applications, please check the Supported source and destination applications documentation.

Get Started with Amazon SageMaker Data Wrangler Support for Amazon AppFlow
Let’s see how this feature works in detail. In my scenario, I need to get data from Salesforce, and do the data preparation using Amazon SageMaker Data Wrangler.

To start using this feature, the first thing I need to do is to create a flow in Amazon AppFlow that registers the data source into the AWS Glue Data Catalog. I already have an existing connection with my Salesforce account, and all I need now is to create a flow.

One important thing to note is that to make SaaS application data available in Amazon SageMaker Data Wrangler, I need to create a flow with Amazon S3 as the destination. Then, I need to enable Create a Data Catalog table in the AWS Glue Data Catalog settings. This option will automatically catalog my Salesforce data into AWS Glue Data Catalog.

On this page, I need to select a user role with the required AWS Glue Data Catalog permissions and define the database name and the table name prefix. In addition, in this section, I can define the data format preference, be it in JSON, CSV, or Apache Parquet formats, and filename preference if I want to add a timestamp into the file name section.

To learn more about how to register SaaS data in Amazon AppFlow and AWS Glue Data Catalog, you can read Cataloging the data output from an Amazon AppFlow flow documentation page.

Once I’ve finished registering SaaS data, I need to make sure the IAM role can view the data sources in Data Wrangler from AppFlow. Here is an example of a policy in the IAM role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "glue:SearchTables",
            "Resource": [
                "arn:aws:glue:*:*:table/*/*",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:catalog"
            ]
        }
    ]
} 

By enabling data cataloging with AWS Glue Data Catalog, from this point on, Amazon SageMaker Data Wrangler will be able to automatically discover this new data source and I can browse tables and schema using the Data Wrangler SQL Explorer.

Now it’s time to switch to the Amazon SageMaker Data Wrangler dashboard then select Connect to data sources.

On the following page, I need to Create connection and select the data source I want to import. In this section, I can see all the available connections for me to use. Here I see the Salesforce connection is already available for me to use.

If I would like to add additional data sources, I can see a list of external SaaS applications that I can integrate into the Set up new data sources section. To learn how to recognize external SaaS applications as data sources, I can learn more with the select How to enable access.

Now I will import datasets and select the Salesforce connection.

On the next page, I can define connection settings and import data from Salesforce. When I’m done with this configuration, I select Connect.

On the following page, I see my Salesforce data that I already configured with Amazon AppFlow and AWS Glue Data Catalog called appflowdatasourcedb. I can also see a table preview and schema for me to review if this is the data I need.

Then, I start building my dataset using this data by performing SQL queries inside the SageMaker Data Wrangler SQL Explorer. Then, I select Import query.

Then, I define a name for my dataset.

At this point, I can start doing the data preparation process. I can navigate to the Analysis tab to run the data insight report. The analysis will provide me with a report on the data quality issues and what transform I need to use next to fix the issues based on the ML problem I want to predict. To learn more about how to use the data analysis feature, see Accelerate data preparation with data quality and insights in the Amazon SageMaker Data Wrangler blog post.

In my case, there are several columns I don’t need, and I need to drop these columns. I select Add step.

One feature I like is that Amazon SageMaker Data Wrangler provides numerous ML data transforms. It helps me to streamline the process of cleaning, transforming and feature engineering my data in one dashboard. For more about what SageMaker Data Wrangler provides for transformation data, please read this Transform Data documentation page.

In this list, I select Manage columns.

Then, in the Transform section, I select the Drop column option. Then, I select a few columns that I don’t need.

Once I’m done, the columns I don’t need are removed and the Drop column data preparation step I just created is listed in the Add step section.

I can also see the visual of my data flow inside the Amazon SageMaker Data Wrangler. In this example, my data flow is quite basic. But when my data preparation process becomes complex, this visual view makes it easy for me to see all the data preparation steps.

From this point on, I can do what I require with my Salesforce data. For example, I can export data directly to Amazon S3 by selecting Export to and choosing Amazon S3 from the Add destination menu. In my case, I specify Data Wrangler to store the data in Amazon S3 after it has processed it by selecting Add destination and then Amazon S3.

Amazon SageMaker Data Wrangler provides me flexibility to automate the same data preparation flow using scheduled jobs. I can also automate feature engineering with SageMaker Pipelines (via Jupyter Notebook) and SageMaker Feature Store (via Jupyter Notebook), and deploy to Inference end point with SageMaker Inference Pipeline (via Jupyter Notebook).

Things to Know
Related news – This feature will make it easy for you to do data aggregation and preparation with Amazon SageMaker Data Wrangler. As this feature is an integration with Amazon AppFlow and also AWS Glue Data Catalog, you might want to learn more on Amazon AppFlow now supports AWS Glue Data Catalog integration and provides enhanced data preparation page.

Availability – Amazon SageMaker Data Wrangler supports SaaS applications as data sources available in all the Regions currently supported by Amazon AppFlow.

Pricing – There is no additional cost to use SaaS applications supports in Amazon SageMaker Data Wrangler, but there is a cost to running Amazon AppFlow to get the data in Amazon SageMaker Data Wrangler.

Visit Import Data From Software as a Service (SaaS) Platforms documentation page to learn more about this feature, and follow the getting started guide to start data aggregating and preparing SaaS applications data with Amazon SageMaker Data Wrangler.

Happy building!
Donnie

Join the Preview – AWS Glue Data Quality

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/join-the-preview-aws-glue-data-quality/

Back in 1980, at my second professional programming job, I was working on a project that analyzed driver’s license data from a bunch of US states. At that time data of that type was generally stored in fixed-length records, with values carefully (or not) encoded into each field. Although we were given schemas for the data, we would invariably find that the developers had to resort to tricks in order to represent values that were not anticipated up front. For example, coding for someone with heterochromia, eyes of different colors. We ended up doing a full scan of the data ahead of our actual time-consuming and expensive analytics run in order to make sure that we were dealing with known data. This was my introduction to data quality, or the lack thereof.

AWS makes it easier for you to build data lakes and data warehouses at any scale. We want to make it easier than ever before for you to measure and maintain the desired quality level of the data that you ingest, process, and share.

Introducing AWS Glue Data Quality
Today I would like to tell you about AWS Glue Data Quality, a new set of features for AWS Glue that we are launching in preview form. It can analyze your tables and recommend a set of rules automatically based on what it finds. You can fine-tune those rules if necessary and you can also write your own rules. In this blog post I will show you a few highlights, and will save the details for a full post when these features progress from preview to generally available.

Each data quality rule references a Glue table or selected columns in a Glue table and checks for specific types of properties: timeliness, accuracy, integrity, and so forth. For example, a rule can indicate that a table must have the expected number of columns, that the column names match a desired pattern, and that a specific column is usable as a primary key.

Getting Started
I can open the new Data quality tab on one of my Glue tables to get started. From there I can create a ruleset manually, or I can click Recommend ruleset to get started:

Then I enter a name for my Ruleset (RS1), choose an IAM Role that has permission to access it, and click Recommend ruleset:

My click initiates a Glue Recommendation task (a specialized type of Glue job) that scans the data and makes recommendations. Once the task has run to completion I can examine the recommendations:

I click Evaluate ruleset to check on the quality of my data.

The data quality task runs and I can examine the results:

In addition to creating Rulesets that are attached to tables, I can use them as part of a Glue job. I create my job as usual and then add an Evaluate Data Quality node:

Then I use the Data Quality Definition Language (DDQL) builder to create my rules. I can choose between 20 different rule types:

For this blog post, I made these rules more strict than necessary so that I could show you what happens when the data quality evaluation fails.

I can set the job options, and choose the original data or the data quality results as the output of the transform. I can also write the data quality results to an S3 bucket:

After I have created my Ruleset, I set any other desired options for the job, save it, and then run it. After the job completes I can find the results in the Data quality tab. Because I made some overly strict rules, the evaluation correctly flagged my data with a 0% score:

There’s a lot more, but I will save that for the next blog post!

Things to Know
Preview Regions – This is an open preview and you can access it today the US East (Ohio, N. Virginia), US West (Oregon), Asia Pacific (Tokyo), and Europe (Ireland) AWS Regions.

Pricing – Evaluating data quality consumes Glue Data Processing Units (DPU) in the same manner and at the same per-DPU pricing as any other Glue job.

Jeff;

New – Amazon Redshift Integration with Apache Spark

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-redshift-integration-with-apache-spark/

Apache Spark is an open-source, distributed processing system commonly used for big data workloads. Spark application developers working in Amazon EMR, Amazon SageMaker, and AWS Glue often use third-party Apache Spark connectors that allow them to read and write the data with Amazon Redshift. These third-party connectors are not regularly maintained, supported, or tested with various versions of Spark for production.

Today we are announcing the general availability of Amazon Redshift integration for Apache Spark, which makes it easy to build and run Spark applications on Amazon Redshift and Redshift Serverless, enabling customers to open up the data warehouse for a broader set of AWS analytics and machine learning (ML) solutions.

With Amazon Redshift integration for Apache Spark, you can get started in seconds and effortlessly build Apache Spark applications in a variety of languages, such as Java, Scala, and Python.

Your applications can read from and write to your Amazon Redshift data warehouse without compromising on the performance of the applications or transactional consistency of the data, as well as performance improvements with pushdown optimizations.

Amazon Redshift integration for Apache Spark builds on an existing open source connector project and enhances it for performance and security, helping customers gain up to 10x faster application performance. We thank the original contributors on the project who collaborated with us to make this happen. As we make further enhancements we will continue to contribute back into the open source project.

Getting Started with Spark Connector for Amazon Redshift
To get started, you can go to AWS analytics and ML services, use data frame or Spark SQL code in a Spark job or Notebook to connect to the Amazon Redshift data warehouse, and start running queries in seconds.

In this launch, Amazon EMR 6.9, EMR Serverless, and AWS Glue 4.0 come with the pre-packaged connector and JDBC driver, and you can just start writing code. EMR 6.9 provides a sample notebook, and EMR Serverless provides a sample Spark Job too.

First, you should set AWS Identity and Access Management (AWS IAM) authentication between Redshift and Spark, between Amazon Simple Storage Service (Amazon S3) and Spark, and between Redshift and Amazon S3. The following diagram describes the authentication between Amazon S3, Redshift, the Spark driver, and Spark executors.

For more information, see Identity and access management in Amazon Redshift in the AWS documentation.

Amazon EMR
If you already have an Amazon Redshift data warehouse and the data available, you can create the database user and provide the right level of grants to the database user. To use this with Amazon EMR, you need to upgrade to the latest version of the Amazon EMR 6.9 that has the packaged spark-redshift connector. Select the emr-6.9.0 release when you create an EMR cluster on Amazon EC2.

You can use EMR Serverless to create your Spark application using the emr-6.9.0 release to run your workload.

EMR Studio also provides an example Jupyter Notebook configured to connect to an Amazon Redshift Serverless endpoint leveraging sample data that you can use to get started quickly.

Here is a Scalar example to build your applications both with Spark Dataframe and Spark SQL. Use IAM-based credentials for connecting to Redshift and use IAM role for unloading and loading data from S3.

// Create the JDBC connection URL and define the Redshift context
val jdbcURL = "jdbc:redshift:iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser=<RsUser>"
val rsOptions = Map (
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir, 
  "aws_iam_role" -> roleARN,
  )
// Reference the sales table from Redshift 
val sales_df = spark
  .read 
  .format("io.github.spark_redshift_community.spark.redshift") 
  .options(rsOptions) 
  .option("dbtable", "sales") 
  .load() 
sales_df.createOrReplaceTempView("sales") 
// Reference the date table from Redshift using Data Frame 
sales_df.join(date_df, sales_df("dateid") === date_df("dateid"))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show() 

If Amazon Redshift and Amazon EMR are in different VPCs, you have to configure VPC peering or enable cross-VPC access. Assuming both Amazon Redshift and Amazon EMR are in the same virtual private cloud (VPC), you can create a Spark job or Notebook and connect to the Amazon Redshift data warehouse and write Spark code to use the Amazon Redshift connector.

To learn more, see Use Spark on Amazon Redshift with a connector in the AWS documentation.

AWS Glue
When you use AWS Glue 4.0, the spark-redshift connector is available both as a source and target. In Glue Studio, you can use a visual ETL job to read or write to a Redshift data warehouse simply by selecting a Redshift connection to use within a built-in Redshift source or target node.

The Redshift connection contains Redshift connection details along with the credentials needed to access Redshift with the proper permissions.

To get started, choose Jobs in the left menu of the Glue Studio console. Using either of the Visual modes, you can easily add and edit a source or target node and define a range of transformations on the data without writing any code.

Choose Create and you can easily add and edit a source, target node, and the transform node in the job diagram. At this time, you will choose Amazon Redshift as Source and Target.

Once completed, the Glue job can be executed on Glue for the Apache Spark engine, which will automatically use the latest spark-redshift connector.

The following Python script shows an example job to read and write to Redshift with dynamicframe using the spark-redshift connector.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": "awsuser",
    "password": "Password1",
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": "awsuser"
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

When you set up your job detail, you can only use the Glue 4.0 – Supports spark 3.3 Python 3 version for this integration.

To learn more, see Creating ETL jobs with AWS Glue Studio and Using connectors and connections with AWS Glue Studio in the AWS documentation.

Gaining the Best Performance
In the Amazon Redshift integration for Apache Spark, the Spark connector automatically applies predicate and query pushdown to optimize for performance. You can gain performance improvement by using the default Parquet format for the connector used for unloading with this integration.

As the following sample code shows, the Spark connector will turn the supported function into a SQL query and run the query in Amazon Redshift.

import sqlContext.implicits._val
sample= sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

// Create temporary views for data frames created earlier so they can be accessed via Spark SQL
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
// Show the total sales on a given date using Spark SQL API
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

Amazon Redshift integration for Apache Spark adds pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from the Redshift data warehouse to the consuming Spark application, thereby improving performance.

Available Now
The Amazon Redshift integration for Apache Spark is now available in all Regions that support Amazon EMR 6.9, AWS Glue 4.0, and Amazon Redshift. You can start using the feature directly from EMR 6.9 and Glue Studio 4.0 with the new Spark 3.3.0 version.

Give it a try, and please send us feedback either in the AWS re:Post for Amazon Redshift or through your usual AWS support contacts.

Channy

Scale AWS SDK for pandas workloads with AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/scale-aws-sdk-for-pandas-workloads-with-aws-glue-for-ray/

AWS SDK for pandas is an open-source library that extends the popular Python pandas library, enabling you to connect to AWS data and analytics services using pandas data frames. We’ve seen customers use the library in combination with pandas for both data engineering and AI workloads. Although pandas data frames are simple to use, they have a limitation on the size of data that can be processed. Because pandas is single-threaded, jobs are bounded by the available resources. If the data you need to process is small, this won’t be a problem, and pandas makes analysis and manipulation simple, as well as interactions with many other tools that support machine learning (ML) and visualization. However, as your data size scales, you may run into problems. This can be especially frustrating if you’ve created a promising prototype that can’t be moved to production. In our work with customers, we’ve seen many projects, both in data science and data engineering, that are stuck while they wait for someone to rewrite using a big data framework such as Apache Spark.

We are excited to announce that AWS SDK for pandas now supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. The simplest way to do this is to use AWS Glue with Ray, the new serverless option to run distributed Python code announced at AWS re:Invent 2022. AWS SDK for pandas also supports self-managed Ray on Amazon Elastic Compute Cloud (Amazon EC2).

In this post, we show you how you can use pandas to connect to AWS data and analytics services and manipulate data at scale by running on an AWS Glue with Ray job.

Overview of solution

Ray is a unified framework that enables you to scale AI and Python applications. The goal of the project is to take any Python code that’s written on a laptop and scale the workload on a cluster. This innovative framework opens the door to big data processing to a new audience. Previously, the only way to process large datasets on a cluster was to use tools such as Apache Hadoop, Apache Spark, or Apache Flink. These frameworks require additional skills because they provide their own programming model and often require languages such as Scala or Java to fully take advantage of the advanced capabilities. With Ray, you can just use Python to parallelize your code with few modifications.

Although Ray opens the door to big data processing, it’s not enough on its own to distribute pandas-specific methods. That task falls to Modin, a drop-in replacement of pandas, optimized to run in a distributed environment, such as Ray. Modin has the same API as pandas, so you can keep your code the same, but it parallelizes workloads to improve performance.

With today’s announcement, AWS SDK for pandas customers can use both Ray and Modin for their workloads. You have the option of loading data into Modin data frames, instead of regular pandas data frames. By configuring the library to use Ray and Modin, your existing data processing scripts can distribute operations end-to-end, with no code changes. AWS SDK for pandas takes care of parallelizing the read and write operations for your files across the compute cluster.

To use this feature, you can install the release candidate version of awswrangler with the ray and modin extras:

pip install "awswrangler[modin,ray]==3.0.0rc2"

Once installed, you can use the library in your code by importing it with the following statement:

import awswrangler as wr

When you run this code, the SDK for pandas looks for an environmental variable called WR_ADDRESS. If it finds it, it uses this value to send the commands to a remote cluster. If it doesn’t find it, it starts a local Ray runtime on your machine.

The following diagram shows what is happening when you run code that uses AWS SDK for pandas to read data from Amazon Simple Storage Service (Amazon S3) into a Modin data frame, perform a filtering operation, and write the data back to Amazon S3, using a multi-node cluster.

In the first phase, each node reads one or more input files and stores them in memory as blocks. During this phase, the head node builds a mapping reference that tracks the location of each block on the worker nodes. In the second phase, a filter operation is submitted to each node, creating a subset of the data. Finally, each worker node writes its blocks to Amazon S3.

It’s important to note that certain data frame operations (for example groupby or join) may result in the data being shuffled across nodes. Shuffling will also happen if you do partitioned or bucketed writes. This tends to slow down the job because data needs to move between nodes.

If you want to create your own Ray cluster on Amazon EC2, refer to the tutorial Distributing Calls on Ray Remote Cluster. The rest of this post shows you how to run AWS SDK for pandas and Modin on an AWS Glue with Ray job.

Use AWS Glue with Ray

Because AWS Glue with Ray is a fully managed environment, it’s a simple way to run jobs. Both AWS SDK for pandas and Modin are pre-loaded, you don’t need to worry about cluster management or installing the right set of dependencies, and the job auto scales with your workload. To get started, complete the following steps:

  1. Choose Launch Stack to provision an AWS CloudFormation stack in your AWS account:
    launch cloudformation stack
    Note that while in preview, AWS Glue with Ray is available in a limited set of AWS Regions.The stack takes about 3 minutes to complete. You can verify that everything was successfully deployed by checking that the CloudFormation stack shows the status CREATE_COMPLETE.
  2. Navigate to AWS Glue Studio to find an AWS Glue job named GlueRayJob with the following script.
  3. Choose Run to start the job and navigate to the Runs tab to monitor progress.

Here, we break down the script and show you what happens at each stage when we run this code on AWS Glue with Ray. First, we import the library:

import awswrangler as wr

At import, AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a Ray cluster with the default parameters. In this case, because we’re running on AWS Glue with Ray, AWS SDK for pandas automatically uses the Ray cluster with no extra configuration needed. Advanced users can override this process, however, by starting the Ray runtime before the import command.

Next, we read Amazon product data in Parquet format from Amazon S3 and load it into a distributed Modin data frame:

# Read Parquet data (1.2 Gb Parquet compressed)
df = wr.s3.read_parquet(
    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
)

Simple data transformations on the data frame are applied next. Modin data frames implement the same interface as pandas data frames, allowing you to perform familiar pandas operations at scale. First, we drop the customer_id column, then we filter for a subset of the reviews that received five-star ratings:

# Drop the customer_id column
df.drop("customer_id", axis=1, inplace=True)

# Filter reviews with 5-star rating
df5 = df[df["star_rating"] == 5]

The data is written back to Amazon S3 in Parquet format, partitioned by year and marketplace. The dataset=True argument ensures that an associated Hive table is also created in the AWS Glue metadata catalog:

# Write partitioned five-star reviews to S3 in Parquet format
wr.s3.to_parquet(
    df5,
    path=f"s3://{bucket_name}/{category}/",
    partition_cols=["year", "marketplace"],
    dataset=True, 
    database=glue_database,
    table=glue_table, 
)

Finally, a query is run in Amazon Athena, and the S3 objects resulting from this operation are read in parallel into a Modin data frame:

# Read the data back to a Modin df via Athena
df5_athena = wr.athena.read_sql_query(
    f"SELECT * FROM {glue_table}",
    database=glue_database,
    ctas_approach=False, 
    unload_approach=True, 
    workgroup=workgroup_name,
    s3_output=f"s3://{bucket_name}/unload/{category}/",
)

The Amazon CloudWatch logs of the job provide insights into the performance achieved from reading blocks in parallel in a multi-node Ray cluster.

For simplicity, this example showcased Amazon S3 and Athena APIs only, but AWS SDK for pandas supports other services, including Amazon Timestream and Amazon Redshift. For a full list of the APIs that support distribution, refer to Supported APIs.

Clean up AWS resources

To prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this example:

  1. On the Amazon S3 console, empty data from both buckets with prefix glue-ray-.
  2. On the AWS CloudFormation console, delete the SDKPandasOnGlueRay stack.

The resources created as part of the stack are automatically deleted with it.

Conclusion

In this post, we demonstrated how you can run your workloads at scale using AWS SDK for pandas. When used in combination with AWS Glue with Ray, this gives you access to a fully managed environment to distribute your Python scripts. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.

For more examples, check out the tutorials in the AWS SDK for pandas documentation.


About the Authors

Abdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton Kukushkin is a Data Engineer for AWS Professional Services based in London, United Kingdom. He works with AWS customers, helping them build and scale their data and analytics.

Leon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale.

Lucas Hanson is Senior Cloud Engineer for AWS Professional Services. He focuses on helping customers with infrastructure management and DevOps processes for data management solutions. Outside of work, he enjoys music production and practicing yoga.

New AWS Glue 4.0 – New and Updated Engines, More Data Formats, and More

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-aws-glue-4-0-new-and-updated-engines-more-data-formats-and-more/

AWS Glue is a scalable, serverless tool that helps you to accelerate the development and execution of your data integration and ETL workloads. Today we are launching Glue 4.0, with updated engines, support for additional data formats, Ray support, and a lot more.

Before I dive in, just a word about versioning. Unlike most AWS services, where the service team owns and has full control over the APIs, Glue includes a collection of libraries, engines, and tools developed by the open source community. Some of these components do not maintain strict backward compatibility, often in pursuit of efficiency. In order to make sure that changes to the components do not impact your Glue jobs, you must select a particular Glue version when you create the job.

Each version of Glue includes performance and reliability benefits in addition to the added features, and you should plan to upgrade your jobs over time to take advantage of all that Glue has to offer.

Dive in to Glue
Let’s take a look at what’s new in Glue 4.0:

Updated Engines – This version of Glue includes Python 3.10 and Apache Spark 3.3.0. Both engines include bug fixes and performance enhancements; Spark includes new features such as row-level runtime filtering, improved error messages, additional built-in functions, and much more. Glue and Amazon EMR make use of the same optimized Spark runtime, which has been optimized to run in the AWS cloud and can be 2-3 times faster than the basic open source version.

New Engine Plugins – Glue 4.0 adds native support for the Cloud Shuffle Service Plugin for Spark to help you scale your disk usage, and Adaptive Query Execution to dynamically optimize your queries as they run.

Pandas Support Pandas is an open source data analysis and manipulation tool that is built on top of Python. It is easy to learn and includes all kinds of interesting and useful data manipulation functions.

New Data Formats – Whether you are building a data lake or a data warehouse, Glue 4.0 now handles new open source data formats for sources and targets, with support for Apache Hudi, Apache Iceberg, and Delta Lake. To learn more about these new options and formats, read Get Started with Apache Hudi using AWS Glue by Implementing Key Design Concepts.

Everything Else – In addition to the above items, Glue 4.0 also includes the Parquet vectorized reader, with support for additional data types and encodings. It has been upgraded to use log4j 2 and is no longer dependent on log4j 1.

Available Now
Glue 4.0 is available today in the US East (Ohio, N. Virginia), US West (N. California, Oregon), Africa (Cape Town), Asia Pacific (Hong Kong, Jakarta, Mumbai, Osaka, Seoul, Singapore, Sydney, Tokyo), Canada (Central), Europe (Frankfurt, Ireland, London, Milan, Paris, Stockholm), Middle East (Bahrain), and South America (Sao Paulo) AWS Regions.

Jeff;