Tag Archives: AWS Glue

Dive deep into AWS Glue 4.0 for Apache Spark

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/dive-deep-into-aws-glue-4-0-for-apache-spark/

Deriving insight from data is hard. It’s even harder when your organization is dealing with silos that impede data access across different data stores. Seamless data integration is a key requirement in a modern data architecture to break down data silos. AWS Glue is a serverless data integration service that makes data preparation simpler, faster, and cheaper. You can discover and connect to over 70 diverse data sources, manage your data in a centralized data catalog, and create, run, and monitor data integration pipelines to load data into your data lakes and your data warehouses. AWS Glue for Apache Spark takes advantage of Apache Spark’s powerful engine to process large data integration jobs at scale.

AWS Glue released version 4.0 at AWS re:Invent 2022, which includes many upgrades, such as the new optimized Apache Spark 3.3.0 runtime (3.5 times performance improvement on average over open-source Apache Spark 3.3.0), Python 3.10, and a new enhanced Amazon Redshift connector (10 times performance improvement on average over the previous version).

In this post, we discuss the main benefits that this new AWS Glue version brings and how it can help you build better data integration pipelines.

Spark upgrade highlights

The new version of Spark included in AWS Glue 4.0 brings a number of valuable features, which we highlight in this section. For more details, refer to Spark Release 3.3.0 and Spark Release 3.2.0.

Support for the pandas API

Support for the pandas API allows users familiar with the popular Python library to start writing distributed extract, transform, and load (ETL) jobs without having to learn a new framework API. We discuss this in more detail later in this post.

Python UDF profiling

With Python UDF profiling, you can now profile regular and pandas user-defined functions (UDFs). The ability to call show_profiles() on the SparkContext to get details about the Python run was added in Spark 1 for RDDs; now it also works for DataFrame Python UDFs and provides valuable information about how many calls are made to the UDF and how much time is spent in it.

For instance, to illustrate how the profiler measures time, the following example shows the profile of a pandas UDF that processes over a million rows (but needs only 112 calls, because each call handles a batch of rows) and sleeps for 1 second. We can use the command spark.sparkContext.show_profiles(), as shown in the following screenshot.

Python UDF Profiling
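To get a similar profile yourself, you can run a small experiment like the following sketch (it assumes an existing SparkSession named spark, for example in an AWS Glue notebook, and that the Spark configuration spark.python.profile is set to true; the UDF and sizes are illustrative, not the exact job behind the screenshot):

import time
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

@pandas_udf("long")
def slow_square(batch: pd.Series) -> pd.Series:
    # Sleep once per batch so the time spent in the UDF is easy to spot in the profile
    time.sleep(1)
    return batch * batch

df = spark.range(1_000_000).select(slow_square(col("id")).alias("squared"))
df.agg({"squared": "sum"}).show()  # aggregate on the UDF output to force it to run

spark.sparkContext.show_profiles()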

Pushdown filters

Pushdown filters are used in more scenarios, such as aggregations or limits. The upgrade also offers support for Bloom filters and skew optimization. These improvements allow for handling larger datasets by reading less data and processing it more efficiently. For more information, refer to Spark Release 3.3.0.

ANSI SQL

Now you can ask Spark SQL to follow ANSI behavior on the points where it traditionally differed from the standard. This helps users bring their existing SQL skills and start producing value on AWS Glue faster. For more information, refer to ANSI Compliance.
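As a minimal sketch of what this looks like in practice (assuming a SparkSession named spark), you can enable ANSI mode for the session and use the try_* functions where you still want the lenient behavior:

# Opt in to ANSI SQL semantics for this session
spark.conf.set("spark.sql.ansi.enabled", "true")

# Under ANSI mode, invalid casts and divisions by zero raise errors instead of returning NULL:
# spark.sql("SELECT CAST('not a number' AS INT)").show()  # now fails loudly

# try_* functions keep the lenient behavior case by case
spark.sql("SELECT try_divide(10, 0) AS result").show()  # returns NULL instead of failing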

Adaptive query execution by default

Adaptive query execution (AQE) by default helps optimize Spark SQL performance. You can turn AQE on and off by using spark.sql.adaptive.enabled as an umbrella configuration. Since AWS Glue 4.0, it’s enabled by default, so you no longer need to enable this explicitly.
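If you need to compare a plan with and without AQE while troubleshooting, you can still toggle the setting yourself; a quick sketch (assuming a SparkSession named spark):

# AQE is enabled by default in AWS Glue 4.0; check the effective value
print(spark.conf.get("spark.sql.adaptive.enabled"))

# Disable it only if you need to reproduce pre-AQE behavior for comparison
spark.conf.set("spark.sql.adaptive.enabled", "false")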

Improved error messages

Improved error messages provide better context and easy resolution. For instance, if you have a division by zero in a SQL statement on ANSI mode, previously you would get just a Java exception: java.lang.ArithmeticException: divide by zero. Depending on the complexity and number of queries, the cause might not be obvious and might require some reruns with trial and error until it’s identified.

On the new version, you get a much more revealing error:

Caused by: org.apache.spark.SparkArithmeticException: Division by zero. 
Use `try_divide` to tolerate divisor being 0 and return NULL instead. 
If necessary set "spark.sql.ansi.enabled" to "false" (except for ANSI interval type) to bypass this error.
== SQL(line 1, position 8) ==
select sum(cost)/(select count(1) from items where cost > 100) from items
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Not only does it show the query that caused the issue, but it also indicates the specific operation where the error occurred (the division in this case). In addition, it provides some guidance on resolving the issue.

New pandas API on Spark

The Spark upgrade brings a new, exciting feature, which is the chance to use your existing Python pandas framework knowledge in a distributed and scalable runtime. This lowers the barrier of entry for teams without previous Spark experience, so they can start delivering value quickly and make the most of the AWS Glue for Spark runtime.

The new API provides a pandas DataFrame-compatible API, so you can take existing pandas code and migrate it to AWS Glue for Spark by changing the imports, although it's not 100% compatible.

If you just want to migrate existing pandas code to run on pandas on Spark, you could replace the import and test:

# Replace pure pandas 
import pandas as pd
# With pandas API on Spark
import pyspark.pandas as pd

In some cases, you might want to use multiple implementations in the same script, for instance because a feature is not yet available in the pandas API for Spark, or because the data is so small that some operations are more efficient locally than distributed. In that situation, to avoid confusion, it's better to use different aliases for the pandas and pandas on Spark module imports, and to follow a naming convention for the different types of DataFrames, because the type has implications for performance and available features. For example, you could start pandas DataFrame variables with pdf_, pandas on Spark ones with psdf_, and standard Spark ones with sdf_ or just df_.

You can also convert to a standard Spark DataFrame by calling to_spark(). This allows you to use features not available in pandas, such as writing directly to catalog tables or using some Spark connectors.

The following code shows an example of combining the different types of APIs:

# The job has the parameter "--additional-python-modules":"openpyxl", 
#  the Excel library is not provided by default

import pandas as pd
# pdf is a pure pandas DF which resides in the driver memory only
pdf = pd.read_excel('s3://mybucket/mypath/MyExcel.xlsx', index_col=0)

import pyspark.pandas as ps
# psdf behaves like a pandas df but operations on it will be distributed among nodes
psdf = ps.from_pandas(pdf)
means_series = psdf.mean()

# Convert to a dataframe of column names and means
#  pandas on Spark series don't allow iteration so we convert to pure pandas
#  on a big dataset this could cause the driver to be overwhelmed but not here
# We reset the index so the index labels become a column we can use later
pdf_means = pd.DataFrame(means_series.to_pandas()).reset_index(level=0)
# Set meaningful column names
pdf_means.columns = ["excel_column", "excel_average"]

# We want to use this to enrich a Spark DF representing a huge table
#  convert to a standard Spark DF so we can use the Spark API
sdf_means = ps.from_pandas(pdf_means).to_spark()

sdf_bigtable = spark.table("somecatalog.sometable")
sdf_bigtable.join(sdf_means, sdf_bigtable.category == sdf_means.excel_column)

Improved Amazon Redshift connector

A new version of the Amazon Redshift connector brings many improvements:

  • Pushdown optimizations
  • Support for reading SUPER columns
  • Support for writing based on column names instead of position
  • An optimized serializer to increase performance when reading from Amazon Redshift
  • Other minor improvements like trimming pre- and post-actions and handling numeric time zone formats

This new Amazon Redshift connector is built on top of an existing open-source connector project and offers further enhancements for performance and security, helping you gain up to 10 times faster application performance. It accelerates AWS Glue jobs when reading from Amazon Redshift, and also enables you to run data-intensive workloads more reliably. For more details, see Moving data to and from Amazon Redshift. To learn more about how to use it, refer to New – Amazon Redshift Integration with Apache Spark.

When you use the new Amazon Redshift connector on an AWS Glue DynamicFrame, use the existing methods: GlueContext.create_data_frame and GlueContext.write_data_frame.

When you use the new Amazon Redshift connector on a Spark DataFrame, use the format io.github.spark_redshift_community.spark.redshift, as shown in the following code snippet:

df = spark.read.format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("tempdir", redshiftTmpDir) \
    .option("user", user) \
    .option("password", password) \
    .option("aws_iam_role", aws_iam_role) \
    .load()
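Writing back to Amazon Redshift with the Spark DataFrame API follows the same pattern. The following is a sketch that reuses the connection variables from the read example; target_table is a hypothetical table name:

df.write.format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", url) \
    .option("dbtable", target_table) \
    .option("tempdir", redshiftTmpDir) \
    .option("user", user) \
    .option("password", password) \
    .option("aws_iam_role", aws_iam_role) \
    .mode("append") \
    .save()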

Other upgrades and improvements

The following are updates and improvements in the dependent libraries:

  • Spark 3.3.0-amzn-0
  • Hadoop 3.2.1-amzn-8
  • Hive 2.3.9-amzn-2
  • Parquet 1.12
  • Log4j 2
  • Python 3.10
  • Arrow 7.0.0
  • Boto3 1.26
  • EMRFS 2.53.0
  • AWS Glue Data Catalog client 3.6.0
  • New versions of the provided JDBC drivers:
    • MySQL 8.0.23
    • PostgreSQL 42.3.6
    • Microsoft SQL Server 9.4.0
    • Oracle 21.7
    • MongoDB 4.7.2
    • Amazon Redshift redshift-jdbc42-2.1.0.9
  • Integrated and upgraded plugins to popular table formats:
    • Iceberg 1.0.0
    • Hudi 0.12.1
    • Delta Lake 2.1.0

To learn more, refer to the appendices in Migrating AWS Glue jobs to AWS Glue version 4.0.

Improved performance

In addition to all the new features, AWS Glue 4.0 brings performance improvements at lower cost. In summary, AWS Glue 4.0 with Amazon Simple Storage Service (Amazon S3) is 2.7 times faster than AWS Glue 3.0, and AWS Glue 4.0 with Amazon Redshift is 7.1 times faster than AWS Glue 3.0. In the following sections, we provide details about AWS Glue 4.0 performance results with Amazon S3 and Amazon Redshift.

Amazon S3

The following chart shows the total job runtime for all queries (in seconds) in the 3 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in an S3 bucket in Parquet format, and we used 30 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon S3 had a total job runtime on AWS Glue 4.0 that was 2.7 times faster than that on AWS Glue 3.0. Detailed instructions are explained in the appendix of this post.

Metric           | AWS Glue 3.0 | AWS Glue 4.0
Total Query Time | 5084.94274   | 1896.1904
Geometric Mean   | 14.00217     | 10.09472

TPC-DS benchmark result with S3

Amazon Redshift

The following chart shows the total job runtime for all queries (in seconds) in the 1 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in a five-node ra3.4xlarge Amazon Redshift cluster, and we used 150 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon Redshift had a total job runtime on AWS Glue 4.0 that was 7.1 times faster than that on AWS Glue 3.0.

Metric           | AWS Glue 3.0 | AWS Glue 4.0
Total Query Time | 22020.58     | 3075.96633
Geometric Mean   | 142.38525    | 8.69973

TPC-DS benchmark result with Redshift

Get started with AWS Glue 4.0

You can start using AWS Glue 4.0 via AWS Glue Studio, the AWS Glue console, the latest AWS SDK, and the AWS Command Line Interface (AWS CLI).

To start using AWS Glue 4.0 in AWS Glue Studio, open the AWS Glue job and on the Job details tab, choose the version Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.

Glue version 4.0 in Glue Studio

To migrate your existing AWS Glue jobs from AWS Glue 0.9, 1.0, 2.0, and 3.0 to AWS Glue 4.0, see Migrating AWS Glue jobs to AWS Glue version 4.0.

The AWS Glue 4.0 Docker images are now available on Docker Hub, so you can use them to develop locally for the new version. Refer to Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container for further details.

Conclusion

In this post, we discussed the main upgrades provided by the new 4.0 version of AWS Glue. You can already start writing new jobs on that version and benefit from all the improvements, as well as migrate your existing jobs.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team. He’s been an Apache Spark enthusiast since version 0.8. In his spare time, he likes playing board games.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customer’s data analytic and processing needs with cloud-based data-intensive technologies.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on solving challenging distributed systems problems for data integration on Glue platform for customers using Apache Spark.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for building data lakes on AWS and simplifying integration with data warehouses for customers using Apache Spark.


Appendix: TPC-DS benchmark on AWS Glue against a dataset on Amazon S3

To perform a TPC-DS benchmark on AWS Glue against a dataset in an S3 bucket, you need to copy the TPC-DS dataset into your S3 bucket. These instructions are based on emr-spark-benchmark:

  1. Create a new S3 bucket in your test account if needed. In the following code, replace $YOUR_S3_BUCKET with your S3 bucket name. We suggest you export YOUR_S3_BUCKET as an environment variable:
export YOUR_S3_BUCKET=<Your bucket name>
aws s3 mb s3://$YOUR_S3_BUCKET
  2. Copy the TPC-DS source data as input to your S3 bucket. If it’s not exported as an environment variable, replace $YOUR_S3_BUCKET with your S3 bucket name:
aws s3 sync s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/ s3://$YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned/
  3. Build the benchmark application following the instructions in Steps to build spark-benchmark-assembly application.

For your convenience, we have provided the sample application JAR file spark-benchmark-assembly-3.3.0.jar, which we built for AWS Glue 4.0.

  4. Upload the spark-benchmark-assembly JAR file to your S3 bucket.
  5. In AWS Glue Studio, create a new AWS Glue job through the script editor:
    1. Under Job details, for Type, choose Spark.
    2. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
    3. For Language, choose Scala.
    4. For Worker type, choose your preferred worker type.
    5. For Requested number of workers, choose your preferred number.
    6. Under Advanced properties, for Dependent JARs path, enter your S3 path for the spark-benchmark-assembly JAR file.
    7. For Script, enter the following code snippet:
import com.amazonaws.eks.tpcds.BenchmarkSQL

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    BenchmarkSQL.main(
      Array(
        "s3://YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned",
        "s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/", 
        "/opt/tpcds-kit/tools", 
        "parquet", 
        "3000", 
        "3", 
        "false", 
        "q1-v2.4,q10-v2.4,q11-v2.4,q12-v2.4,q13-v2.4,q14a-v2.4,q14b-v2.4,q15-v2.4,q16-v2.4,q17-v2.4,q18-v2.4,q19-v2.4,q2-v2.4,q20-v2.4,q21-v2.4,q22-v2.4,q23a-v2.4,q23b-v2.4,q24a-v2.4,q24b-v2.4,q25-v2.4,q26-v2.4,q27-v2.4,q28-v2.4,q29-v2.4,q3-v2.4,q30-v2.4,q31-v2.4,q32-v2.4,q33-v2.4,q34-v2.4,q35-v2.4,q36-v2.4,q37-v2.4,q38-v2.4,q39a-v2.4,q39b-v2.4,q4-v2.4,q40-v2.4,q41-v2.4,q42-v2.4,q43-v2.4,q44-v2.4,q45-v2.4,q46-v2.4,q47-v2.4,q48-v2.4,q49-v2.4,q5-v2.4,q50-v2.4,q51-v2.4,q52-v2.4,q53-v2.4,q54-v2.4,q55-v2.4,q56-v2.4,q57-v2.4,q58-v2.4,q59-v2.4,q6-v2.4,q60-v2.4,q61-v2.4,q62-v2.4,q63-v2.4,q64-v2.4,q65-v2.4,q66-v2.4,q67-v2.4,q68-v2.4,q69-v2.4,q7-v2.4,q70-v2.4,q71-v2.4,q72-v2.4,q73-v2.4,q74-v2.4,q75-v2.4,q76-v2.4,q77-v2.4,q78-v2.4,q79-v2.4,q8-v2.4,q80-v2.4,q81-v2.4,q82-v2.4,q83-v2.4,q84-v2.4,q85-v2.4,q86-v2.4,q87-v2.4,q88-v2.4,q89-v2.4,q9-v2.4,q90-v2.4,q91-v2.4,q92-v2.4,q93-v2.4,q94-v2.4,q95-v2.4,q96-v2.4,q97-v2.4,q98-v2.4,q99-v2.4,ss_max-v2.4", 
        "true"
      )
    )
  }
}
  6. Save and run the job.

The result file will be stored under s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/.

Ten new visual transforms in AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/ten-new-visual-transforms-in-aws-glue-studio/

AWS Glue Studio is a graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. It allows you to visually compose data transformation workflows using nodes that represent different data handling steps, which later are converted automatically into code to run.

AWS Glue Studio recently released 10 more visual transforms to allow creating more advanced jobs in a visual way without coding skills. In this post, we discuss potential use cases that reflect common ETL needs.

The new transforms that will be demonstrated in this post are: Concatenate, Split String, Array To Columns, Add Current Timestamp, Pivot Rows To Columns, Unpivot Columns To Rows, Lookup, Explode Array Or Map Into Rows, Derived Column, and Autobalance Processing.

Solution overview

In this use case, we have some JSON files with stock option operations. We want to make some transformations before storing the data to make it easier to analyze, and we also want to produce a separate dataset summary.

In this dataset, each row represents a trade of option contracts. Options are financial instruments that provide the right, but not the obligation, to buy or sell stock shares at a fixed price (called the strike price) before a defined expiration date.

Input data

The data follows the following schema:

  • order_id – A unique ID
  • symbol – A code generally based on a few letters to identify the corporation that issues the underlying stock shares
  • instrument – The name that identifies the specific option being bought or sold
  • currency – The ISO currency code in which the price is expressed
  • price – The amount that was paid for the purchase of each option contract (on most exchanges, one contract allows you to buy or sell 100 stock shares)
  • exchange – The code of the exchange center or venue where the option was traded
  • sold – A list of the number of contracts that were allocated to fill the sell order when this is a sell trade
  • bought – A list of the number of contracts that were allocated to fill the buy order when this is a buy trade

The following is a sample of the synthetic data generated for this post:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL requirements

This data has a number of unique characteristics, as often found on older systems, that make the data harder to use.

The following are the ETL requirements:

  • The instrument name has valuable information that is intended for humans to understand; we want to normalize it into separate columns for easier analysis.
  • The attributes bought and sold are mutually exclusive; we can consolidate them into a single column with the contract numbers and have another column indicating whether the contracts were bought or sold in this order.
  • We want to keep the information about the individual contract allocations but as individual rows instead of forcing users to deal with an array of numbers. We could add up the numbers, but we would lose information about how the order was filled (indicating market liquidity). Instead, we choose to denormalize the table so each row has a single number of contracts, splitting orders with multiple numbers into separate rows. In a compressed columnar format, the extra dataset size of this repetition is often small when compression is applied, so it’s acceptable to make the dataset easier to query.
  • We want to generate a summary table of volume for each option type (call and put) for each stock. This provides an indication of the market sentiment for each stock and the market in general (greed vs. fear).
  • To enable overall trade summaries, we want to provide for each operation the grand total and standardize the currency to US dollars, using an approximate conversion reference.
  • We want to add the date when these transformations took place. This could be useful, for instance, to have a reference on when was the currency conversion made.

Based on those requirements, the job will produce two outputs:

  • A CSV file with a summary of the number of contracts for each symbol and type
  • A catalog table to keep a history of the order, after doing the transformations indicated
    Data schema

Prerequisites

You will need your own S3 bucket to follow along with this use case. To create a new bucket, refer to Creating a bucket.

Generate synthetic data

To follow along with this post (or experiment with this kind of data on your own), you can generate this dataset synthetically. The following Python script can be run on a Python environment with Boto3 installed and access to Amazon Simple Storage Service (Amazon S3).

To generate the data, complete the following steps:

  1. On AWS Glue Studio, create a new job with the option Python shell script editor.
  2. Give the job a name and on the Job details tab, select a suitable role and a name for the Python script.
  3. In the Job details section, expand Advanced properties and scroll down to Job parameters.
  4. Enter a parameter named --bucket and assign as the value the name of the bucket you want to use to store the sample data.
  5. Enter the following script into the AWS Glue shell editor:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys
    
    # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket:
        raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated")
    
    data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000
    
    # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO()
    
    sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"),
                     ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file):
        stock = random.choice(sample_stocks)
        symbol = stock[0]
        ref_price = stock[1]
        currency = stock[2]
        strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3))
        sample = {
            "order_id": int(datetime.now().timestamp() * 1000) + i,
            "symbol": stock[0],
            "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}",
            "currency": currency,
            "price": round(random.uniform(0.5, 20.1), 2),
            "exchange": "EDGX" if currency == "usd" else "XETR"
         }
        sample[random.choice(operations)] = [random.randrange(1,100) for i in range(random.randrange(1,5))]
        buff.write(json.dumps(sample).encode())
        buff.write("\n".encode())
    
    s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. Run the job and wait until it shows as successfully completed on the Runs tab (it should take just a few seconds).

Each run will generate a JSON file with 1,000 rows under the bucket specified and prefix transformsblog/inputdata/. You can run the job multiple times if you want to test with more input files.
Each line in the synthetic data is a data row representing a JSON object like the following:

{
 "order_id":1681986991888,
 "symbol":"AMZN",
 "instrument":"AMZN APR 28 23 100 PUT",
 "currency":"usd",
 "price":2.89,
 "exchange":"EDGX",
 "sold":[88,49]
}

Create the AWS Glue visual job

To create the AWS Glue visual job, complete the following steps:

  1. Go to AWS Glue Studio and create a job using the option Visual with a blank canvas.
  2. Edit Untitled job to give it a name and assign a role suitable for AWS Glue on the Job details tab.
  3. Add an S3 data source (you can name it JSON files source) and enter the S3 URL under which the files are stored (for example, s3://<your bucket name>/transformsblog/inputdata/), then select JSON as the data format.
  4. Select Infer schema so it sets the output schema based on the data.

From this source node, you’ll keep chaining transforms. When adding each transform, make sure the selected node is the last one added so it gets assigned as the parent, unless indicated otherwise in the instructions.

If you didn’t select the right parent, you can always edit the parent by selecting it and choosing another parent in the configuration pane.

Node parent configuration

For each node added, you’ll give it a specific name (so the node purpose shows in the graph) and configuration on the Transform tab.

Every time a transform changes the schema (for instance, add a new column), the output schema needs to be updated so it’s visible to the downstream transforms. You can manually edit the output schema, but it’s more practical and safer to do it using the data preview.
Additionally, that way you can verify the transformations are working as expected so far. To do so, open the Data preview tab with the transform selected and start a preview session. After you have verified that the transformed data looks as expected, go to the Output schema tab and choose Use data preview schema to update the schema automatically.

As you add new kinds of transforms, the preview might show a message about a missing dependency. When this happens, choose End Session and then start a new one, so the preview picks up the new kind of node.

Extract instrument information

Let’s start by dealing with the information on the instrument name to normalize it into columns that are easier to access in the resulting output table.

  1. Add a Split String node and name it Split instrument, which will tokenize the instrument column using a whitespace regex: \s+ (a single space would do in this case, but this way is more flexible and visually clearer).
  2. We want to keep the original instrument information as is, so enter a new column name for the split array: instrument_arr.
    Split config
  3. Add an Array To Columns node and name it Instrument columns to convert the array column just created into new fields, except for symbol, for which we already have a column.
  4. Select the column instrument_arr, skip the first token, and tell it to extract the output columns month, day, year, strike_price, and type using indexes 2, 3, 4, 5, and 6 (the spaces after the commas are for readability; they don’t impact the configuration).
    Array config

The year extracted is expressed with only two digits; as a stopgap, let’s assume the year is in this century when just two digits are used.

  1. Add a Derived Column node and name it Four digits year.
  2. Enter year as the derived column so it overrides it, and enter the following SQL expression:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    Year derived column config

For convenience, we build an expiration_date field that users can use as a reference for the last date the option can be exercised.

  1. Add a Concatenate Columns node and name it Build expiration date.
  2. Name the new column expiration_date, select the columns year, month, and day (in that order), and a hyphen as spacer.
    Concatenated date config

The diagram so far should look like the following example.

DAG

The data preview of the new columns so far should look like the following screenshot.

Data preview
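If it helps to see the equivalent logic in code, the following PySpark sketch approximates what the nodes above do. It assumes a DataFrame named df with the source schema shown earlier; this is only an illustration, not the code AWS Glue Studio generates:

from pyspark.sql import functions as F

df = (
    df.withColumn("instrument_arr", F.split("instrument", r"\s+"))
      # The visual transform's indexes 2-6 correspond to 0-based array positions 1-5 here
      .withColumn("month", F.col("instrument_arr")[1])
      .withColumn("day", F.col("instrument_arr")[2])
      .withColumn("year", F.col("instrument_arr")[3])
      .withColumn("strike_price", F.col("instrument_arr")[4])
      .withColumn("type", F.col("instrument_arr")[5])
      .withColumn("year", F.expr("CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END"))
      .withColumn("expiration_date", F.concat_ws("-", "year", "month", "day"))
)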

Normalize the number of contracts

Each of the rows in the data indicates the number of contracts of each option that were bought or sold and the batches on which the orders were filled. Without losing the information about the individual batches, we want to have each amount on an individual row with a single amount value, while the rest of the information is replicated in each row produced.

First, let’s merge the amounts into a single column.

  1. Add an Unpivot Columns Into Rows node and name it Unpivot actions.
  2. Choose the columns bought and sold to unpivot and store the names and values in columns named action and contracts, respectively.
    Unpivot config
    Notice in the preview that the new column contracts is still an array of numbers after this transformation.
  3. Add an Explode Array Or Map Into Rows node and name it Explode contracts.
  4. Choose the contracts column and enter contracts as the new column to override it (we don’t need to keep the original array).

The preview now shows that each row has a single contracts amount, and the rest of the fields are the same.

This also means that order_id is no longer a unique key. For your own use cases, you need to decide how to model your data and if you want to denormalize or not.
Explode config

The following screenshot is an example of what the new columns look like after the transformations so far.
Data preview
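In code terms, these two visual nodes behave roughly like the following PySpark sketch (again an illustration of the logic only, not generated code; df is assumed to hold the data at this point in the pipeline):

from pyspark.sql import functions as F

# Unpivot bought/sold into an action name plus the array of contract batches,
# then explode the array so each batch becomes its own row
df = (
    df.select(
        "*",
        F.expr("stack(2, 'bought', bought, 'sold', sold) AS (action, contracts_arr)"),
    )
    .where(F.col("contracts_arr").isNotNull())
    .drop("bought", "sold")
    .withColumn("contracts", F.explode("contracts_arr"))
    .drop("contracts_arr")
)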

Create a summary table

Now you create a summary table with the number of contracts traded for each type and each stock symbol.

Let’s assume for illustration purposes that the files processed belong to a single day, so this summary gives the business users information about what the market interest and sentiment are that day.

  1. Add a Select Fields node and select the following columns to keep for the summary: symbol, type, and contracts.
    Selected fields
  2. Add a Pivot Rows Into Columns node and name it Pivot summary.
  3. Aggregate on the contracts column using sum and choose to convert the type column.
    Pivot config

Normally, you would store it on some external database or file for reference; in this example, we save it as a CSV file on Amazon S3.

  1. Add an Autobalance Processing node and name it Single output file.
  2. Although that transform type is normally used to optimize the parallelism, here we use it to reduce the output to a single file. Therefore, enter 1 in the number of partitions configuration.
    Autobalance config
  3. Add an S3 target and name it CSV Contract summary.
  4. Choose CSV as the data format and enter an S3 path where the job role is allowed to store files.

The last part of the job should now look like the following example.
DAG

  5. Save and run the job. Use the Runs tab to check when it has finished successfully.
    You’ll find a file under that path that is a CSV, despite not having that extension. You’ll probably need to add the extension after downloading it to open it.
    On a tool that can read the CSV, the summary should look something like the following example.
    Spreadsheet

Clean up temporary columns

In preparation for saving the orders into a historical table for future analysis, let’s clean up some temporary columns created along the way.

  1. Add a Drop Fields node with the Explode contracts node selected as its parent (we are branching the data pipeline to generate a separate output).
  2. Select the fields to be dropped: instrument_arr, month, day, and year.
    The rest we want to keep so they are saved in the historical table we’ll create later.
    Drop fields

Currency standardization

This synthetic data contains fictional operations on two currencies, but in a real system you could get currencies from markets all over the world. It’s useful to standardize the currencies handled into a single reference currency so they can easily be compared and aggregated for reporting and analysis.

We use Amazon Athena to simulate a table with approximate currency conversions that gets updated periodically (here we assume we process the orders promptly enough that the conversion is reasonably representative for comparison purposes).

  1. Open the Athena console in the same Region where you’re using AWS Glue.
  2. Run the following query to create the table by setting an S3 location where both your Athena and AWS Glue roles can read and write. Also, you might want to store the table in a different database than default (if you do that, update the table qualified name accordingly in the examples provided).
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. Enter a few sample conversions into the table:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. You should now be able to view the table with the following query:
    SELECT * FROM default.exchange_rates
  5. Back on the AWS Glue visual job, add a Lookup node (as a child of Drop Fields) and name it Exchange rate.
  6. Enter the qualified name of the table you just created, using currency as the key, and select the exchange_rate field to use.
    Because the field is named the same in both the data and the lookup table, we can just enter the name currency and don’t need to define a mapping.
    Lookup config
    At the time of this writing, the Lookup transform is not supported in the data preview and it will show an error that the table doesn’t exist. This is only for the data preview and doesn’t prevent the job from running correctly. The few remaining steps of the post don’t require you to update the schema. If you need to run a data preview on other nodes, you can remove the lookup node temporarily and then put it back.
  7. Add a Derived Column node and name it Total in usd.
  8. Name the derived column total_usd and use the following SQL expression:
    round(contracts * price * exchange_rate, 2)
    Currency conversion config
  9. Add an Add Current Timestamp node and name the column ingest_date.
  10. Use the format %Y-%m-%d for your timestamp (for demonstration purposes, we are just using the date; you can make it more precise if you want to).
    Timestamp config

Save the historical orders table

To save the historical orders table, complete the following steps:

  1. Add an S3 target node and name it Orders table.
  2. Configure Parquet format with snappy compression, and provide an S3 target path under which to store the results (separate from the summary).
  3. Select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  4. Enter a target database and a name for the new table, for instance: option_orders.
    Table sink config

The last part of the diagram should now look similar to the following, with two branches for the two separate outputs.
DAG

After you run the job successfully, you can use a tool like Athena to review the data the job has produced by querying the new table. You can find the table in the Athena table list and choose Preview table, or just run a SELECT query (updating the table name to the name and catalog you used):

SELECT * FROM default.option_orders limit 10

Your table content should look similar to the following screenshot.
Table content

Clean up

If you don’t want to keep this example, delete the two jobs you created, the two tables in Athena, and the S3 paths where the input and output files were stored.

Conclusion

In this post, we showed how the new transforms in AWS Glue Studio can help you do more advanced transformations with minimal configuration. This means you can implement more ETL use cases without having to write and maintain any code. The new transforms are already available in AWS Glue Studio, so you can start using them in your visual jobs today.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Scale your AWS Glue for Apache Spark jobs with new larger worker types G.4X and G.8X

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/scale-your-aws-glue-for-apache-spark-jobs-with-new-larger-worker-types-g-4x-and-g-8x/

Hundreds of thousands of customers use AWS Glue, a serverless data integration service, to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue for Apache Spark jobs run your code with a configured number of data processing units (DPUs). Each DPU provides 4 vCPU, 16 GB memory, and 64 GB disk. AWS Glue manages running Spark and adjusts workers to achieve the best price performance. For workloads such as data transforms, joins, and queries, you can use G.1X (1 DPU) and G.2X (2 DPU) workers, which offer a scalable and cost-effective way to run most jobs. With exponentially growing data sources and data lakes, customers want to run more data integration workloads, including their most demanding transforms, aggregations, joins, and queries. These workloads require higher compute, memory, and storage per worker.

Today we are pleased to announce the general availability of AWS Glue G.4X (4 DPU) and G.8X (8 DPU) workers, the next series of AWS Glue workers for the most demanding data integration workloads. G.4X and G.8X workers offer increased compute, memory, and storage, making it possible for you to vertically scale and run intensive data integration jobs, such as memory-intensive data transforms, skewed aggregations, and entity detection checks involving petabytes of data. Larger worker types not only benefit the Spark executors, but also help in cases where the Spark driver needs larger capacity, for instance when the job query plan is quite large.

This post demonstrates how AWS Glue G.4X and G.8X workers help you scale your AWS Glue for Apache Spark jobs.

G.4X and G.8X workers

AWS Glue G.4X and G.8X workers give you more compute, memory, and storage to run your most demanding jobs. G.4X workers provide 4 DPU, with 16 vCPU, 64 GB memory, and 256 GB of disk per node. G.8X workers provide 8 DPU, with 32 vCPU, 128 GB memory, and 512 GB of disk per node. You can enable G.4X and G.8X workers with a single parameter change in the API, AWS Command Line Interface (AWS CLI), or visually in AWS Glue Studio. Regardless of the worker used, all AWS Glue jobs have the same capabilities, including auto scaling and interactive job authoring via notebooks. G.4X and G.8X workers are available with AWS Glue 3.0 and 4.0.

The following table shows compute, memory, disk, and Spark configurations per worker type in AWS Glue 3.0 or later.

AWS Glue Worker Type | DPU per Node | vCPU | Memory (GB) | Disk (GB) | Spark Executors per Node | Cores per Spark Executor
G.1X                 | 1 | 4  | 16  | 64  | 1 | 4
G.2X                 | 2 | 8  | 32  | 128 | 1 | 8
G.4X (new)           | 4 | 16 | 64  | 256 | 1 | 16
G.8X (new)           | 8 | 32 | 128 | 512 | 1 | 32

To use G.4X and G.8X workers on an AWS Glue job, change the setting of the worker type parameter to G.4X or G.8X. In AWS Glue Studio, you can choose G 4X or G 8X under Worker type.

In the AWS API or AWS SDK, you can specify G.4X or G.8X in the WorkerType parameter. In the AWS CLI, you can use the --worker-type parameter in a create-job command.
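For example, the following is a minimal sketch using the AWS SDK for Python (Boto3); the job name, role, and script location are placeholders you would replace with your own values:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="my-large-memory-job",  # placeholder job name
    Role="MyGlueJobRole",        # placeholder IAM role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/job.py",  # placeholder script path
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.8X",
    NumberOfWorkers=10,
)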

To use G.4X and G.8X on an AWS Glue Studio notebook or interactive sessions, set G.4X or G.8X in the %worker_type magic (for example, %worker_type G.8X).

Performance characteristics using the TPC-DS benchmark

In this section, we use the TPC-DS benchmark to showcase performance characteristics of the new G.4X and G.8X worker types. We used AWS Glue version 4.0 jobs.

G.2X, G.4X, and G.8X results with the same number of workers

Compared to the G.2X worker type, the G.4X worker has 2 times the DPUs and the G.8X worker has 4 times the DPUs. We ran over 100 TPC-DS queries against the 3 TB TPC-DS dataset with the same number of workers but on different worker types. The following table shows the results of the benchmark.

Worker Type | Number of Workers | Number of DPUs | Duration (minutes) | Cost at $0.44/DPU-hour ($)
G.2X        | 30 | 60  | 537.4 | $236.46
G.4X        | 30 | 120 | 264.6 | $232.85
G.8X        | 30 | 240 | 122.6 | $215.78

When running jobs on the same number of workers, the new G.4X and G.8X workers achieved roughly linear vertical scalability.

G.2X, G.4X, and G.8X results with the same number of DPUs

We ran over 100 TPC-DS queries against the 10 TB TPC-DS dataset with the same number of DPUs but on different worker types. The following table shows the results of the experiments.

Worker Type | Number of Workers | Number of DPUs | Duration (minutes) | Cost at $0.44/DPU-hour ($)
G.2X        | 40 | 80 | 1323 | $776.16
G.4X        | 20 | 80 | 1191 | $698.72
G.8X        | 10 | 80 | 1190 | $698.13

When running jobs on the same number of total DPUs, the job performance stayed mostly the same with new worker types.

Example: Memory-intensive transformations

Data transformations are an essential step to preprocess and structure your data into an optimal form. Some transformations, such as aggregations, joins, and custom logic in user-defined functions (UDFs), consume a bigger memory footprint. The new G.4X and G.8X workers enable you to run larger memory-intensive transformations at scale.

The following example reads large JSON files compressed in GZIP from an input Amazon Simple Storage Service (Amazon S3) location, performs groupBy, calculates groups based on K-means clustering using a Pandas UDF, then shows the results. Note that this UDF-based K-means is used just for illustration purposes; it’s recommended to use native K-means clustering for production purposes.
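The exact job isn’t reproduced here, but its shape looks roughly like the following sketch. The column names, grouping key, cluster count, and input path are hypothetical, and scikit-learn is assumed to be supplied through --additional-python-modules:

import pandas as pd
from sklearn.cluster import KMeans  # assumed to be installed via --additional-python-modules

# Hypothetical input columns: group_key, x, y
def cluster_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each group is collected into a single in-memory pandas DataFrame on an executor,
    # which is why a large or skewed group can exhaust a smaller worker
    model = KMeans(n_clusters=3, n_init=10).fit(pdf[["x", "y"]])
    pdf["cluster"] = model.labels_
    return pdf

df = spark.read.json("s3://your-bucket/large-gzip-json/")  # hypothetical input path
clustered = df.groupBy("group_key").applyInPandas(
    cluster_group,
    schema="group_key string, x double, y double, cluster int",
)
clustered.show()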

With G.2X workers

When an AWS Glue job ran on 12 G.2X workers (24 DPU), it failed due to a No space left on device error. On the Spark UI, the Stages tab for the failed stage shows that there were multiple failed tasks in the AWS Glue job due to this error.

The Executors tab shows the failed tasks per executor.

Generally, G.2X workers can process memory-intensive workloads well. This time, we used a special Pandas UDF that consumes a significant amount of memory, and it caused a failure due to a large amount of shuffle writes.

With G.8X workers

When an AWS Glue job ran on 3 G.8X workers (24 DPU), it succeeded without any failures, as shown on the Spark UI’s Jobs tab.

The Executors tab also shows that there were no failed tasks.

From this result, we observed that G.8X workers processed the same workload without failures.

Conclusion

In this post, we demonstrated how AWS Glue G.4X and G.8X workers can help you vertically scale your AWS Glue for Apache Spark jobs. G.4X and G.8X workers are available today in US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). You can start using the new G.4X and G.8X worker types to scale your workload from today. To get started with AWS Glue, visit AWS Glue.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Tomohiro Tanaka is a Senior Cloud Support Engineer on the AWS Support team. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he enjoys coffee breaks with his colleagues and making coffee at home.

Chuhan Liu is a Software Development Engineer on the AWS Glue team. He is passionate about building scalable distributed systems for big data processing, analytics, and management. In his spare time, he enjoys playing tennis.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytic services. In his spare time, he enjoys skiing and gardening.

Process price transparency data using AWS Glue

Post Syndicated from Hari Thatavarthy original https://aws.amazon.com/blogs/big-data/process-price-transparency-data-using-aws-glue/

The Transparency in Coverage rule is a federal regulation in the United States that was finalized by the Centers for Medicare and Medicaid Services (CMS) in October 2020. The rule requires health insurers to provide clear and concise information to consumers about their health plan benefits, including costs and coverage details. Under the rule, health insurers must make available to their members a list of negotiated rates for in-network providers, as well as an estimate of the member’s out-of-pocket costs for specific health care services. This information must be made available to members through an online tool that is accessible and easy to use. The Transparency in Coverage rule also requires insurers to make available data files that contain detailed information on the prices they negotiate with health care providers. This information can be used by employers, researchers, and others to compare prices across different insurers and health care providers. Phase 1 implementation of this regulation, which went into effect on July 1, 2022, requires that payors publish machine-readable files publicly for each plan that they offer. CMS has published a technical implementation guide with file formats, file structure, and standards for producing these machine-readable files.

This post walks you through the preprocessing and processing steps required to prepare data published by health insurers in light of this federal regulation using AWS Glue. We also show how to query and derive insights using Amazon Athena.

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data.

Challenges processing these machine-readable files

The machine-readable files published by these payors vary in size. A single file can range from a few megabytes to hundreds of gigabytes. These files contain large JSON objects that are deeply nested. Unlike NDJSON and JSONL formats, where each line in the file is a JSON object, these files contain a single large JSON object that can span multiple lines. The following figure shows the schema of an in_network rate file published by a major health insurer on their website for public access. This file, when uncompressed, is about 20 GB in size, contains a single JSON object, and is deeply nested. The figure shows the schema of this JSON object as printed by the Spark printSchema() function; each box highlighted in red is a nested array structure.

JSON Schema

Loading a 20 GB deeply nested JSON object requires a machine with a large memory footprint. Data when loaded into memory is 4–10 times its size on disk. A 20 GB JSON object may need a machine with up to 200 GB memory. To process workloads larger than 20 GB, these machines need to be scaled vertically, thereby significantly increasing hardware costs. Vertical scaling has its limits, and it’s not possible to scale beyond a certain point. Analyzing this data requires unnesting and flattening of deeply nested array structures. These transformations explode the data at an exponential rate, thereby adding to the need for more memory and disk space.

You can use an in-memory distributed processing framework such as Apache Spark to process and analyze such large volumes of data. However, to load this single large JSON object as a Spark DataFrame and perform an action on it, a worker node needs enough memory to load this object in full. When a worker node tries to load this large deeply nested JSON object and there isn’t enough memory to load it in full, the processing job will fail with out-of-memory issues. This calls for splitting the large JSON object into smaller chunks using some form of preprocessing logic. Once preprocessed, these smaller files can then be further processed in parallel by worker nodes without running into out-of-memory issues.

Solution overview

The solution involves a two-step approach. The first is a preprocessing step, which takes the large JSON object as input and splits it to multiple manageable chunks. This is required to address the challenges we mentioned earlier. The second is a processing step, which prepares and publishes data for analysis.

The preprocessing step uses an AWS Glue Python shell job to split the large JSON object into smaller JSON files. The processing step unnests and flattens the array items from these smaller JSON files in parallel. It then partitions and writes the output as Parquet on Amazon Simple Storage Service (Amazon S3). The partitioned data is cataloged and analyzed using Athena. The following diagram illustrates this workflow.

Solution Overview

Prerequisites

To implement the solution in your own AWS account, you need to create or configure the following AWS resources in advance:

  • An S3 bucket to persist the source and processed data. Download the input file and upload it to the path s3://yourbucket/ptd/2023-03-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_PS1-50_C2_in-network-rates.json.gz.
  • An AWS Identity and Access Management (IAM) role for your AWS Glue extract, transform, and load (ETL) job. For instructions, refer to Setting up IAM permissions for AWS Glue. Adjust the permissions to ensure AWS Glue has read/write access to Amazon S3 locations.
  • An IAM role for Athena with AWS Glue Data Catalog permissions to create and query tables.

Create an AWS Glue preprocessing job

The preprocessing step uses ijson, an open-source iterative JSON parser to extract items in the outermost array of top-level attributes. By streaming and iteratively parsing the large JSON file, the preprocessing step loads only a portion of the file into memory, thereby avoiding out-of-memory issues. It also uses s3pathlib, an open-source Python interface to Amazon S3. This makes it easy to work with S3 file systems.

To create and run the AWS Glue job for preprocessing, complete the following steps:

  1. On the AWS Glue console, choose Jobs under Glue Studio in the navigation pane.
  2. Create a new job.
  3. Select Python shell script editor.
  4. Select Create a new script with boilerplate code.
    Python Shell Script Editor
  5. Enter the following code into the editor (adjust the S3 bucket names and paths to point to the input and output locations in Amazon S3):
import ijson
import json
import decimal
from s3pathlib import S3Path
from s3pathlib import context
import boto3
from io import StringIO

class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
        
def upload_to_s3(data, upload_path):
    data = bytes(StringIO(json.dumps(data,cls=JSONEncoder)).getvalue(),encoding='utf-8')
    s3_client.put_object(Body=data, Bucket=bucket, Key=upload_path)
        
s3_client = boto3.client('s3')

#Replace with your bucket and path to JSON object on your bucket
bucket = 'yourbucket'
largefile_key = 'ptd/2023-03-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_PS1-50_C2_in-network-rates.json.gz'
p = S3Path(bucket, largefile_key)


#Replace the paths to suit your needs
upload_path_base = 'ptd/preprocessed/base/base.json'
upload_path_in_network = 'ptd/preprocessed/in_network/'
upload_path_provider_references = 'ptd/preprocessed/provider_references/'

#Extract top the values of the following top level attributes and persist them on your S3 bucket
# -- reporting_entity_name
# -- reporting_entity_type
# -- last_updated_on
# -- version

base ={
    'reporting_entity_name' : '',
    'reporting_entity_type' : '',
    'last_updated_on' :'',
    'version' : ''
}

with p.open("r") as f:
    obj = ijson.items(f, 'reporting_entity_name')
    for evt in obj:
        base['reporting_entity_name'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f, 'reporting_entity_type')
    for evt in obj:
        base['reporting_entity_type'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f, 'last_updated_on')
    for evt in obj:
        base['last_updated_on'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f,'version')
    for evt in obj:
        base['version'] = evt
        break
        
upload_to_s3(base,upload_path_base)

#Seek the position of JSON key provider_references 
#Iterate through items in provider_references array, and for every 1000 items create a JSON file on S3 bucket
with p.open("r") as f:
    provider_references = ijson.items(f, 'provider_references.item')
    fk = 0
    lst = []
    for rowcnt,row in enumerate(provider_references):
        if rowcnt % 1000 == 0:
            if fk > 0:
                dest = upload_path_provider_references + path
                upload_to_s3(lst,dest)
            lst = []
            path = 'provider_references_{0}.json'.format(fk)
            fk = fk + 1

        lst.append(row)

    path = 'provider_references_{0}.json'.format(fk)
    dest = upload_path_provider_references + path
    upload_to_s3(lst,dest)
    
#Seek the position of JSON key in_network
#Iterate through items in in_network array, and for every 25 items create a JSON file on S3 bucket
with p.open("r") as f:
    in_network = ijson.items(f, 'in_network.item')
    fk = 0
    lst = []
    for rowcnt,row in enumerate(in_network):
        if rowcnt % 25 == 0:
            if fk > 0:
                dest = upload_path_in_network + path
                upload_to_s3(lst,dest)
            lst = []
            path = 'in_network_{0}.json'.format(fk)
            fk = fk + 1

        lst.append(row)


    path = 'in_network_{0}.json'.format(fk)
    dest = upload_path_in_network + path
    upload_to_s3(lst,dest)
  6. Update the properties of your job on the Job details tab:
    1. For Type, choose Python Shell.
    2. For Python version, choose Python 3.9.
    3. For Data processing units, choose 1 DPU.

For Python shell jobs, you can allocate either 0.0625 or 1 DPU. The default is 0.0625 DPU. A DPU is a relative measure of processing power that consists of 4 vCPUs of compute capacity and 16 GB of memory.

python shell job config

The Python libraries ijson and s3pathlib are available in pip and can be installed using the AWS Glue job parameter --additional-python-modules. You can also choose to package these libraries, upload them to Amazon S3, and refer to them from your AWS Glue job. For instructions on packaging your library, refer to Providing your own Python library.

  1. To install the Python libraries, set the following job parameters:
    • Key: --additional-python-modules
    • Value: ijson,s3pathlib
      install python modules
  2. Run the job.
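If you prefer to create the preprocessing job programmatically instead of through the console, the following minimal boto3 sketch mirrors the same settings; the job name, role ARN, and script location are placeholders you would replace with your own.

import boto3

glue = boto3.client('glue')

#Hypothetical names; replace the role ARN and script location with your own
glue.create_job(
    Name='ptd-preprocessing-job',
    Role='arn:aws:iam::<account-id>:role/YourGlueJobRole',
    Command={
        'Name': 'pythonshell',
        'PythonVersion': '3.9',
        'ScriptLocation': 's3://yourbucket/scripts/preprocessing.py',
    },
    MaxCapacity=1.0,  # 1 DPU, as chosen on the Job details tab
    DefaultArguments={
        '--additional-python-modules': 'ijson,s3pathlib',
    },
)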

The preprocessing step creates three folders in the S3 bucket: base, in_network, and provider_references.

s3_folder_1

Files in the in_network and provider_references folders contain arrays of JSON objects. Each of these JSON objects represents an element in the outermost array of the original large JSON object.

s3_folder_2

Create an AWS Glue processing job

The processing job uses the output of the preprocessing step to create a denormalized view of the data by extracting and flattening elements and attributes from nested arrays. The extent of unnesting depends on the attributes we need for analysis. For example, attributes such as negotiated_rate, npi, and billing_code are essential for analysis, and extracting the values associated with these attributes requires multiple levels of unnesting. The denormalized data is then partitioned by the billing_code column, persisted as Parquet on Amazon S3, and registered as a table in the AWS Glue Data Catalog for querying.

The following code sample guides you through the implementation using PySpark. The columns used to partition the data depend on the query patterns used to analyze the data. Arriving at a partitioning strategy that is in line with those query patterns improves overall query performance during analysis. This post assumes that the queries used for analyzing the data always use the billing_code column to filter and fetch data of interest. Data in each partition is additionally bucketed by npi to improve query performance.

To create your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, choose Jobs under Glue Studio in the navigation pane.
  2. Create a new job.
  3. Select Spark script editor.
  4. Select Create a new script with boilerplate code.
  5. Enter the following code into the editor (adjust the S3 bucket names and paths to point to the input and output locations in Amazon S3):
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

#create a dataframe of base objects - reporting_entity_name, reporting_entity_type, version, last_updated_on
#using the output of preprocessing step

base_df = spark.read.json('s3://yourbucket/ptd/preprocessed/base/')

#create a dataframe over provider_references objects using the output of preprocessing step
prvd_df = spark.read.json('s3://yourbucket/ptd/preprocessed/provider_references/')

#cross join dataframe of base objects with dataframe of provider_references 
prvd_df = prvd_df.crossJoin(base_df)

#create a dataframe over in_network objects using the output of preprocessing step
in_ntwrk_df = spark.read.json('s3://yourbucket/ptd/preprocessed/in_network/')

#unnest and flatten negotiated_rates and provider_references from in_network objects
in_ntwrk_df2 = in_ntwrk_df.select(
 in_ntwrk_df.billing_code, in_ntwrk_df.billing_code_type, in_ntwrk_df.billing_code_type_version,
 in_ntwrk_df.covered_services, in_ntwrk_df.description, in_ntwrk_df.name,
 explode(in_ntwrk_df.negotiated_rates).alias('exploded_negotiated_rates'),
 in_ntwrk_df.negotiation_arrangement)


in_ntwrk_df3 = in_ntwrk_df2.select(
 in_ntwrk_df2.billing_code, in_ntwrk_df2.billing_code_type, in_ntwrk_df2.billing_code_type_version,
 in_ntwrk_df2.covered_services, in_ntwrk_df2.description, in_ntwrk_df2.name,
 in_ntwrk_df2.exploded_negotiated_rates.negotiated_prices.alias(
 'exploded_negotiated_rates_negotiated_prices'),
 explode(in_ntwrk_df2.exploded_negotiated_rates.provider_references).alias(
 'exploded_negotiated_rates_provider_references'),
 in_ntwrk_df2.negotiation_arrangement)

#join the exploded in_network dataframe with provider_references dataframe
jdf = prvd_df.join(
 in_ntwrk_df3,
 prvd_df.provider_group_id == in_ntwrk_df3.exploded_negotiated_rates_provider_references,"fullouter")

#un-nest and flatten attributes from rest of the nested arrays.
jdf2 = jdf.select(
 jdf.reporting_entity_name,jdf.reporting_entity_type,jdf.last_updated_on,jdf.version,
 jdf.provider_group_id, jdf.provider_groups, jdf.billing_code,
 jdf.billing_code_type, jdf.billing_code_type_version, jdf.covered_services,
 jdf.description, jdf.name,
 explode(jdf.exploded_negotiated_rates_negotiated_prices).alias(
 'exploded_negotiated_rates_negotiated_prices'),
 jdf.exploded_negotiated_rates_provider_references,
 jdf.negotiation_arrangement)

jdf3 = jdf2.select(
 jdf2.reporting_entity_name,jdf2.reporting_entity_type,jdf2.last_updated_on,jdf2.version,
 jdf2.provider_group_id,
 explode(jdf2.provider_groups).alias('exploded_provider_groups'),
 jdf2.billing_code, jdf2.billing_code_type, jdf2.billing_code_type_version,
 jdf2.covered_services, jdf2.description, jdf2.name,
 jdf2.exploded_negotiated_rates_negotiated_prices.additional_information.
 alias('additional_information'),
 jdf2.exploded_negotiated_rates_negotiated_prices.billing_class.alias(
 'billing_class'),
 jdf2.exploded_negotiated_rates_negotiated_prices.billing_code_modifier.
 alias('billing_code_modifier'),
 jdf2.exploded_negotiated_rates_negotiated_prices.expiration_date.alias(
 'expiration_date'),
 jdf2.exploded_negotiated_rates_negotiated_prices.negotiated_rate.alias(
 'negotiated_rate'),
 jdf2.exploded_negotiated_rates_negotiated_prices.negotiated_type.alias(
 'negotiated_type'),
 jdf2.exploded_negotiated_rates_negotiated_prices.service_code.alias(
 'service_code'), jdf2.exploded_negotiated_rates_provider_references,
 jdf2.negotiation_arrangement)

jdf4 = jdf3.select(jdf3.reporting_entity_name,jdf3.reporting_entity_type,jdf3.last_updated_on,jdf3.version,
 jdf3.provider_group_id,
 explode(jdf3.exploded_provider_groups.npi).alias('npi'),
 jdf3.exploded_provider_groups.tin.type.alias('tin_type'),
 jdf3.exploded_provider_groups.tin.value.alias('tin'),
 jdf3.billing_code, jdf3.billing_code_type,
 jdf3.billing_code_type_version, jdf3.covered_services,
 jdf3.description, jdf3.name, jdf3.additional_information,
 jdf3.billing_class, jdf3.billing_code_modifier,
 jdf3.expiration_date, jdf3.negotiated_rate,
 jdf3.negotiated_type, jdf3.service_code,
 jdf3.negotiation_arrangement)

#Repartition by billing_code.
#Repartition changes the distribution of data on the Spark cluster.
#By repartitioning the data, we avoid writing too many small files.
jdf5=jdf4.repartition("billing_code")
 
datasink_path = "s3://yourbucket/ptd/processed/billing_code_npi/parquet/"

#persist the dataframe as Parquet on S3 and catalog it
#Partition the data by billing_code. This enables analytical queries to skip data and improves query performance
#Data is also bucketed and sorted by npi to improve query performance during analysis

jdf5.write.format('parquet').mode("overwrite").partitionBy('billing_code').bucketBy(2, 'npi').sortBy('npi').saveAsTable('ptdtable', path = datasink_path)
  1. Update the properties of your job on the Job details tab:
    1. For Type, choose Spark.
    2. For Glue version, choose Glue 4.0.
    3. For Language, choose Python 3.
    4. For Worker type, choose G 2X.
    5. For Requested number of workers, enter 20.

Arriving at the number of workers and worker type to use for your processing job depends on factors such as the amount of data being processed, the speed at which it needs to be processed, and the partitioning strategy used. Repartitioning of data can result in out-of-memory issues, especially when data is heavily skewed on the column used to repartition. It’s possible to reach Amazon S3 service limits if too many workers are assigned to the job. This is because tasks running on these worker nodes may try to read/write from the same S3 prefix, causing Amazon S3 to throttle the incoming requests. For more details, refer to Best practices design patterns: optimizing Amazon S3 performance.

processing job config

Exploding array elements creates new rows and columns, thereby exponentially increasing the amount of data that needs to be processed. Apache Spark splits this data into multiple Spark partitions on different worker nodes so that it can process large amounts of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. Shuffle operations are commonly triggered by wide transformations such as join, reduceByKey, groupByKey, and repartition. In case of exceptions due to local storage limitations, it helps to supplement or replace local disk storage capacity with Amazon S3 for large shuffle operations. This is possible with the AWS Glue Spark shuffle plugin with Amazon S3. With the cloud shuffle storage plugin for Apache Spark, you can avoid disk space-related failures.

  1. To use the Spark shuffle plugin, set the following job parameters:
    • Key: --write-shuffle-files-to-s3
    • Value: true
      spark shuffle plugin
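You can also apply this parameter programmatically. The following minimal boto3 sketch updates an existing job (the job name is a placeholder); because update_job replaces the job definition, the sketch copies over the fields it wants to keep. The optional spark.shuffle.glue.s3ShuffleBucket setting, shown commented out, is an assumption based on the cloud shuffle storage plugin documentation and lets you point shuffle files at a bucket of your choice.

import boto3

glue = boto3.client('glue')

#Hypothetical job name; fetch the current definition so existing settings are preserved
job = glue.get_job(JobName='ptd-processing-job')['Job']

args = dict(job.get('DefaultArguments', {}))
args['--write-shuffle-files-to-s3'] = 'true'
#args['--conf'] = 'spark.shuffle.glue.s3ShuffleBucket=s3://yourbucket/shuffle/'  # optional, assumed plugin setting

#update_job replaces the job definition, so carry over the fields we want to keep
keep = ('Role', 'Command', 'GlueVersion', 'WorkerType', 'NumberOfWorkers',
        'Timeout', 'ExecutionProperty', 'Connections', 'MaxRetries')
job_update = {k: job[k] for k in keep if k in job}
job_update['DefaultArguments'] = args

glue.update_job(JobName=job['Name'], JobUpdate=job_update)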

Query the data

You can query the cataloged data using Athena. For instructions on setting up Athena, refer to Setting up.

On the Athena console, choose Query editor in the navigation pane to run your query, and specify your data source and database.

sql query

To find the minimum, maximum, and average negotiated rates for procedure codes, run the following query:

SELECT
    billing_code,
    round(min(negotiated_rate), 2) AS min_price,
    round(avg(negotiated_rate), 2) AS avg_price,
    round(max(negotiated_rate), 2) AS max_price,
    description
FROM "default"."ptdtable"
GROUP BY billing_code, description
LIMIT 10;

The following screenshot shows the query results.

sql query results
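You can also run the same query programmatically through the Athena API. The following minimal boto3 sketch starts the query; the result output location is an assumption you would replace with a bucket you own.

import boto3

athena = boto3.client('athena')

query = '''
SELECT
    billing_code,
    round(min(negotiated_rate), 2) AS min_price,
    round(avg(negotiated_rate), 2) AS avg_price,
    round(max(negotiated_rate), 2) AS max_price,
    description
FROM "default"."ptdtable"
GROUP BY billing_code, description
LIMIT 10
'''

#Start the query; results are written to the output location
response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://yourbucket/athena-results/'},  # assumed path
)
print(response['QueryExecutionId'])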

Clean up

To avoid incurring future charges, delete the AWS resources you created:

  1. Delete the S3 objects and bucket.
  2. Delete the IAM policies and roles.
  3. Delete the AWS Glue jobs for preprocessing and processing.

Conclusion

This post guided you through the necessary preprocessing and processing steps to query and analyze price transparency-related machine-readable files. Although it’s possible to use other AWS services to process such data, this post focused on preparing and publishing data using AWS Glue.

To learn more about the Transparency in Coverage rule, refer to Transparency in Coverage. For best practices for scaling Apache Spark jobs and partitioning data with AWS Glue, refer to Best practices to scale Apache Spark jobs and partition data with AWS Glue. To learn how to monitor AWS Glue jobs, refer to Monitoring AWS Glue Spark jobs.

We look forward to hearing any feedback or questions.


About the Authors

hari thatavarthyHari Thatavarthy is a Senior Solutions Architect on the AWS Data Lab team. He helps customers design and build solutions in the data and analytics space. He believes in data democratization and loves to solve complex data processing-related problems. In his spare time, he loves to play table tennis.

Krishna MaddiletiKrishna Maddileti is a Senior Solutions Architect on the AWS Data Lab team. He partners with customers on their AWS journey and helps them with data engineering, data lakes, and analytics. In his spare time, he enjoys spending time with his family and playing video games with his 7-year-old.

yadukishore tatavartiYadukishore Tatavarthi is a Senior Partner Solutions Architect at AWS. He works closely with global system integrator partners to enable and support customers moving their workloads to AWS.

Manish KolaManish Kola is a Solutions Architect on the AWS Data Lab team. He partners with customers on their AWS journey.

Noritaka SekiyamaNoritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Compose your ETL jobs for MongoDB Atlas with AWS Glue

Post Syndicated from Igor Alekseev original https://aws.amazon.com/blogs/big-data/compose-your-etl-jobs-for-mongodb-atlas-with-aws-glue/

In today’s data-driven business environment, organizations face the challenge of efficiently preparing and transforming large amounts of data for analytics and data science purposes. Businesses need to build data warehouses and data lakes based on operational data. This is driven by the need to centralize and integrate data coming from disparate sources.

At the same time, operational data often originates from applications backed by legacy data stores. Modernizing these applications often means adopting a microservice architecture, which in turn requires consolidating data from multiple sources to construct an operational data store. Without modernization, legacy applications tend to incur increasing maintenance costs. Modernizing applications typically involves changing the underlying database engine to a modern document-based database like MongoDB.

These two tasks (building data lakes or data warehouses and application modernization) involve data movement, which uses an extract, transform, and load (ETL) process. A well-structured ETL process is key to the success of both.

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. MongoDB Atlas is an integrated suite of cloud database and data services that combines transactional processing, relevance-based search, real-time analytics, and mobile-to-cloud data synchronization in an elegant and integrated architecture.

By using AWS Glue with MongoDB Atlas, organizations can streamline their ETL processes. With its fully managed, scalable, and secure database solution, MongoDB Atlas provides a flexible and reliable environment for storing and managing operational data. Together, AWS Glue ETL and MongoDB Atlas are a powerful solution for organizations looking to optimize how they build data lakes and data warehouses, and to modernize their applications, in order to improve business performance, reduce costs, and drive growth and success.

In this post, we demonstrate how to migrate data from Amazon Simple Storage Service (Amazon S3) buckets to MongoDB Atlas using AWS Glue ETL, and how to extract data from MongoDB Atlas into an Amazon S3-based data lake.

Solution overview

In this post, we explore the following use cases:

  • Extracting data from MongoDB – MongoDB is a popular database used by thousands of customers to store application data at scale. Enterprise customers can centralize and integrate data coming from multiple data stores by building data lakes and data warehouses. This process involves extracting data from the operational data stores. When the data is in one place, customers can quickly use it for business intelligence needs or for ML.
  • Ingesting data into MongoDB – MongoDB also serves as a NoSQL database to store application data and build operational data stores. Modernizing applications often involves migrating the operational store to MongoDB, which requires extracting existing data from relational databases or from flat files. Mobile and web apps often require data engineers to build data pipelines that create a single view of data in Atlas while ingesting data from multiple siloed sources. During this migration, they need to join different databases to create documents. This complex join operation needs significant, one-time compute power, and developers also need to build it quickly to migrate the data.

AWS Glue comes in handy in these cases with its pay-as-you-go model and its ability to run complex transformations across huge datasets. Developers can use AWS Glue Studio to efficiently create such data pipelines.

The following diagram shows the data extraction workflow from MongoDB Atlas into an S3 bucket using AWS Glue Studio.

Extracting Data from MongoDB Atlas into Amazon S3

In order to implement this architecture, you will need a MongoDB Atlas cluster, an S3 bucket, and an AWS Identity and Access Management (IAM) role for AWS Glue. To configure these resources, refer to the prerequisite steps in the following GitHub repo.

The following figure shows the data load workflow from an S3 bucket into MongoDB Atlas using AWS Glue.

Loading Data from Amazon S3 into MongoDB Atlas

The same prerequisites are needed here: an S3 bucket, IAM role, and a MongoDB Atlas cluster.

Load data from Amazon S3 to MongoDB Atlas using AWS Glue

The following steps describe how to load data from the S3 bucket into MongoDB Atlas using an AWS Glue job. The extraction process from MongoDB Atlas to Amazon S3 is very similar, with the exception of the script being used. We call out the differences between the two processes.

  1. Create a free cluster in MongoDB Atlas.
  2. Upload the sample JSON file to your S3 bucket.
  3. Create a new AWS Glue Studio job with the Spark script editor option.

Glue Studio Job Creation UI

  1. Depending on whether you want to load or extract data from the MongoDB Atlas cluster, enter the load script or extract script in the AWS Glue Studio script editor.

The following screenshot shows a code snippet for loading data into the MongoDB Atlas cluster.

Code snippet for loading data into MongoDB Atlas

The code uses AWS Secrets Manager to retrieve the MongoDB Atlas cluster name, user name, and password. Then, it creates a DynamicFrame for the S3 bucket and file name passed to the script as parameters. The code retrieves the database and collection names from the job parameters configuration. Finally, the code writes the DynamicFrame to the MongoDB Atlas cluster using the retrieved parameters.
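The full load script is available in the GitHub repo referenced earlier. As a rough orientation, a load script along these lines performs the steps just described; the secret name, the secret's key names, and the job parameter names are assumptions, and the exact MongoDB connection option keys depend on your AWS Glue version (check the AWS Glue MongoDB connection documentation).

import sys
import json
import boto3
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

#Hypothetical job parameter names for the S3 input and the MongoDB Atlas target
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'MONGODB_DATABASE',
                                     'MONGODB_COLLECTION', 'SECRET_NAME'])

glue_context = GlueContext(SparkContext.getOrCreate())

#Retrieve the Atlas cluster address, user name, and password from Secrets Manager
secrets = boto3.client('secretsmanager')
secret = json.loads(secrets.get_secret_value(SecretId=args['SECRET_NAME'])['SecretString'])

#Read the sample JSON file from Amazon S3 as a DynamicFrame
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'paths': [args['S3_INPUT_PATH']]},
    format='json',
)

#Write the DynamicFrame to the MongoDB Atlas cluster
glue_context.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type='mongodb',
    connection_options={
        'uri': 'mongodb+srv://' + secret['cluster_address'],  # assumed secret key name
        'database': args['MONGODB_DATABASE'],
        'collection': args['MONGODB_COLLECTION'],
        'username': secret['username'],
        'password': secret['password'],
    },
)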

  1. Create an IAM role with the permissions as shown in the following screenshot.

For more details, refer to Configure an IAM role for your ETL job.

IAM Role permissions

  1. Give the job a name and supply the IAM role created in the previous step on the Job details tab.
  2. You can leave the rest of the parameters as default, as shown in the following screenshots.
    Job DetailsJob details continued
  3. Next, define the job parameters that the script uses and supply the default values.
    Job input parameters
  4. Save the job and run it.
  5. To confirm a successful run, observe the contents of the MongoDB Atlas database collection if loading the data, or the S3 bucket if you were performing an extract.

The following screenshot shows the results of a successful data load from an Amazon S3 bucket into the MongoDB Atlas cluster. The data is now available for queries in the MongoDB Atlas UI.
Data Loaded into MongoDB Atlas Cluster

  1. To troubleshoot your runs, review the Amazon CloudWatch logs using the link on the job’s Run tab.

The following screenshot shows that the job ran successfully, with additional details such as links to the CloudWatch logs.

Successful job run details

Conclusion

In this post, we described how to extract and ingest data to MongoDB Atlas using AWS Glue.

With AWS Glue ETL jobs, we can now transfer the data from MongoDB Atlas to AWS Glue-compatible sources, and vice versa. You can also extend the solution to build analytics using AWS AI and ML services.

To learn more, refer to the GitHub repository for step-by-step instructions and sample code. You can procure MongoDB Atlas on AWS Marketplace.


About the Authors

Igor Alekseev is a Senior Partner Solution Architect at AWS in the data and analytics domain. In his role, Igor works with strategic partners, helping them build complex, AWS-optimized architectures. Prior to joining AWS, as a Data/Solution Architect, he implemented many projects in the big data domain, including several data lakes in the Hadoop ecosystem. As a data engineer, he was involved in applying AI/ML to fraud detection and office automation.


Babu Srinivasan is a Senior Partner Solutions Architect at MongoDB. In his current role, he is working with AWS to build the technical integrations and reference architectures for the AWS and MongoDB solutions. He has more than two decades of experience in database and cloud technologies. He is passionate about providing technical solutions to customers working with multiple global system integrators (GSIs) across multiple geographies.

Monitor and optimize cost on AWS Glue for Apache Spark

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/monitor-optimize-cost-glue-spark/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

One of the most common questions we get from customers is how to effectively monitor and optimize costs on AWS Glue for Spark. The diversity of features and pricing options for AWS Glue gives you the flexibility to manage the cost of your data workloads effectively while still meeting your performance and capacity requirements. Although the fundamentals of cost optimization on AWS Glue remain the same across workloads, you can monitor individual job runs, analyze the costs and usage to find savings, and then take action by improving the code or configuration.

In this post, we demonstrate a tactical approach to help you manage and reduce cost through monitoring and optimization techniques on top of your AWS Glue workloads.

Monitor overall costs on AWS Glue for Apache Spark

AWS Glue for Apache Spark charges an hourly rate in 1-second increments with a minimum of 1 minute based on the number of data processing units (DPUs). Learn more in AWS Glue Pricing. This section describes a way to monitor overall costs on AWS Glue for Apache Spark.

AWS Cost Explorer

In AWS Cost Explorer, you can see overall trends of DPU hours. Complete the following steps:

  1. On the Cost Explorer console, create a new cost and usage report.
  2. For Service, choose Glue.
  3. For Usage type, choose the following options:
    1. Choose <Region>-ETL-DPU-Hour (DPU-Hour) for standard jobs.
    2. Choose <Region>-ETL-Flex-DPU-Hour (DPU-Hour) for Flex jobs.
    3. Choose <Region>-GlueInteractiveSession-DPU-Hour (DPU-Hour) for interactive sessions.
  4. Choose Apply.

Cost Explorer for Glue usage

Learn more in Analyzing your costs with AWS Cost Explorer.

Monitor individual job run costs

This section describes a way to monitor individual job run costs on AWS Glue for Apache Spark. There are two options to achieve this.

AWS Glue Studio Monitoring page

On the Monitoring page in AWS Glue Studio, you can monitor the DPU hours you spent on a specific job run. The following screenshot shows three job runs that processed the same dataset; the first job run spent 0.66 DPU hours, and the second spent 0.44 DPU hours. The third one with Flex spent only 0.33 DPU hours.

Glue Studio Job Run Monitoring

GetJobRun and GetJobRuns APIs

The DPU hour values per job run can be retrieved through AWS APIs.

For auto scaling jobs and Flex jobs, the field DPUSeconds is available in GetJobRun and GetJobRuns API responses:

$ aws glue get-job-run --job-name ghcn --run-id jr_ccf6c31cc32184cea60b63b15c72035e31e62296846bad11cd1894d785f671f4
{
    "JobRun": {
        "Id": "jr_ccf6c31cc32184cea60b63b15c72035e31e62296846bad11cd1894d785f671f4",
        "Attempt": 0,
        "JobName": "ghcn",
        "StartedOn": "2023-02-08T19:14:53.821000+09:00",
        "LastModifiedOn": "2023-02-08T19:19:35.995000+09:00",
        "CompletedOn": "2023-02-08T19:19:35.995000+09:00",
        "JobRunState": "SUCCEEDED",
        "PredecessorRuns": [],
        "AllocatedCapacity": 10,
        "ExecutionTime": 274,
        "Timeout": 2880,
        "MaxCapacity": 10.0,
        "WorkerType": "G.1X",
        "NumberOfWorkers": 10,
        "LogGroupName": "/aws-glue/jobs",
        "GlueVersion": "3.0",
        "ExecutionClass": "FLEX",
        "DPUSeconds": 1137.0
    }
}

The field DPUSeconds returns 1137.0. This corresponds to 0.32 DPU hours, calculated as 1137.0/(60*60)=0.32.

For the other standard jobs without auto scaling, the field DPUSeconds is not available:

$ aws glue get-job-run --job-name ghcn --run-id jr_10dfa93fcbfdd997dd9492187584b07d305275531ff87b10b47f92c0c3bd6264
{
    "JobRun": {
        "Id": "jr_10dfa93fcbfdd997dd9492187584b07d305275531ff87b10b47f92c0c3bd6264",
        "Attempt": 0,
        "JobName": "ghcn",
        "StartedOn": "2023-02-07T16:38:05.155000+09:00",
        "LastModifiedOn": "2023-02-07T16:40:48.575000+09:00",
        "CompletedOn": "2023-02-07T16:40:48.575000+09:00",
        "JobRunState": "SUCCEEDED",
        "PredecessorRuns": [],
        "AllocatedCapacity": 10,
        "ExecutionTime": 157,
        "Timeout": 2880,
        "MaxCapacity": 10.0,
        "WorkerType": "G.1X",
        "NumberOfWorkers": 10,
        "LogGroupName": "/aws-glue/jobs",
        "GlueVersion": "3.0",
        "ExecutionClass": "STANDARD"
    }
}

For these jobs, you can calculate DPU hours as ExecutionTime*MaxCapacity/(60*60). In this example, that gives 0.44 DPU hours: 157*10/(60*60)=0.44. Note that AWS Glue versions 2.0 and later have a 1-minute minimum billing.
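If you want to automate this calculation, a short boto3 sketch along the following lines covers both cases; the job name and run ID are placeholders, and the 1-minute minimum billing is not applied here.

import boto3

glue = boto3.client('glue')

def dpu_hours(job_name, run_id):
    """Return the consumed DPU hours for a job run."""
    run = glue.get_job_run(JobName=job_name, RunId=run_id)['JobRun']
    if 'DPUSeconds' in run:
        # Flex and auto scaling jobs report exact DPU seconds
        return run['DPUSeconds'] / 3600
    # Standard jobs: execution time in seconds multiplied by the allocated capacity
    return run['ExecutionTime'] * run['MaxCapacity'] / 3600

print(dpu_hours('ghcn', 'jr_<your-run-id>'))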

AWS CloudFormation template

Because DPU hours can be retrieved through the GetJobRun and GetJobRuns APIs, you can integrate this with other services like Amazon CloudWatch to monitor trends of consumed DPU hours over time. For example, you can configure an Amazon EventBridge rule to invoke an AWS Lambda function to publish CloudWatch metrics every time AWS Glue jobs finish.

To help you configure that quickly, we provide an AWS CloudFormation template. You can review and customize it to suit your needs. Some of the resources this stack deploys incur costs when in use.

The CloudFormation template generates the following resources:

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. Choose Next.
  5. On the next page, choose Next.
  6. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Create stack.

Stack creation can take up to 3 minutes.

After you complete the stack creation, when AWS Glue jobs finish, the following DPUHours metrics are published under the Glue namespace in CloudWatch:

  • Aggregated metrics – Dimension=[JobType, GlueVersion, ExecutionClass]
  • Per-job metrics – Dimension=[JobName, JobRunId=ALL]
  • Per-job run metrics – Dimension=[JobName, JobRunId]

Aggregated metrics and per-job metrics are shown as in the following screenshot.

CloudWatch DPUHours Metrics

Each data point represents DPUHours for an individual job run, so the valid statistic for these CloudWatch metrics is SUM. With the CloudWatch metrics, you get a granular view of DPU hours.
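Conceptually, the Lambda function deployed by the template does something along the following lines; this is a simplified sketch for illustration, not the template's actual code.

import boto3

glue = boto3.client('glue')
cloudwatch = boto3.client('cloudwatch')

def handler(event, context):
    #Triggered by an EventBridge rule on Glue Job State Change events
    job_name = event['detail']['jobName']
    run_id = event['detail']['jobRunId']
    run = glue.get_job_run(JobName=job_name, RunId=run_id)['JobRun']

    if 'DPUSeconds' in run:
        dpu_hours = run['DPUSeconds'] / 3600
    else:
        dpu_hours = run['ExecutionTime'] * run['MaxCapacity'] / 3600

    #Publish the per-job run metric under the Glue namespace; the template also
    #publishes the aggregated and per-job dimensions listed above
    cloudwatch.put_metric_data(
        Namespace='Glue',
        MetricData=[{
            'MetricName': 'DPUHours',
            'Dimensions': [
                {'Name': 'JobName', 'Value': job_name},
                {'Name': 'JobRunId', 'Value': run_id},
            ],
            'Value': dpu_hours,
        }],
    )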

Options to optimize cost

This section describes key options to optimize costs on AWS Glue for Apache Spark:

  • Upgrade to the latest version
  • Auto scaling
  • Flex
  • Set the job’s timeout period appropriately
  • Interactive sessions
  • Smaller worker type for streaming jobs

We dive deep into each option in the following sections.

Upgrade to the latest version

Having AWS Glue jobs running on the latest version enables you to take advantage of the latest functionalities and improvements offered by AWS Glue and the upgraded version of the supported engines such as Apache Spark. For example, AWS Glue 4.0 includes the new optimized Apache Spark 3.3.0 runtime and adds support for built-in pandas APIs as well as native support for Apache Hudi, Apache Iceberg, and Delta Lake formats, giving you more options for analyzing and storing your data. It also includes a new highly performant Amazon Redshift connector that is 10 times faster on TPC-DS benchmarking.

Auto scaling

One of the most common challenges to reduce cost is to identify the right amount of resources to run jobs. Users tend to overprovision workers in order to avoid resource-related problems, but part of those DPUs are not used, which increases costs unnecessarily. Starting with AWS Glue version 3.0, AWS Glue auto scaling helps you dynamically scale resources up and down based on the workload, for both batch and streaming jobs. Auto scaling reduces the need to optimize the number of workers to avoid over-provisioning resources for jobs, or paying for idle workers.

To enable auto scaling on AWS Glue Studio, go to the Job Details tab of your AWS Glue job and select Automatically scale number of workers.

Glue Auto Scaling

You can learn more in Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark.

Flex

For non-urgent data integration workloads that don’t require fast job start times or can afford to rerun the jobs in case of a failure, Flex could be a good option. The start times and runtimes of jobs using Flex vary because spare compute resources aren’t always available instantly and may be reclaimed during the run of a job. Flex-based jobs offer the same capabilities, including access to custom connectors, a visual job authoring experience, and a job scheduling system. With the Flex option, you can optimize the costs of your data integration workloads by up to 34%.

To enable Flex on AWS Glue Studio, go to the Job Details tab of your job and select Flex execution.

Glue Flex

You can learn more in Introducing AWS Glue Flex jobs: Cost savings on ETL workloads.

Interactive sessions

One common practice among developers who create AWS Glue jobs is to run the same job several times every time a modification is made to the code. However, this may not be cost-effective depending on the number of workers assigned to the job and the number of times that it’s run. Also, this approach may slow down the development time because you have to wait until every job run is complete. To address this issue, in 2022 we released AWS Glue interactive sessions. This feature lets developers process data interactively using a Jupyter-based notebook or IDE of their choice. Sessions start in seconds and have built-in cost management. As with AWS Glue jobs, you pay for only the resources you use. Interactive sessions allow developers to test their code line by line without needing to run the entire job to test any changes made to the code.

Set the job’s timeout period appropriately

Due to configuration issues, script coding errors, or data anomalies, AWS Glue jobs can sometimes take an exceptionally long time or struggle to process the data, which can cause unexpected charges. AWS Glue gives you the ability to set a timeout value on any job. By default, an AWS Glue job is configured with 48 hours as the timeout value, but you can specify any timeout. We recommend identifying the average runtime of your job and, based on that, setting an appropriate timeout period. This way, you can control cost per job run, prevent unexpected charges, and detect any problems related to the job earlier.

To change the timeout value on AWS Glue Studio, go to the Job Details tab of your job and enter a value for Job timeout.

Glue job timeout

Interactive sessions also have the same ability to set an idle timeout value on sessions. The default idle timeout value for Spark ETL sessions is 2880 minutes (48 hours). To change the timeout value, you can use %idle_timeout magic.
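For example, in a notebook cell you can set the idle timeout alongside the other session-sizing magics before running any code; the values shown here are illustrative.

%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2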

Smaller worker type for streaming jobs

Processing data in real time is a common use case for customers, but sometimes these streams have sporadic and low data volumes. G.1X and G.2X worker types could be too big for these workloads, especially if we consider streaming jobs may need to run 24/7. To help you reduce costs, in 2022 we released G.025X, a new quarter DPU worker type for streaming ETL jobs. With this new worker type, you can process low data volume streams at one-fourth of the cost.

To select the G.025X worker type on AWS Glue Studio, go to the Job Details tab of your job. For Type, choose Spark Streaming, then choose G 0.25X for Worker type.

Glue smaller worker

You can learn more in Best practices to optimize cost and performance for AWS Glue streaming ETL jobs.

Performance tuning to optimize cost

Performance tuning plays an important role in reducing cost. The first step in performance tuning is to identify the bottlenecks; without measuring performance and identifying bottlenecks, it’s not realistic to optimize cost-effectively. CloudWatch metrics provide a simple view for quick analysis, and the Spark UI provides a deeper view for performance tuning. It’s highly recommended to enable the Spark UI for your jobs and then use it to identify the bottleneck.

The following are high-level strategies to optimize costs:

  • Scale cluster capacity
  • Reduce the amount of data scanned
  • Parallelize tasks
  • Optimize shuffles
  • Overcome data skew
  • Accelerate query planning

For this post, we discuss the techniques for reducing the amount of data scanned and parallelizing tasks.

Reduce the amount of data scanned: Enable job bookmarks

AWS Glue job bookmarks are a capability to process data incrementally when running a job multiple times on a scheduled interval. If your use case is an incremental data load, you can enable job bookmarks to avoid a full scan for all job runs and process only the delta from the last job run. This reduces the amount of data scanned and accelerates individual job runs.
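The following minimal sketch shows the two pieces bookmarks need: the --job-bookmark-option job-bookmark-enable job parameter, and a transformation_ctx on each source plus a job.commit() so AWS Glue can track what has already been processed. The database and table names are placeholders.

#Job parameter to set on the job: --job-bookmark-option job-bookmark-enable
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

#transformation_ctx is the key the bookmark uses to remember processed data
dyf = glue_context.create_dynamic_frame.from_catalog(
    database='src_database',      # placeholder
    table_name='src_table',       # placeholder
    transformation_ctx='read_src',
)

# ... transform and write ...

job.commit()  # persists the bookmark state for the next run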

Reduce the amount of data scanned: Partition pruning

If your input data is partitioned in advance, you can reduce the amount of data scanned by pruning partitions.

For AWS Glue DynamicFrame, set push_down_predicate (and catalogPartitionPredicate), as shown in the following code. Learn more in Managing partitions for ETL output in AWS Glue.

# DynamicFrame
dyf = glueContext.create_dynamic_frame.from_catalog(
    database=src_database_name,
    table_name=src_table_name,
    push_down_predicate="year='2023' and month='03'",
)

For Spark DataFrame (or Spark SQL), set a where or filter clause to prune partitions:

# DataFrame
df = spark.read.format("json").load("s3://<YourBucket>/year=2023/month=03/*/*.gz")
 
# SparkSQL 
df = spark.sql("SELECT * FROM <Table> WHERE year= '2023' and month = '03'")

Parallelize tasks: Parallelize JDBC reads

The number of concurrent reads from the JDBC source is determined by configuration. Note that by default, a single JDBC connection will read all the data from the source through a SELECT query.

Both AWS Glue DynamicFrame and Spark DataFrame support parallelizing data scans across multiple tasks by splitting the dataset.

For AWS Glue DynamicFrame, set hashfield or hashexpression and hashpartitions. Learn more in Reading from JDBC tables in parallel.

For Spark DataFrame, set numPartitions, partitionColumn, lowerBound, and upperBound. Learn more in JDBC To Other Databases.
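A minimal sketch of both approaches follows; the connection details, column names, and bounds are placeholders, and glueContext and spark are assumed to come from the usual AWS Glue job boilerplate.

# AWS Glue DynamicFrame: split the JDBC read across parallel tasks
dyf = glueContext.create_dynamic_frame.from_catalog(
    database='jdbc_database',            # placeholder
    table_name='jdbc_table',             # placeholder
    additional_options={
        'hashexpression': 'id',          # numeric column (or expression) to split on
        'hashpartitions': '10',          # number of parallel reads
    },
)

# Spark DataFrame: equivalent split using the JDBC data source options
df = (spark.read.format('jdbc')
      .option('url', 'jdbc:postgresql://<host>:5432/<db>')   # placeholder connection
      .option('dbtable', 'schema.table')
      .option('user', '<user>').option('password', '<password>')
      .option('partitionColumn', 'id')
      .option('lowerBound', '1')
      .option('upperBound', '1000000')
      .option('numPartitions', '10')
      .load())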

Conclusion

In this post, we discussed methodologies for monitoring and optimizing cost on AWS Glue for Apache Spark. With these techniques, you can effectively monitor and optimize costs on AWS Glue for Spark.

If you have comments or feedback, please leave them in the comments.


About the Authors

Leonardo Gómez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

How the BMW Group analyses semiconductor demand with AWS Glue

Post Syndicated from Göksel SARIKAYA original https://aws.amazon.com/blogs/big-data/how-the-bmw-group-analyses-semiconductor-demand-with-aws-glue/

This is a guest post co-written by Maik Leuthold and Nick Harmening from BMW Group.

The BMW Group is headquartered in Munich, Germany, where the company oversees 149,000 employees and manufactures cars and motorcycles in over 30 production sites across 15 countries. This multinational production strategy follows an even more international and extensive supplier network.

Like many automobile companies across the world, the BMW Group has been facing challenges in its supply chain due to the worldwide semiconductor shortage. Creating transparency about BMW Group’s current and future demand of semiconductors is one key strategic aspect to resolve shortages together with suppliers and semiconductor manufacturers. The manufacturers need to know BMW Group’s exact current and future semiconductor volume information, which will effectively help steer the available worldwide supply.

The main requirement is to have an automated, transparent, and long-term semiconductor demand forecast. Additionally, this forecasting system needs to provide data enrichment steps including byproducts, serve as the master data around the semiconductor management, and enable further use cases at the BMW Group.

To enable this use case, we used the BMW Group’s cloud-native data platform called the Cloud Data Hub. In 2019, the BMW Group decided to re-architect and move its on-premises data lake to the AWS Cloud to enable data-driven innovation while scaling with the dynamic needs of the organization. The Cloud Data Hub processes and combines anonymized data from vehicle sensors and other sources across the enterprise to make it easily accessible for internal teams creating customer-facing and internal applications. To learn more about the Cloud Data Hub, refer to BMW Group Uses AWS-Based Data Lake to Unlock the Power of Data.

In this post, we share how the BMW Group analyzes semiconductor demand using AWS Glue.

Logic and systems behind the demand forecast

The first step towards the demand forecast is the identification of semiconductor-relevant components of a vehicle type. Each component is described by a unique part number, which serves as a key in all systems to identify this component. A component can be a headlight or a steering wheel, for example.

For historic reasons, the required data for this aggregation step is siloed and represented differently in diverse systems. Because each source system and data type has its own schema and format, it’s particularly difficult to perform analytics based on this data. Some source systems are already available in the Cloud Data Hub (for example, part master data), so it’s straightforward to consume them from our AWS account. To access the remaining data sources, we need to build specific ingest jobs that read data from the respective system.

The following diagram illustrates the approach.

The data enrichment starts with an Oracle Database (Software Parts) that contains part numbers that are related to software. This can be the control unit of a headlight or a camera system for automated driving. Because semiconductors are the basis for running software, this database builds the foundation of our data processing.

In the next step, we use REST APIs (Part Relations) to enrich the data with further attributes. This includes how parts are related (for example, a specific control unit that will be installed into a headlight) and over which timespan a part number will be built into a vehicle. The knowledge about the part relations is essential to understand how a specific semiconductor, in this case the control unit, is relevant for a more general part, the headlight. The temporal information about the use of part numbers allows us to filter out outdated part numbers, which will not be used in the future and therefore have no relevance in the forecast.

The data (Part Master Data) can directly be consumed from the Cloud Data Hub. This database includes attributes about the status and material types of a part number. This information is required to filter out part numbers that we gathered in the previous steps but have no relevance for semiconductors. With the information that was gathered from the APIs, this data is also queried to extract further part numbers that weren’t ingested in the previous steps.

After data enrichment and filtering, a third-party system reads the filtered part data and enriches the semiconductor information. Subsequently, it adds the volume information of the components. Finally, it provides the overall semiconductor demand forecast centrally to the Cloud Data Hub.

Applied services

Our solution uses the serverless services AWS Glue and Amazon Simple Storage Service (Amazon S3) to run ETL (extract, transform, and load) workflows without managing an infrastructure. It also reduces the costs by paying only for the time jobs are running. The serverless approach fits our workflow’s schedule very well because we run the workload only once a week.

Because we’re using diverse data source systems as well as complex processing and aggregation, it’s important to decouple ETL jobs. This allows us to process each data source independently. We also split the data transformation into several modules (Data Aggregation, Data Filtering, and Data Preparation) to make the system more transparent and easier to maintain. This approach also helps when extending or modifying existing jobs.

Although each module is specific to a data source or a particular data transformation, we utilize reusable blocks inside of every job. This allows us to unify each type of operation and simplifies the procedure of adding new data sources and transformation steps in the future.

In our setup, we follow the security best practice of the least privilege principle, to ensure the information is protected from accidental or unnecessary access. Therefore, each module has AWS Identity and Access Management (IAM) roles with only the necessary permissions, namely access to only data sources and buckets the job deals with. For more information regarding security best practices, refer to Security best practices in IAM.

Solution overview

The following diagram shows the overall workflow where several AWS Glue jobs are interacting with each other sequentially.

As we mentioned earlier, we used the Cloud Data Hub, Oracle DB, and other data sources that we communicate with via the REST API. The first step of the solution is the Data Source Ingest module, which ingests the data from different data sources. For that purpose, AWS Glue jobs read information from the different data sources and write it into the S3 source buckets. Ingested data is stored in encrypted buckets, and the keys are managed by AWS Key Management Service (AWS KMS).

After the Data Source Ingest step, intermediate jobs aggregate and enrich the tables with other data sources like component versions and categories, model manufacture dates, and so on. They then write the results into the intermediate buckets in the Data Aggregation module, creating a comprehensive and rich data representation. Additionally, according to the business logic workflow, the Data Filtering and Data Preparation modules create the final master data table with only current and production-relevant information.

The AWS Glue workflow manages all these ingestion jobs and filtering jobs end to end. An AWS Glue workflow schedule is configured weekly to run the workflow on Wednesdays. While the workflow is running, each job writes execution logs (info or error) into Amazon Simple Notification Service (Amazon SNS) and Amazon CloudWatch for monitoring purposes. Amazon SNS forwards the execution results to the monitoring tools, such as Mail, Teams, or Slack channels. In case of any error in the jobs, Amazon SNS also alerts the listeners about the job execution result to take action.

As the last step of the solution, the third-party system reads the master table from the prepared data bucket via Amazon Athena. After further data engineering steps like semiconductor information enrichment and volume information integration, the final master data asset is written into the Cloud Data Hub. With the data now provided in the Cloud Data Hub, other use cases can use this semiconductor master data without building several interfaces to different source systems.

Business outcome

The project results provide the BMW Group with substantial transparency into its semiconductor demand for the entire vehicle portfolio, both in the present and in the future. The creation of a database of that magnitude enables the BMW Group to establish further use cases to the benefit of more supply chain transparency and a clearer and deeper exchange with first-tier suppliers and semiconductor manufacturers. It helps not only to resolve the current demanding market situation, but also to be more resilient in the future. Therefore, it’s one major step toward a digital, transparent supply chain.

Conclusion

This post describes how to analyze semiconductor demand from many data sources with big data jobs in an AWS Glue workflow. A serverless architecture with minimal diversity of services makes the code base and architecture simple to understand and maintain. To learn more about how to use AWS Glue workflows and jobs for serverless orchestration, visit the AWS Glue service page.


About the authors

Maik Leuthold is a Project Lead at the BMW Group for advanced analytics in the business field of supply chain and procurement, and leads the digitalization strategy for the semiconductor management.

Nick Harmening is an IT Project Lead at the BMW Group and an AWS certified Solutions Architect. He builds and operates cloud-native applications with a focus on data engineering and machine learning.

Göksel Sarikaya is a Senior Cloud Application Architect at AWS Professional Services. He enables customers to design scalable, cost-effective, and competitive applications through innovative use of the AWS platform. He helps them accelerate customer and partner business outcomes during their digital transformation journey.

Alexander Tselikov is a Data Architect at AWS Professional Services who is passionate about helping customers to build scalable data, analytics and ML solutions to enable timely insights and make critical business decisions.

Rahul Shaurya is a Senior Big Data Architect at Amazon Web Services. He helps and works closely with customers building data platforms and analytical applications on AWS. Outside of work, Rahul loves taking long walks with his dog Barney.

Cross-account integration between SaaS platforms using Amazon AppFlow

Post Syndicated from Ramakant Joshi original https://aws.amazon.com/blogs/big-data/cross-account-integration-between-saas-platforms-using-amazon-appflow/

Implementing an effective data sharing strategy that satisfies compliance and regulatory requirements is complex. Customers often need to share data between disparate software as a service (SaaS) platforms within their organization or across organizations. On many occasions, they need to apply business logic to the data received from the source SaaS platform before pushing it to the target SaaS platform.

Let’s take an example. AnyCompany’s marketing team hosted an event at the Anaheim Convention Center, CA. The marketing team created leads based on the event in Adobe Marketo. An automated process downloaded the leads from Marketo in the marketing AWS account. These leads are then pushed to the sales AWS account. A business process picks up those leads, filters them based on a “Do Not Call” criteria, and creates entries in the Salesforce system. Now, the sales team can pursue those leads and continue to track the opportunities in Salesforce.

In this post, we show how to share your data across SaaS platforms in a cross-account structure using fully managed, low-code AWS services such as Amazon AppFlow, Amazon EventBridge, AWS Step Functions, and AWS Glue.

Solution overview

Considering our example of AnyCompany, let’s look at the data flow. AnyCompany’s Marketo instance is integrated with the producer AWS account. As the leads from Marketo land in the producer AWS account, they’re pushed to the consumer AWS account, which is integrated with Salesforce. Business logic is applied to the leads data in the consumer AWS account, and then the curated data is loaded into Salesforce.

We have used a serverless architecture to implement this use case. The following AWS services are used for data ingestion, processing, and load:

  • Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, SAP, Marketo, Slack, and ServiceNow, and AWS services like Amazon S3 and Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities like filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow is used to download leads data from Marketo and upload the curated leads data into Salesforce.
  • Amazon EventBridge is a serverless event bus that lets you receive, filter, transform, route, and deliver events. EventBridge is used to track the events like receiving the leads data in the producer or consumer AWS accounts and then triggering a workflow.
  • AWS Step Functions is a visual workflow service that helps developers use AWS services to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning (ML) pipelines. Step Functions is used to orchestrate the data processing.
  • AWS Glue is a serverless data preparation service that makes it easy to run extract, transform, and load (ETL) jobs. An AWS Glue job encapsulates a script that reads, processes, and then writes data to a new schema. This solution uses Python 3.6 AWS Glue jobs for data filtration and processing.
  • Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Amazon S3 is used to store the leads data.

Let’s review the architecture in detail. The following diagram shows a visual representation of how this integration works.

The following steps outline the process for transferring and processing leads data using Amazon AppFlow, Amazon S3, EventBridge, Step Functions, AWS Glue, and Salesforce:

  1. Amazon AppFlow runs on a daily schedule and retrieves any new leads created within the last 24 hours (incremental changes) from Marketo.
  2. The leads are saved as Parquet format files in an S3 bucket in the producer account.
  3. When the daily flow is complete, Amazon AppFlow emits events to EventBridge.
  4. EventBridge triggers Step Functions.
  5. Step Functions copies the Parquet format files containing the leads from the producer account’s S3 bucket to the consumer account’s S3 bucket.
  6. Upon a successful file transfer, Step Functions publishes an event in the consumer account’s EventBridge.
  7. An EventBridge rule intercepts this event and triggers Step Functions in the consumer account.
  8. Step Functions calls an AWS Glue crawler, which scans the leads Parquet files and creates a table in the AWS Glue Data Catalog.
  9. The AWS Glue job is called, which selects records with the Do Not Call field set to false from the leads files, and creates a new set of curated Parquet files (a minimal sketch of this filter follows the list). We have used an AWS Glue job for the ETL pipeline to showcase how you can use a purpose-built analytics service for complex ETL needs. However, for simple filtering requirements like Do Not Call, you can use the existing filtering feature of Amazon AppFlow.
  10. Step Functions then calls Amazon AppFlow.
  11. Finally, Amazon AppFlow populates the Salesforce leads based on the data in the curated Parquet files.

We have provided artifacts in this post to deploy the AWS services in your account and try out the solution.

Prerequisites

To follow the deployment walkthrough, you need two AWS accounts, one for the producer and other for the consumer. Use us-east-1 or us-west-2 as your AWS Region.

Consumer account setup:

Stage the data

To prepare the data, complete the following steps:

  1. Download the zipped archive file to use for this solution and unzip the files locally.

The AWS Glue job uses the glue-job.py script to perform ETL and populates the curated table in the Data Catalog.

  1. Create an S3 bucket called consumer-configbucket-<ACCOUNT_ID> via the Amazon S3 console in the consumer account, where ACCOUNT_ID is your AWS account ID.
  2. Upload the script to this location.

Create a connection to Salesforce

Follow the connection setup steps outlined here. Make a note of the Salesforce connector name.

Create a connection to Salesforce in the consumer account

Follow the connection setup steps outlined in Create Opportunity Object Flow.

Set up resources with AWS CloudFormation

We provided two AWS CloudFormation templates to create resources: one for the producer account, and one for the consumer account.

Amazon S3 now applies server-side encryption with Amazon S3 managed keys (SSE-S3) as the base level of encryption for every bucket in Amazon S3. Starting January 5, 2023, all new object uploads to Amazon S3 are automatically encrypted at no additional cost and with no impact on performance. We use this default encryption for both producer and consumer S3 buckets. If you choose to bring your own keys with AWS Key Management Service (AWS KMS), we recommend referring to Replicating objects created with server-side encryption (SSE-C, SSE-S3, SSE-KMS) for cross-account replication.

Launch the CloudFormation stack in the consumer account

Let’s start with creating resources in the consumer account. There are a few dependencies on the consumer account resources from the producer account. To launch the CloudFormation stack in the consumer account, complete the following steps:

  1. Sign in to the consumer account’s AWS CloudFormation console in the target Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-consumer.
  5. Enter the parameters for the connector name, object, and producer (source) account ID.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • ConsumerS3Bucket: consumer-databucket-<consumer account id>
  • Consumer S3 Target Folder: marketo-leads-source
  • ConsumerEventBusArn: arn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
  • ConsumerEventRuleArn: arn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule
  • ConsumerStepFunction: arn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine
  • ConsumerGlueCrawler: consumer-glue-crawler
  • ConsumerGlueJob: consumer-glue-job
  • ConsumerGlueDatabase: consumer-glue-database
  • ConsumerAppFlow: arn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow

Producer account setup:

Create a connection to Marketo

Follow the connection setup steps outlined here. Make a note of the Marketo connector name.

Launch the CloudFormation stack in the producer account

Now let’s create resources in the producer account. Complete the following steps:

  1. Sign in to the producer account’s AWS CloudFormation console in the source Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-producer.
  5. Enter the following parameters and leave the rest as default:
    • AppFlowMarketoConnectorName: name of the Marketo connector, created above
    • ConsumerAccountBucket: consumer-databucket-<consumer account id>
    • ConsumerAccountBucketTargetFolder: marketo-leads-source
    • ConsumerAccountEventBusArn: arn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
    • DefaultEventBusArn: arn:aws:events:<region>:<producer account id>:event-bus/default


  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • Producer AppFlow: producer-flow
  • Producer Bucket: arn:aws:s3:::producer-bucket.<region>.<producer account id>
  • Producer Flow Completion Rule: arn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event
  • Producer Step Function: arn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx
  • Producer Step Function Role: arn:aws:iam::<producer account id>:role/service-role/producer-stepfunction-role
  1. After successful creation of the resources, go to the consumer account S3 bucket, consumer-databucket-<consumer account id>, and update the bucket policy as follows:
{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "AllowAppFlowDestinationActions",
            "Effect": "Allow",
            "Principal": {"Service": "appflow.amazonaws.com"},
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }, {
            "Sid": "Producer-stepfunction-role",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<producer-account-id>:role/service-role/producer-stepfunction-role"
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }
    ]
}

Validate the workflow

Let’s walk through the flow:

  1. Review the Marketo and Salesforce connection setup in the producer and consumer account respectively.

In the architecture section, we suggested scheduling the AppFlow (producer-flow) in the producer account. However, for quick testing purposes, we demonstrate how to manually run the flow on demand.

  1. Go to the AppFlow (producer-flow) in the producer account. On the Filters tab of the flow, choose Edit filters.
  2. Choose the Created At date range for which you have data.
  3. Save the range and choose Run flow.
  4. Review the producer S3 bucket.

AppFlow generates the files in the producer-flow prefix within this bucket. The files are temporarily located in the producer S3 bucket under s3://<producer-bucket>.<region>.<account-id>/producer-flow.

  1. Review the EventBridge rule and Step Functions state machine in the producer account.

The Amazon AppFlow job completion triggers an EventBridge rule (arn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event, as noted on the Outputs tab of the CloudFormation stack in the producer account), which triggers the Step Functions state machine (arn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx) in the producer account. The state machine copies the files to the consumer S3 bucket from the producer-flow prefix in the producer S3 bucket. Once the file copy is complete, the state machine moves the files from the producer-flow prefix to the archive prefix in the producer S3 bucket. You can find the files in s3://<producer-bucket>.<region>.<account-id>/archive.

  1. Review the consumer S3 bucket.

The Step Functions state machine in the producer account copies the files to the consumer S3 bucket and sends an event to EventBridge in the consumer account. The files are located in the consumer S3 bucket under s3://consumer-databucket-<account-id>/marketo-leads-source/.

  1. Review the EventBridge rule (arn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule) in the consumer account, which should have triggered the Step Function workflow (arn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine).

The AWS Glue crawler (consumer-glue-crawler) runs to update the metadata followed by the AWS Glue job (consumer-glue-job), which curates the data by applying the Do not call filter. The curated files are placed in s3://consumer-databucket-<account-id>/marketo-leads-curated/. After data curation, the flow is started as part of the state machine.
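
The curation logic inside consumer-glue-job is not listed in this post. The following is a minimal sketch of what a Do not call filter could look like in a Glue PySpark job; the table name and the donotcall column are illustrative assumptions, not the exact names used by the deployed job.

import sys
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())

# Read the leads table cataloged by consumer-glue-crawler (table name is illustrative)
leads = glueContext.create_dynamic_frame.from_catalog(
    database="consumer-glue-database", table_name="marketo_leads_source"
)

# Keep only leads that have not opted out of calls (column name is an assumption)
callable_leads = Filter.apply(frame=leads, f=lambda row: row["donotcall"] in (False, "false", None))

# Write the curated output to the curated prefix in the consumer bucket
glueContext.write_dynamic_frame.from_options(
    frame=callable_leads,
    connection_type="s3",
    connection_options={"path": "s3://consumer-databucket-<account-id>/marketo-leads-curated/"},
    format="csv",
)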

  1. Review the Amazon AppFlow job (arn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow) run status in the consumer account.

Upon a successful run of the Amazon AppFlow job, the curated data files are moved to the s3://consumer-databucket-<account-id>/marketo-leads-processed/ folder and Salesforce is updated with the leads. Additionally, all the original source files are moved from s3://consumer-databucket-<account-id>/marketo-leads-source/ to s3://consumer-databucket-<account-id>/marketo-leads-archive/.

  1. Review the updated data in Salesforce.

You will see the leads newly created or updated by Amazon AppFlow.

Clean up

To clean up the resources created as part of this post, delete the following resources:

  1. Delete the resources in the producer account:
    • Delete the producer S3 bucket content.
    • Delete the CloudFormation stack.
  2. Delete the resources in the consumer account:
    • Delete the consumer S3 bucket content.
    • Delete the CloudFormation stack.

Summary

In this post, we showed how you can support a cross-account model to exchange data between different partners with different SaaS integrations using Amazon AppFlow. You can expand this idea to support multiple target accounts.

For more information, refer to Simplifying cross-account access with Amazon EventBridge resource policies. To learn more about Amazon AppFlow, visit Amazon AppFlow.


About the authors

Ramakant Joshi is an AWS Solutions Architect, specializing in the analytics and serverless domain. He has a background in software development and hybrid architectures, and is passionate about helping customers modernize their cloud architecture.

Debaprasun Chakraborty is an AWS Solutions Architect, specializing in the analytics domain. He has around 20 years of software development and architecture experience. He is passionate about helping customers in cloud adoption, migration and strategy.

Suraj Subramani Vineet is a Senior Cloud Architect at Amazon Web Services (AWS) Professional Services in Sydney, Australia. He specializes in designing and building scalable and cost-effective data platforms and AI/ML solutions in the cloud. Outside of work, he enjoys playing soccer on weekends.

Build a transactional data lake using Apache Iceberg, AWS Glue, and cross-account data shares using AWS Lake Formation and Amazon Athena

Post Syndicated from Vikram Sahadevan original https://aws.amazon.com/blogs/big-data/build-a-transactional-data-lake-using-apache-iceberg-aws-glue-and-cross-account-data-shares-using-aws-lake-formation-and-amazon-athena/

Building a data lake on Amazon Simple Storage Service (Amazon S3) provides numerous benefits for an organization. It allows you to access diverse data sources, build business intelligence dashboards, build AI and machine learning (ML) models to provide customized customer experiences, and accelerate the curation of new datasets for consumption by adopting a modern data architecture or data mesh architecture.

However, many use cases, like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake, require handling data at a record level. Performing operations like inserting, updating, and deleting individual records in a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite entire datasets as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance and ongoing compaction maintenance.

In 2022, we announced that you can enforce fine-grained access control policies using AWS Lake Formation and query data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi, and more using Amazon Athena queries. You get the flexibility to choose the table and file format best suited for your use case and get the benefit of centralized data governance to secure data access when using Athena.

In this post, we show you how to configure Lake Formation using Iceberg table formats. We also explain how to upsert and merge in an S3 data lake using an Iceberg framework and apply Lake Formation access control using Athena.

Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3. Iceberg also helps guarantee data correctness under concurrent write scenarios.
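
As a quick illustration of what record-level operations and time travel look like in practice, the following is a minimal Spark SQL sketch. It assumes a SparkSession configured with the Iceberg extensions and a catalog named job_catalog, matching the AWS Glue job configuration shown later in this post; the snapshot ID is a placeholder you would take from the table's snapshots metadata.

# Record-level update and delete, expressed as plain SQL on an Iceberg table
spark.sql("""
    UPDATE job_catalog.iceberg_lf_db.iceberg_table_lf
    SET quantity_available = 0
    WHERE product_id = 200
""")

spark.sql("""
    DELETE FROM job_catalog.iceberg_lf_db.iceberg_table_lf
    WHERE product_id = 204
""")

# Inspect the table's snapshot history, then time travel to an earlier snapshot
spark.sql("SELECT snapshot_id, committed_at FROM job_catalog.iceberg_lf_db.iceberg_table_lf.snapshots").show()
spark.sql("""
    SELECT * FROM job_catalog.iceberg_lf_db.iceberg_table_lf
    VERSION AS OF 1234567890123456789  -- placeholder snapshot ID
""").show()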

Solution overview

To explain this setup, we present the following architecture, which integrates Amazon S3 for the data lake (Iceberg table format), Lake Formation for access control, AWS Glue for ETL (extract, transform, and load), and Athena for querying the latest inventory data from the Iceberg tables using standard SQL.

The solution workflow consists of the following steps, including data ingestion (Steps 1–3), data governance (Step 4), and data access (Step 5):

  1. We use AWS Database Migration Service (AWS DMS) or a similar tool to connect to the data source and move incremental data (CDC) to Amazon S3 in CSV format.
  2. An AWS Glue PySpark job reads the incremental data from the S3 input bucket and performs deduplication of the records.
  3. The job then invokes Iceberg’s MERGE statements to merge the data with the target S3 bucket.
  4. We use the AWS Glue Data Catalog as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema. Lake Formation allows you to centrally manage permissions and access control for Data Catalog resources in your S3 data lake. You can use fine-grained access control in Lake Formation to restrict access to data in query results.
  5. We use Athena integrated with Lake Formation to query data from the Iceberg table using standard SQL and validate table- and column-level access on Iceberg tables.

For this solution, we assume that the raw data files are already available in Amazon S3, and focus on processing the data using AWS Glue with Iceberg table format. We use sample item data that has the following attributes:

  • op – This represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. Make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
  • product_id – This is the primary key column in the source data table.
  • category – This column represents the category of an item.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to demonstrate the functionality.
  • last_update_time – This is the time when the item record was updated in the source data.

We demonstrate implementing the solution with the following steps:

  1. Create an S3 bucket for input and output data.
  2. Create input and output tables using Athena.
  3. Insert the data into the Iceberg table from Athena.
  4. Query the Iceberg table using Athena.
  5. Upload incremental (CDC) data for further processing.
  6. Run the AWS Glue job again to process the incremental files.
  7. Query the Iceberg table again using Athena.
  8. Define Lake Formation policies.

Prerequisites

For Athena queries, we need to configure an Athena workgroup with engine version 3 to support Iceberg table format.

To validate cross-account access through Lake Formation for the Iceberg table, we use two accounts (primary and secondary) in this post.

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output data

Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with AWS Glue PySpark code for the output.

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as iceberg-blog and leave the remaining fields as default.

S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name throughout the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.

  1. On the bucket details page, choose Create folder.
  2. Create two subfolders. For this post, we create iceberg-blog/raw-csv-input and iceberg-blog/iceberg-output.
  3. Upload the LOAD00000001.csv file into the raw-csv-input folder.

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena query editor and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_lf_db;

As we explain later in this post, it’s essential to record the data locations when incorporating Lake Formation access controls.

-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_lf_db.csv_input(
op string,
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
'areColumnsQuoted'='false',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_lf_db.iceberg_table_lf (
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time timestamp)
PARTITIONED BY (category, bucket(16,product_id))
LOCATION 's3://glue-iceberg-demo/iceberg_blog/iceberg-output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_lf_db.csv_input;

SELECT * FROM iceberg_lf_db.iceberg_table_lf;

Alternatively, you can use an AWS Glue crawler to create the table definition for the input files.

Insert the data into the Iceberg table from Athena

Optionally, we can insert data into the Iceberg table through Athena using the following code:

insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (200,'Mobile','Mobile brand 1',25,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (201,'Laptop','Laptop brand 1',20,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (202,'Tablet','Kindle',30,cast('2023-01-19 09:51:41' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (203,'Speaker','Alexa',10,cast('2023-01-19 09:51:42' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (204,'Speaker','Alexa',50,cast('2023-01-19 09:51:43' as timestamp));

For this post, we load the data using an AWS Glue job. Complete the following steps to create the job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Visual with a blank canvas.
  4. Choose Create.
  5. Choose Edit script.
  6. Replace the script with the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

args = getResolvedOptions(sys.argv, ["JOB_NAME", "iceberg_job_catalog_warehouse"])
conf = SparkConf()

## spark.sql.catalog.job_catalog.warehouse is passed as a runtime argument.
## Make sure to pass the runtime argument --iceberg_job_catalog_warehouse
## with the S3 warehouse path as its value.
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read Input Table
## glueContext.create_data_frame.from_catalog can be more 
## performant and can be replaced in place of 
## create_dynamic_frame.from_catalog.

IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_lf_db", table_name = "csv_input", transformation_ctx = "IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
    ## Apply de-duplication logic on input data to pick up the latest record based on timestamp and operation
    IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

    # Add a new column to capture the latest timestamp per product_id
    inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))

    # Filter out new records that are inserted, then select the latest record from existing records and merge both to get the deduplicated output
    NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    # Register the deduplicated input as a temporary table to use in Iceberg Spark SQL statements
    finalInputDF.createOrReplaceTempView("incremental_input_data")
    finalInputDF.show()

    ## Perform a merge operation on the incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a MERGE operation
    IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_lf_db.iceberg_table_lf t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)

job.commit()
  1. On the Job details tab, specify the job name (iceberg-lf).
  2. For IAM Role, assign an AWS Identity and Access Management (IAM) role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  3. For Glue version, choose Glue 4.0 (Glue 3.0 is also supported).
  4. For Language, choose Python 3.
  5. Make sure Job bookmark has the default value of Enable.
  6. For Job parameters, add the following:
    1. Add the key --datalake-formats with the value iceberg.
    2. Add the key--iceberg_job_catalog_warehouse with the value as your S3 path (s3://<bucket-name>/<iceberg-warehouse-path>).
  7. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn't have the op column.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload an incremental file.

This file includes updated records on two items.

Run the AWS Glue job again to process incremental files

Because the AWS Glue job has bookmarks enabled, the job picks up the new incremental file and performs a MERGE operation on the Iceberg table.

To run the job again, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job and choose Run.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena after incremental data processing

When the incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for items 200 and 201.

The following screenshot shows the output.

Define Lake Formation policies

For data governance, we use Lake Formation. Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and provides transactional access on top of your data lake. Moreover, it enables data sharing across accounts and organizations. There are two ways to share data resources in Lake Formation: named resource access control (NRAC) and tag-based access control (TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts using Lake Formation V3. Those are consumed via resource links that are based on created resource shares. Lake Formation tag-based access control (LF-TBAC) is another approach to share data resources in Lake Formation, which defines permissions based on attributes. These attributes are called LF-tags.

In this example, we create databases in the primary account. Our NRAC database is shared with a data domain via AWS RAM. Access to data tables that we register in this database will be handled through NRAC.
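
The grants described in the following sections are performed on the Lake Formation console, but the same NRAC-style cross-account grants can also be scripted. The following is a minimal boto3 sketch, assuming the database and table names used in this post and a placeholder secondary account ID.

import boto3

lakeformation = boto3.client("lakeformation")
SECONDARY_ACCOUNT_ID = "111122223333"  # placeholder for the secondary account

# Database-level grant (Describe), equivalent to the first console grant below
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": SECONDARY_ACCOUNT_ID},
    Resource={"Database": {"Name": "iceberg_lf_db"}},
    Permissions=["DESCRIBE"],
)

# Table-level grant (Select and Describe), equivalent to the second console grant
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": SECONDARY_ACCOUNT_ID},
    Resource={"Table": {"DatabaseName": "iceberg_lf_db", "Name": "iceberg_table_lf"}},
    Permissions=["SELECT", "DESCRIBE"],
)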

Configure access controls in the primary account

In the primary account, complete the following steps to set up access controls using Lake Formation:

  1. On the Lake Formation console, choose Data lake locations in the navigation pane.
  2. Choose Register location.
  3. Update the Iceberg Amazon S3 location path shown in the following screenshot.

Grant access to the database to the secondary account

To grant database access to the external (secondary) account, complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Choose External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the database name.

The first grant should be at database level, and the second grant is at table level.

  1. For Database permissions, specify your permissions (for this post, we select Describe).
  2. Choose Grant.

Now you need to grant permissions at the table level.

  1. Select External accounts and enter the secondary account number.
  2. Select Named data catalog resources.
  3. Verify the table name.
  4. For Table permissions, specify the permissions you want to grant. For this post, we select Select and Describe.
  5. Choose Grant.

If you see the following error, you must revoke IAMAllowedPrincipals from the data lake permissions.

To do so, select IAMAllowedPrincipals and choose Revoke.

Choose Revoke again to confirm.

After you revoke the data permissions, the permissions should appear as shown in the following screenshot.

Add AWS Glue IAM role permissions

Because IAMAllowedPrincipals was revoked, the AWS Glue IAM role that is used in the AWS Glue job needs to be granted access explicitly, as shown in the following screenshot.

You need to repeat these steps for the AWS Glue IAM role at table level.

Verify the permissions granted to the AWS Glue IAM role on the Lake Formation console.

Grant access to the Iceberg table to the external account

In the secondary account, complete the following steps to access the Iceberg table shared from the primary account:

  1. On the AWS RAM console, choose Resource shares in the navigation pane.
  2. Choose the resource shares invitation sent from the primary account.
  3. Choose Accept resource share.

The resource status should now be active.

Next, you need to create a resource link for the shared Iceberg table and access through Athena.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the Iceberg table (shared from the primary account).
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post, iceberg_table_lf_demo).
  5. For Database, choose your database and verify the shared table and database are automatically populated.
  6. Choose Create.
  7. Select your table and on the Actions menu, choose View data.

You’re redirected to the Athena console, where you can query the data.

Grant column-based access in the primary account

For column-level restricted access, you need to grant access at the column level on the Iceberg table. Complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Select External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the table name.
  6. For Table permissions, choose the permissions you want to grant. For this post, we select Select.
  7. Under Data permissions, choose Column-based access.
  8. Select Include columns and choose your permission filters (for this post, Category and Quantity_available).
  9. Choose Grant.

Data with restricted columns can now be queried through the Athena console.

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

This post explains how you can use the Iceberg framework with AWS Glue and Lake Formation to define cross-account access controls and query data using Athena. It provides an overview of Iceberg and its features and integration approaches, and explains how you can ingest data, grant cross-account access, and query data through a step-by-step guide.

We hope this gives you a great starting point for using Iceberg to build your data lake platform along with AWS analytics services to implement your solution.


About the Authors

Vikram Sahadevan is a Senior Resident Architect on the AWS Data Lab team. He enjoys efforts that focus around providing prescriptive architectural guidance, sharing best practices, and removing technical roadblocks with joint engineering engagements between customers and AWS technical resources that accelerate data, analytics, artificial intelligence, and machine learning initiatives.

Suvendu Kumar Patra possesses 18 years of experience in infrastructure, database design, and data engineering, and he currently holds the position of Senior Resident Architect at Amazon Web Services. He is a member of the specialized focus group, AWS Data Lab, and his primary duties entail working with executive leadership teams of strategic AWS customers to develop their roadmaps for data, analytics, and AI/ML. Suvendu collaborates closely with customers to implement data engineering, data hub, data lake, data governance, and EDW solutions, as well as enterprise data strategy and data management.

Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/exploring-new-etl-and-elt-capabilities-for-amazon-redshift-from-the-aws-glue-studio-visual-editor/

In a modern data architecture, unified analytics enable you to access the data you need, whether it’s stored in a data lake or a data warehouse. In particular, we have observed an increasing number of customers who combine and integrate their data into an Amazon Redshift data warehouse to analyze huge data at scale and run complex queries to achieve their business goals.

One of the most common use cases for data preparation on Amazon Redshift is to ingest and transform data from different data stores into an Amazon Redshift data warehouse. This is commonly achieved via AWS Glue, which is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. AWS Glue provides an extensible architecture that enables users with different data processing use cases, and works well with Amazon Redshift. At AWS re:Invent 2022, we announced support for the new Amazon Redshift integration with Apache Spark available in AWS Glue 4.0, which provides enhanced ETL (extract, transform, and load) and ELT capabilities with improved performance.

Today, we are pleased to announce new and enhanced visual job authoring capabilities for Amazon Redshift ETL and ELT workflows on the AWS Glue Studio visual editor. The new authoring experience gives you the ability to:

  • Get started faster with Amazon Redshift by directly browsing Amazon Redshift schemas and tables from the AWS Glue Studio visual interface
  • Author jobs flexibly through native Amazon Redshift SQL support as a source, or through custom preactions and postactions
  • Simplify common data loading operations into Amazon Redshift through new support for INSERT, TRUNCATE, DROP, and MERGE commands

With these enhancements, you can use existing transforms and connectors in AWS Glue Studio to quickly create data pipelines for Amazon Redshift. No-code users can complete end-to-end tasks using only the visual interface, SQL users can reuse their existing Amazon Redshift SQL within AWS Glue, and all users can tune their logic with custom actions on the visual editor.

In this post, we explore the new streamlined user interface and dive deeper into how to use these capabilities. To demonstrate these new capabilities, we showcase the following:

  • Passing a custom SQL JOIN statement to Amazon Redshift
  • Using the results to apply an AWS Glue Studio visual transform
  • Performing a MERGE on the results to load them into a destination table

Set up resources with AWS CloudFormation

To demonstrate the AWS Glue Studio visual editor experience with Amazon Redshift, we provide an AWS CloudFormation template for you to set up baseline resources quickly. The template creates the following resources for you:

  • An Amazon VPC, subnets, route tables, an internet gateway, and NAT gateways
  • An Amazon Redshift cluster
  • An AWS Identity and Access Management (IAM) role associated with the Amazon Redshift cluster
  • An IAM role for running the AWS Glue job
  • An Amazon Simple Storage Service (Amazon S3) bucket to be used as a temporary location for Amazon Redshift ETL
  • An AWS Secrets Manager secret that stores the user name and password for the Amazon Redshift cluster

Note that at the time of writing this post, Amazon Redshift MERGE is in preview, and the cluster created is a preview cluster.

To launch the CloudFormation stack, complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack and then choose With new resources (standard).
  2. For Template source, select Upload a template file, and upload the provided template.
  3. Choose Next.
  4. Enter a name for the CloudFormation stack, then choose Next.
  5. Acknowledge that this stack might create IAM resources for you, then choose Submit.
  6. After the CloudFormation stack is successfully created, follow the steps at https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-sample-db.html to load the sample tickit data into the Redshift cluster that you created.

Exploring Amazon Redshift reads

In this section, we go over the new read functionality in the AWS Glue Studio visual editor and demonstrate how we can run a custom SQL statement via the new UI.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select Visual with a blank canvas because we're authoring a job from scratch, then choose Create.
  3. In the blank canvas, choose the plus sign to add an Amazon Redshift node of type Source.

When you close the node selector, you should see an Amazon Redshift source node on the canvas along with the data source properties.

You can choose from two methods of accessing your Amazon Redshift data:

  • Direct data connection – This new method allows you to establish a connection to your Amazon Redshift sources without the need to catalog them
  • Glue Data Catalog tables – This method requires you to have already crawled or generated your Amazon Redshift tables in the AWS Glue Data Catalog

For this post, we use the Direct data connection option.

  1. For Redshift access type, select Direct data connection.
  2. For Redshift connection, choose your AWS Glue Connection redshift-demo-blog-connection created in the CloudFormation stack.

Specifying the connection automatically configures all the network related details along with the name of the database you wish to connect to.

The UI then presents a choice on how you’d like to access the data from within your selected Amazon Redshift cluster’s database:

  • Choose a single table – This option lets you select a single schema, and a single table from your database. You can browse through all of your available schemas and tables right from the AWS Glue Studio visual editor itself, which makes choosing your source table much easier.
  • Enter a custom query If you’re looking to perform your ETL on a subset of data from your Amazon Redshift tables, you can author an Amazon Redshift query from the AWS Glue Studio UI. This query will be passed to the connected Amazon Redshift cluster, and the returned query result will be available in downstream transformations on AWS Glue Studio.

For the purposes of this post, we write our own custom query that joins data from the preloaded event table and venue table.

  1. Select Enter a custom query and enter the following query into the query editor:
select venue.venueid from event, venue where event.venueid = venue.venueid and event.starttime between '2008-01-01 14:00:00' and '2008-01-01 15:00:00' and venue.venueseats = 0

The intent of this query is to gather the venueid of locations that have had an event between 2008-01-01 14:00:00 and 2008-01-01 15:00:00 and have venueseats = 0. If we run a similar query from the Amazon Redshift Query Editor, we can see that there are actually five such venues within that time frame. We want to merge this result back into the venue table in Amazon Redshift so that these rows are removed from it.

  1. Choose Infer schema, which allows the AWS Glue Studio visual editor to understand the schema from the returned columns from your query.

You can see the schema on the Output schema tab.

  1. Under Performance and security, for S3 staging directory, choose the S3 temporary directory location created by the CloudFormation stack (RedshiftS3TempPath).
  2. For IAM role, choose the IAM role specified by RedshiftIamRoleARN in the CloudFormation stack.

Now we’re going to add a transform to drop duplicate rows from our join result. This will ensure that the MERGE operation in the following steps won’t have conflicting keys when performing the operation.

  1. Choose the Drop Duplicates node to view the node properties.
  2. On the Transform tab, for Drop duplicates, select Match specific keys.
  3. For Keys to match rows, choose venueid.

In this section, we defined the steps to read the output of a custom JOIN query. We then dropped the duplicate records from the returned value. In the next section, we explore the write path on the same job.

Exploring Amazon Redshift writes

Now let's go over the enhancements for writing to Amazon Redshift as a destination. This section covers all the simplified options for writing to Amazon Redshift, but highlights the new Amazon Redshift MERGE capabilities for the purposes of this post.

The MERGE operator offers great flexibility for conditionally merging rows from a source into a destination table. MERGE is powerful because it simplifies operations that traditionally were only achievable by using multiple insert, update, or delete statements separately. Within AWS Glue Studio, particularly with the custom MERGE option, you can define a more complex matching condition to handle finding the records to update.

  1. From the canvas page of the job used in the previous section, select Amazon Redshift to add an Amazon Redshift node of type Target.

When you close the selector, you should see your Amazon Redshift target node added to the AWS Glue Studio canvas, along with possible options.

  1. For Redshift access type, select Direct data connection.

Similar to the Amazon Redshift source node, the Direct data connection method allows you to write directly to your Amazon Redshift tables without needing to have them cataloged within the AWS Glue Data Catalog.

  1. For Redshift connection, choose your AWS Glue connection redshift-demo-blog-connection created in the CloudFormation stack.
  2. For Schema, choose public.
  3. For Table, choose the venue table as the destination Amazon Redshift table where we will store the merged data.
  4. Choose MERGE data into target table.

This selection provides the user with two options:

  • Choose keys and simple actions – This is a user-friendly version of the MERGE operation. You simply specify the matching keys, and choose what happens to the rows that match the key (update them or delete them) or don’t have any matches (insert them).
  • Enter custom MERGE statement – This option provides the most flexibility. You can enter your own custom logic for MERGE.

For this post, we use the simple actions method for performing a MERGE operation.

  1. For Handling of data and target table, select MERGE data into target table, and then select Choose keys and simple actions.
  2. For Matching Keys, select venueid.

This field becomes our MERGE condition for matching keys.

  1. For When matched, select Delete record in the table.
  2. For When not matched, select Insert source data as a new row into the table.

With these selections, we’ve configured the AWS Glue job to run a MERGE statement on Amazon Redshift while inserting our data. Moreover, for performing this MERGE operation, we use the as the key (you can select multiple keys). If there is a key match with the destination table’s record, we delete that record. Otherwise, we insert the record into the destination table.

  1. Navigate to the Job details tab.
  2. For Name, enter a name for the job.
  3. For the IAM Role drop down, select the RedshiftIamRole role that was created via the CloudFormation template.
  4. Choose Save.

  5. Choose Run and wait for the job to finish.

You can track its progress on the Runs tab.

  1. After the run reaches a successful state, navigate back to the Amazon Redshift Query Editor.
  2. Run the same query again to discover that those rows have been deleted in accordance with our MERGE configuration.

In this section, we configured an Amazon Redshift target node to write a MERGE statement to conditionally update records in our destination Amazon Redshift table. We then saved and ran the AWS Glue job, and saw the effect of the MERGE statement on our destination Amazon Redshift table.

Other available write options

In addition to MERGE, the AWS Glue Studio visual editor’s Amazon Redshift destination node also supports a number of other common operations:

  • APPEND – Appending to your target table performs an insert into the selected table without updating any of the existing records (if there are duplicates, both records will be retained). In cases where you want to update existing rows in addition to adding new rows (often referred to as an UPSERT operation), you can select the Also update existing records in target table option. Note that both APPEND only and UPSERT (APPEND with UPDATE) are simpler subsets of the MERGE functionality discussed earlier.
  • TRUNCATE – The TRUNCATE option clears all the data in the existing table but retains all the existing table schema, followed by an APPEND of all new data to the empty table. This option is often used when the full dataset needs to be refreshed and downstream services or tools depend on the table schema being consistent. For example, every night an Amazon Redshift table needs to be fully updated with the latest customer information that will be consumed by an Amazon QuickSight dashboard. In this case, the ETL developer would choose TRUNCATE to ensure the data is fully refreshed but the table schema is guaranteed not to change.
  • DROP – This option is used when the full dataset needs to be refreshed and the downstream services or tools that depend on the schema or systems can handle possible schema changes without breaking.

How write operations are being handled on the backend

The Amazon Redshift connector supports two parameters called preactions and postactions. These parameters allow you to run SQL statements that will be passed on to the Amazon Redshift data warehouse before and after the actual write operation is carried out by Spark.

On the Script tab on the AWS Glue Studio page, we can see what SQL statements are being run.
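
To illustrate that mechanism, the following is a minimal PySpark sketch of a Redshift write that passes preactions and postactions through the connector. The connection name comes from the CloudFormation stack; the database name, table names, and SQL statements are placeholders, not the exact code that AWS Glue Studio generates.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())


def write_with_actions(dyf):
    """Write a DynamicFrame to Redshift, running SQL before and after the load (names are placeholders)."""
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=dyf,
        catalog_connection="redshift-demo-blog-connection",  # Glue connection from the CloudFormation stack
        connection_options={
            "database": "dev",            # placeholder database name
            "dbtable": "public.venue",
            # Runs in Redshift before the load; this is the kind of SQL the TRUNCATE preset emits
            "preactions": "TRUNCATE TABLE public.venue;",
            # Runs in Redshift after the load; the MERGE and UPSERT presets place their generated SQL here
            "postactions": "ANALYZE public.venue;",
        },
        redshift_tmp_dir="s3://<RedshiftS3TempPath>/temp/",
    )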

Use a custom implementation for writing data into Amazon Redshift

In the event that the provided presets require more customization, or your use case requires more advanced implementations for writing to Amazon Redshift, AWS Glue Studio also allows you to freely select which preactions and postactions can be run when writing to Amazon Redshift.

To show an example, we create an Amazon Redshift datashare as a preaction, then perform the cleaning up of the same datashare as a postaction via AWS Glue Studio.

NOTE: This section is not executed as part of the above blog and is provided as an example.

  1. Choose the Amazon Redshift data target node.
  2. On the Data target properties tab, expand the Custom Redshift parameters section.
  3. For the parameters, add the following:
    1. Parameter: preactions  with Value BEGIN; CREATE DATASHARE ds1; END
    2. Parameter: postactions with Value BEGIN; DROP DATASHARE ds1; END

As you can see, we can specify multiple Amazon Redshift statements as a part of both the preactions and postactions parameters. Remember that these statements will override any existing preactions or postactions with your specified actions (as you can see in the following generated code).

Cleanup

To avoid additional costs, make sure to delete any unnecessary resources and files:

  • Empty and delete the contents from the S3 temporary bucket
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. Make sure to empty the S3 bucket before you delete the bucket.

Conclusion

In this post, we went over the new AWS Glue Studio visual options for performing reads and writes from Amazon Redshift. We also saw the simplicity with which you can browse your Amazon Redshift tables right from the AWS Glue Studio visual editor UI, and how to run your own custom SQL statements against your Amazon Redshift sources. We then explored how to perform simple ETL loading tasks against Amazon Redshift with just a few clicks, and showcased the new Amazon Redshift MERGE statement.

To dive deeper into the new Amazon Redshift integrations for the AWS Glue Studio visual editor, check out Connecting to Redshift in AWS Glue Studio.


About the Authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Post Syndicated from Vinay Kumar Khambhampati original https://aws.amazon.com/blogs/big-data/accelerate-hiveql-with-oozie-to-spark-sql-migration-on-amazon-emr/

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed pretty well for a long time. But with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling the storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.

We migrate these Oozie workflows with Hive actions by extracting the HQL files and their dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone. To convert the HQL to Spark SQL, you'll need to sort through the existing HQLs, replace the parameters, and change the syntax across many files.

Instead, we can use automation to speed up the process of migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes the input of an existing Hive job and generates configurations artifacts needed for running SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, co-ordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, SQL paths, and their static parameters, which are input for the solution in the next step.

The process consists of two steps:

  1. The Python parser script takes input of the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.

The following diagram outlines these steps and shows sample output.
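
To give a sense of what the parser extracts, the following is a simplified sketch that pulls the Hive actions out of a workflow.xml with defusedxml (one of the parser's dependencies listed below). It captures only the script path and static parameters per action; the actual xml_parser.py in the repository also handles the coordinator XML, job properties, and HQL splitting. The file path and namespace versions are assumptions based on the sample command shown later.

import json
from defusedxml.ElementTree import parse

OOZIE_NS = "{uri:oozie:workflow:0.4}"    # workflow namespace version, as in the sample command
HIVE_NS = "{uri:oozie:hive-action:0.4}"  # hive-action namespace version, as in the sample command


def extract_hive_actions(workflow_xml_path):
    """Return one metadata dict per Hive action found in an Oozie workflow.xml."""
    root = parse(workflow_xml_path).getroot()
    steps = []
    for action in root.iter(f"{OOZIE_NS}action"):
        hive = action.find(f"{HIVE_NS}hive")
        if hive is None:
            continue  # skip non-Hive actions (fs, shell, email, and so on)
        script = hive.find(f"{HIVE_NS}script")
        params = [p.text for p in hive.findall(f"{HIVE_NS}param")]
        steps.append({
            "step_name": action.get("name"),
            "sql_path": script.text if script is not None else None,
            "sql_parameters": dict(p.split("=", 1) for p in params if p and "=" in p),
        })
    return steps


if __name__ == "__main__":
    # Path is an assumption; point it at your own workflow.xml
    print(json.dumps(extract_hive_actions("./sample_jobs/sample_oozie_job_name/workflow.xml"), indent=2))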

Prerequisites

You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml==0.7.1

Setup

Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment
  1. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate
  1. Git clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql
  1. Install dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}
('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')

Limitations

This method has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to spark submit. This solution uses DynamoDB for logging the run details of SQLs for support and maintenance.
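
The repository's etl_driver.py is the authoritative implementation; the following is a simplified sketch of its core idea, showing how the DynamoDB metadata drives the Spark SQL run order: fetch the step item, substitute parameters, run each active SQL in sql_load_order, and log the outcome. Argument parsing, the spark_config settings, and the exact log schema are simplified here.

from datetime import datetime, timezone

import boto3
from pyspark.sql import SparkSession


def run_step(step_id, job_run_id, code_bucket, sql_parameters,
             metadata_table="dw-etl-metadata", log_table="dw-etl-pipelinelog"):
    """Simplified driver loop: read step metadata, run its SQLs in order, log each run."""
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    dynamodb = boto3.resource("dynamodb")
    s3 = boto3.client("s3")

    # Step metadata as stored in DynamoDB (see the step1.json example above)
    step = dynamodb.Table(metadata_table).get_item(
        Key={"id": "emr_config", "step_id": step_id}
    )["Item"]

    bucket, prefix = code_bucket.replace("s3://", "").split("/", 1)

    for sql in sorted(step["sql_info"], key=lambda s: int(s["sql_load_order"])):
        if sql["sql_active_flag"] != "Y":
            continue
        key = f'{prefix}/{step["sql_base_path"]}{sql["sql_path"]}'
        text = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        for name, value in sql_parameters.items():   # values are plain strings
            text = text.replace(f"${{{name}}}", value)  # naive ${PARAM} substitution
        start = datetime.now(timezone.utc).isoformat()
        status, error = "SUCCESS", ""
        try:
            spark.sql(text)
        except Exception as exc:
            status, error = "FAILED", str(exc)
        dynamodb.Table(log_table).put_item(Item={
            "job_run_id": job_run_id,
            "step_id": f'{step_id}#{sql["sql_load_order"]}',  # simplified sort key
            "sql_seq": int(sql["sql_load_order"]),
            "start_time": start,
            "end_time": datetime.now(timezone.utc).isoformat(),
            "Status": status,
            "error_description": error,
        })
        if status == "FAILED":
            raise RuntimeError(f'{sql["sql_path"]} failed: {error}')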

Architecture overview

The following diagram illustrates the solution architecture.

Prerequisites

You need the following prerequisites:

  • Version:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1

Setup

Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attributes        | Type   | Comments
id                | String | partition_key
step_id           | String | sort_key
step_name         | String | Step description
sql_base_path     | string | Base path
sql_info          | list   | List of SQLs in ETL pipeline
. sql_path        |        | SQL file name
. sql_active_flag |        | y/n
. sql_load_order  |        | Order of SQL
. sql_parameters  |        | Parameters in SQL and values
spark_config      | Map    | Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attributes        | Type   | Comments
job_run_id        | String | partition_key
id                | String | sort_key (UUID)
end_time          | String | End time
error_description | String | Error in case of failure
expire            | Number | Time to Live
sql_seq           | Number | SQL sequence number
start_time        | String | Start time
Status            | String | Status of job
step_id           | String | Job ID the SQL belongs to

The log table can grow quickly if there are too many jobs or if they are running frequently. We can archive them to Amazon S3 if they are no longer used or use the Time to Live feature of DynamoDB to clean up old records.
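
For example, because the log table already carries a numeric expire attribute, you can enable TTL on it with a single call. A minimal boto3 sketch (the AWS CLI offers the same operation):

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Tell DynamoDB to expire items whose 'expire' attribute (epoch seconds) is in the past
dynamodb.update_time_to_live(
    TableName="dw-etl-pipelinelog",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expire"},
)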

  1. Run the first command to set the variable in case you have an existing bucket that can be reused. If not, create an S3 bucket to store the Spark SQL code, which will be run by Amazon EMR.
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1
  1. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name

  1. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git
  1. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip
  1. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  1. Upload sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  1. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "name": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "id": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}
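
If you prefer to load the item programmatically instead of through the console or AWS CLI, the following sketch (assuming the step1.json file path from the repository) writes the metadata item with Boto3:

import json
import boto3

# Load the sample step metadata and write it to the metadata table
with open('./sample_oozie_job_name/step1/step1.json') as f:
    item = json.load(f)

table = boto3.resource('dynamodb', region_name='us-east-1').Table('dw-etl-metadata')
table.put_item(Item=item)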

  1. In the Athena query editor, create the database base:
create database base;
  1. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

    2. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  1. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file by modifying the S3 path to the bucket you created.
    1. Run the ./sample_data/ddl/us_current.q file by modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.

  1. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  1. Add the following steps to the EMR cluster:
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,  --code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the Spark step.

Step type Spark Application
Name Any Name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script Argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB Log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB Metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.
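
The sql_parameters value is a set of key=value pairs separated by ::, which the driver resolves inside the SQL files at run time. The following is a minimal, illustrative sketch of that idea; the ${DATE}-style placeholder syntax is hypothetical here, and the actual substitution logic lives in etl_driver.py in the repository:

# Parse "DATE=2022-02-02::HOUR=12" into {"DATE": "2022-02-02", "HOUR": "12"}
def parse_sql_parameters(raw):
    return dict(pair.split('=', 1) for pair in raw.split('::'))

# Replace ${KEY}-style placeholders (hypothetical syntax) with their values
def apply_parameters(sql_text, params):
    for key, value in params.items():
        sql_text = sql_text.replace('${' + key + '}', value)
    return sql_text

params = parse_sql_parameters('DATE=2022-02-02::HOUR=12')
print(apply_parameters("SELECT * FROM base.states_current WHERE load_date = '${DATE}'", params))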

Validation

After the EMR step completes, you should have data in the S3 bucket for the table base.states_daily. You can validate the data by querying the table base.states_daily in Athena.
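
The following sketch runs that validation query with Boto3; replace the query result output location with an S3 bucket you own:

import time
import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Start the validation query against the table created by the EMR step
query = athena.start_query_execution(
    QueryString='SELECT COUNT(*) FROM base.states_daily',
    QueryExecutionContext={'Database': 'base'},
    ResultConfiguration={'OutputLocation': 's3://your-athena-results-bucket/'}
)
query_id = query['QueryExecutionId']

# Poll until the query finishes, then print the row count
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    rows = athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
    print(rows[1]['Data'][0]['VarCharValue'])  # first row is the column header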

Congratulations, you have converted an Oozie Hive job to Spark and run it successfully on Amazon EMR.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for new pipelines and reduces code maintenance
  • Onboarding any new pipeline only needs the metadata set up—the DynamoDB entries and SQL to be placed in the S3 path
  • Any common modifications or enhancements can be done at the solution level, which will be reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark-submit command

Limitations

This method has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL should be a Spark action like insert, create, drop, and so on

In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges for the resources:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1
  1. Delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api delete-bucket --bucket $s3_bucket_name --region us-east-1

  1. Stop the EMR cluster if it wasn’t a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster id created above>> 

Conclusion

In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmark and testing. Developing a new pipeline is also quick—you only need to create SQL logic, test it using Spark (shell or notebook), add metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.


About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.

Reference guide to build inventory management and forecasting solutions on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-build-inventory-management-and-forecasting-solutions-on-aws/

Inventory management is a critical function for any business that deals with physical products. The primary challenge businesses face with inventory management is balancing the cost of holding inventory with the need to ensure that products are available when customers demand them.

The consequences of poor inventory management can be severe. Overstocking can lead to increased holding costs and waste, while understocking can result in lost sales, reduced customer satisfaction, and damage to the business’s reputation. Inefficient inventory management can also tie up valuable resources, including capital and warehouse space, and can impact profitability.

Forecasting is another critical component of effective inventory management. Accurately predicting demand for products allows businesses to optimize inventory levels, minimize stockouts, and reduce holding costs. However, forecasting can be a complex process, and inaccurate predictions can lead to missed opportunities and lost revenue.

To address these challenges, businesses need an inventory management and forecasting solution that can provide real-time insights into inventory levels, demand trends, and customer behavior. Such a solution should use the latest technologies, including Internet of Things (IoT) sensors, cloud computing, and machine learning (ML), to provide accurate, timely, and actionable data. By implementing such a solution, businesses can improve their inventory management processes, reduce holding costs, increase revenue, and enhance customer satisfaction.

In this post, we discuss how to streamline inventory management forecasting systems with AWS managed analytics, AI/ML, and database services.

Solution overview

In today’s highly competitive business landscape, it’s essential for retailers to optimize their inventory management processes to maximize profitability and improve customer satisfaction. With the proliferation of IoT devices and the abundance of data generated by them, it has become possible to collect real-time data on inventory levels, customer behavior, and other key metrics.

To take advantage of this data and build an effective inventory management and forecasting solution, retailers can use a range of AWS services. By collecting data from store sensors using AWS IoT Core, ingesting it using AWS Lambda to Amazon Aurora Serverless, and transforming it using AWS Glue from a database to an Amazon Simple Storage Service (Amazon S3) data lake, retailers can gain deep insights into their inventory and customer behavior.

With Amazon Athena, retailers can analyze this data to identify trends, patterns, and anomalies, and use Amazon ElastiCache for customer-facing applications with reduced latency. Additionally, by building a point of sales application on Amazon QuickSight, retailers can embed customer 360 views into the application to provide personalized shopping experiences and drive customer loyalty.

Finally, we can use Amazon SageMaker to build forecasting models that can predict inventory demand and optimize stock levels.

With these AWS services, retailers can build an end-to-end inventory management and forecasting solution that provides real-time insights into inventory levels and customer behavior, enabling them to make informed decisions that drive business growth and customer satisfaction.

The following diagram illustrates a sample architecture.

With the appropriate AWS services, your inventory management and forecasting system can have optimized collection, storage, processing, and analysis of data from multiple sources. The solution includes the following components.

Data ingestion and storage

Retail businesses have event-driven data that requires action from downstream processes. It’s critical for an inventory management application to handle the data ingestion and storage for changing demands.

The data ingestion process is typically triggered by an event such as an order being placed, kicking off the inventory management workflow, which requires actions from backend services. Developers are responsible for the operational overhead of maintaining the data ingestion load from an event-driven application.

The volume and velocity of data can change in the retail industry each day. Events like Black Friday or a new campaign can create volatile demand in what is required to process and store the inventory data. Serverless services designed to scale to businesses’ needs help reduce the architectural and operational challenges that are driven from high-demand retail applications.

Understanding the scaling challenges that occur when inventory demand spikes, we can deploy Lambda, a serverless, event-driven compute service, to trigger the data ingestion process. As inventory events occur like purchases or returns, Lambda automatically scales compute resources to meet the volume of incoming data.

After Lambda responds to the inventory action request, the updated data is stored in Aurora Serverless. Aurora Serverless is a serverless relational database that is designed to scale to the application’s needs. When peak loads hit during events like Black Friday, Aurora Serverless deploys only the database capacity necessary to meet the workload.
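
As a minimal sketch of this ingestion path, the following Lambda handler writes an inventory event to Aurora Serverless; one way to connect from Lambda is the RDS Data API, and the cluster ARN, secret ARN, database, table, and event fields below are placeholders for illustration:

import boto3

rds_data = boto3.client('rds-data')

# Placeholder identifiers for illustration only
CLUSTER_ARN = 'arn:aws:rds:us-east-1:123456789012:cluster:inventory-cluster'
SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:123456789012:secret:inventory-db-credentials'

def lambda_handler(event, context):
    # Persist a single inventory event (for example, a purchase or return)
    rds_data.execute_statement(
        resourceArn=CLUSTER_ARN,
        secretArn=SECRET_ARN,
        database='inventory',
        sql='INSERT INTO inventory_events (sku, quantity_change, event_type) VALUES (:sku, :qty, :type)',
        parameters=[
            {'name': 'sku', 'value': {'stringValue': event['sku']}},
            {'name': 'qty', 'value': {'longValue': event['quantity_change']}},
            {'name': 'type', 'value': {'stringValue': event['event_type']}},
        ],
    )
    return {'statusCode': 200}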

Inventory management applications have ever-changing demands. Deploying serverless services to handle the ingestion and storage of data will not only optimize cost but also reduce the operational overhead for developers, freeing up bandwidth for other critical business needs.

Data performance

Customer-facing applications require low latency to maintain positive user experiences with microsecond response times. ElastiCache, a fully managed, in-memory database, delivers high-performance data retrieval to users.

In-memory caching provided by ElastiCache is used to improve latency and throughput for the read-heavy workloads that online retailers experience. By storing critical pieces of data in memory, like commonly accessed product information, application performance improves. Product information is an ideal candidate for caching because the data stays relatively static.

Functionality is often added to retail applications to retrieve trending products. Trending products can be cycled through the cache dependent on customer access patterns. ElastiCache manages the real-time application data caching, allowing your customers to experience microsecond response times while supporting high-throughput handling of hundreds of millions of operations per second.
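
A common way to implement this is the cache-aside pattern. The following sketch uses the redis client library against an ElastiCache for Redis endpoint; the endpoint, key layout, and database lookup function are placeholders for illustration:

import json
import redis

# Placeholder ElastiCache for Redis endpoint
cache = redis.Redis(host='my-elasticache-endpoint.cache.amazonaws.com', port=6379)

def fetch_product_from_database(product_id):
    # Stand-in for the real database lookup
    return {'product_id': product_id, 'name': 'example product', 'price': 9.99}

def get_product(product_id):
    # Cache-aside: return the cached copy if present, otherwise read from the
    # database and cache the result with a short TTL
    key = f'product:{product_id}'
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    product = fetch_product_from_database(product_id)
    cache.setex(key, 300, json.dumps(product))  # cache for 5 minutes
    return product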

Data transformation

Data transformation is essential in inventory management and forecasting solutions for both data analysis around sales and inventory, as well as ML for forecasting. This is because raw data from various sources can contain inconsistencies, errors, and missing values that may distort the analysis and forecast results.

In the inventory management and forecasting solution, AWS Glue is recommended for data transformation. The tool addresses issues such as cleaning, restructuring, and consolidating data into a standard format that can be easily analyzed. As a result of the transformation, businesses can obtain a more precise understanding of inventory, sales trends, and customer behavior, influencing data-driven decisions to optimize inventory management and sales strategies. Furthermore, high-quality data is crucial for ML algorithms to make accurate forecasts.

By transforming data, organizations can enhance the accuracy and dependability of their forecasting models, ultimately leading to improved inventory management and cost savings.
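
As a minimal sketch of the kind of transformation an AWS Glue for Apache Spark job might perform at this stage (the S3 paths and column names are illustrative), the job reads raw inventory events, standardizes types, drops incomplete records, and writes a curated dataset to the data lake:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

glue_context = GlueContext(SparkContext())
spark = glue_context.spark_session

# Read raw inventory events landed by the ingestion pipeline (illustrative path)
raw = spark.read.json('s3://your-inventory-bucket/raw/inventory-events/')

# Standardize types, drop incomplete records, and deduplicate
clean = (raw
    .withColumn('event_time', F.to_timestamp('event_time'))
    .withColumn('quantity_change', F.col('quantity_change').cast('int'))
    .withColumn('event_date', F.to_date('event_time'))
    .dropna(subset=['sku', 'event_time'])
    .dropDuplicates(['sku', 'event_time']))

# Write the curated dataset to the S3 data lake in Parquet format
clean.write.mode('overwrite').partitionBy('event_date').parquet(
    's3://your-inventory-bucket/curated/inventory-events/')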

Data analysis

Data analysis has become increasingly important for businesses because it allows leaders to make informed operational decisions. However, analyzing large volumes of data can be a time-consuming and resource-intensive task. This is where Athena comes in. With Athena, businesses can easily query historical sales and inventory data stored in S3 data lakes and combine it with real-time transactional data from Aurora Serverless databases.

The federated capabilities of Athena allow businesses to generate insights by combining datasets without the need to build ETL (extract, transform, and load) pipelines, saving time and resources. This enables businesses to quickly gain a comprehensive understanding of their inventory and sales trends, which can be used to optimize inventory management and forecasting, ultimately improving operations and increasing profitability.

With Athena’s ease of use and powerful capabilities, businesses can quickly analyze their data and gain valuable insights, driving growth and success without the need for complex ETL pipelines.

Forecasting

Inventory forecasting is an important aspect of inventory management for businesses that deal with physical products. Accurately predicting demand for products can help optimize inventory levels, reduce costs, and improve customer satisfaction. ML can help simplify and improve inventory forecasting by making more accurate predictions based on historical data.

SageMaker is a powerful ML platform that you can use to build, train, and deploy ML models for a wide range of applications, including inventory forecasting. In this solution, we use SageMaker to build and train an ML model for inventory forecasting, covering the basic concepts of ML, the data preparation process, model training and evaluation, and deploying the model for use in a production environment.

The solution also introduces the concept of hierarchical forecasting, which involves generating coherent forecasts that maintain the relationships within the hierarchy or reconciling incoherent forecasts. The workshop provides a step-by-step process for using the training capabilities of SageMaker to carry out hierarchical forecasting using synthetic retail data and the scikit-hts package. The FBProphet model was used along with bottom-up and top-down hierarchical aggregation and disaggregation methods. We used Amazon SageMaker Experiments to train multiple models, and the best model was picked out of the four trained models.

Although the approach was demonstrated on a synthetic retail dataset, you can use the provided code with any time series dataset that exhibits a similar hierarchical structure.

Security and authentication

The solution takes advantage of the scalability, reliability, and security of AWS services to provide a comprehensive inventory management and forecasting solution that can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. By incorporating user authentication with Amazon Cognito and Amazon API Gateway, the solution ensures that the system is secure and accessible only by authorized users.

Next steps

The next step to build an inventory management and forecasting solution on AWS would be to go through the Inventory Management workshop. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end inventory management solution. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that make up an inventory management system.

Conclusion

In conclusion, building an inventory management and forecasting solution on AWS can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. With AWS services like IoT Core, Lambda, Aurora Serverless, AWS Glue, Athena, ElastiCache, QuickSight, SageMaker, and Amazon Cognito, businesses can use scalable, reliable, and secure technologies to collect, store, process, and analyze data from various sources.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, an inventory management and forecasting solution on AWS can provide businesses with the insights and tools they need to make data-driven decisions and stay competitive in a constantly evolving retail landscape.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on databases and enterprise applications, helping customers architect highly available and scalable solutions.

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Vetri Natarajan is a Specialist Solutions Architect for Amazon QuickSight. Vetri has 15 years of experience implementing enterprise business intelligence (BI) solutions and greenfield data products. Vetri specializes in integration of BI solutions with business applications and enabling data-driven decisions.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS platform and specializes in Data Analytics domain.

How Morningstar used tag-based access controls in AWS Lake Formation to manage permissions for an Amazon Redshift data warehouse

Post Syndicated from Don Drake original https://aws.amazon.com/blogs/big-data/how-morningstar-used-tag-based-access-controls-in-aws-lake-formation-to-manage-permissions-for-an-amazon-redshift-data-warehouse/

This post was co-written by Ashish Prabhu, Stephen Johnston, and Colin Ingarfield at Morningstar and Don Drake, at AWS.

With “Empowering Investor Success” as the core motto, Morningstar aims at providing our investors and advisors with the tools and information they need to make informed investment decisions.

In this post, Morningstar’s Data Lake Team Leads discuss how they utilized tag-based access control in their data lake with AWS Lake Formation and enabled similar controls in Amazon Redshift.

The business challenge

At Morningstar, we built a data lake solution that allows our consumers to easily ingest data, make it accessible via the AWS Glue Data Catalog, and grant access to consumers to query the data via Amazon Athena. In this solution, we were required to ensure that the consumers could only query the data to which they had explicit access. To enforce our access permissions, we chose Lake Formation tag-based access control (TBAC). TBAC helps us categorize the data into a simple, broad level or a complex, more granular level using tags and then grant consumers access to those tags based on what group of data they need. Tag-based entitlements allow us to have a flexible and manageable entitlements system that solves our complex entitlements scenarios.

However, our consumers pushed us for better query performance and enhanced analytical capabilities. We realized we needed a data warehouse to cater to all of these consumer requirements, so we evaluated Amazon Redshift. Amazon Redshift provides us with features that we could use to work with our consumers and enable their analytical requirements:

  • Better performance for consumers’ analytical requirements
  • Ability to tune query performance with user-specified sort keys and distribution keys
  • Ability to have different representations of the same data via views and materialized views
  • Consistent query performance regardless of concurrency

Many new Amazon Redshift features helped solve and scale our analytical query requirements, specifically Amazon Redshift Serverless and Amazon Redshift data sharing.

Because our Lake Formation-enforced data lake is a central data repository for all our data, it makes sense for us to flow the data permissions from the data lake into Amazon Redshift. We utilize AWS Identity and Access Management (IAM) authentication and want to centralize the governance of permissions based on IAM roles and groups. For each AWS Glue database and table, we have a corresponding Amazon Redshift schema and table. Our goal was to ensure customers who have access to AWS Glue tables via Lake Formation also have access to the corresponding tables in Amazon Redshift.

However, we faced a problem with user-based entitlements as we moved to Amazon Redshift.

The entitlements problem

Even though we added Amazon Redshift as part of our overall solution, the entitlement requirements and challenges that came with it remained the same for our users consuming via Lake Formation. At the same time, we had to find a way to implement entitlements in our Amazon Redshift data warehouse with the same set of tags that we had already defined in Lake Formation. Amazon Redshift supports resource-based entitlements but doesn’t support tag-based entitlements. The challenge we had to overcome was how to map our existing tag-based entitlements in Lake Formation to the resource-based entitlements in Amazon Redshift.

The data in the AWS Glue Data Catalog also needed to be loaded into native Amazon Redshift tables. This was necessary so that users get the familiar list of schemas and tables they are accustomed to seeing in the Data Catalog when accessing it via Athena. This way, our existing data lake consumers could easily transition to Amazon Redshift.

The following diagram illustrates the structure of the AWS Glue Data Catalog mapped 1:1 with the structure of our Amazon Redshift data warehouse.

Shows mapping of Glue databases and tables to Redshift schemas and tables.

We wanted to utilize the ontology of tags in Lake Formation to also be used on the datasets in Amazon Redshift so that consumers could be granted access to the same datasets in both places. This enabled us to have a single entitlement policy source API that would grant appropriate access to both our Amazon Redshift tables as well as the corresponding Lake Formation tables based on the Lake Formation tag-based policies.

Entitlement Policy Source is used by Lake Formation and Redshift

To solve this problem, we needed to build our own solution to convert the tag-based policies in Lake Formation into grants and revokes in the resource-based entitlements in Amazon Redshift.

Solution overview

To solve this mismatch, we wanted to synchronize our Lake Formation tag ontology and classifications to the Amazon Redshift permission model. To do this, we map Lake Formation tags and grants to Amazon Redshift grants with the following steps:

  1. Map all the resources (databases, schemas, tables, and more) in Lake Formation that are tagged to their equivalent Amazon Redshift tables.
  2. Translate each policy in Lake Formation on a tag expression to a set of Amazon Redshift table grants and revokes.

The net result is that when there is a tag or policy change in Lake Formation, a corresponding set of grants or revokes are made to the equivalent Amazon Redshift tables to keep our entitlements in sync.

Map all tagged resources in Lake Formation to Amazon Redshift equivalents

The tag-based access control of Lake Formation allowed us to apply multiple tags on a single resource (database and table) in the AWS Glue Data Catalog. If visualized in a mapping form, the resource tagging can be displayed as how multiple tags on a single table would be flattened into individual entitlements on Amazon Redshift tables.

Mapping of tags in Lake Formation to Redshift tables

Translate tags to Amazon Redshift grants and revokes

To enable the migration of the tag-based policy enforced in Lake Formation, the permissions can be converted into simple grants and revokes that can be done on a per-group level.

There are two fundamental parts to a tag policy: the principal_id and the tag expression (for example, “Access Level” = “Public”). Assuming that we have an Amazon Redshift database group for each principal_id, then the resources that represent the tag expression can be permissioned accordingly. We plan on migrating from database groups to database roles in a future implementation.

mapping of tags to Redshift user group
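
To make the translation concrete, the following simplified sketch turns a single tag policy into Amazon Redshift grant statements; the tag-to-table mapping and group names are hypothetical stand-ins for the metadata stored in the config schema described later in this post:

# Which Amazon Redshift tables carry which Lake Formation tag expression
TAG_TO_TABLES = {
    ('Access Level', 'Public'): ['marketdata.daily_prices', 'marketdata.fund_ratings'],
}

# Which Amazon Redshift database group corresponds to each Lake Formation principal
PRINCIPAL_TO_GROUP = {
    'arn:aws:iam::123456789012:role/analyst-role': 'analyst_group',
}

def translate_policy(principal_id, tag_key, tag_value):
    # Flatten one tag-based policy into resource-based GRANT statements
    group = PRINCIPAL_TO_GROUP[principal_id]
    tables = TAG_TO_TABLES.get((tag_key, tag_value), [])
    return ['GRANT SELECT ON {} TO GROUP {};'.format(t, group) for t in tables]

for statement in translate_policy('arn:aws:iam::123456789012:role/analyst-role', 'Access Level', 'Public'):
    print(statement)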

The solution implementation

The implementation of this solution led us to develop two components:

  • The mapper service
  • The Amazon Redshift data configuration

The mapper service can be thought of as a translation service. As the name suggests, it has the core business logic to map the tag and policy information into resource-based grants and revokes in Amazon Redshift. It needs to mimic the behavior of Lake Formation when handling the tag policy translation.

To do this translation, the mapper needs to understand and store the metadata at two levels:

  • Understanding what resource in Amazon Redshift is to be tagged with what value
  • Tracking the grants and revokes already performed so they can be updated with changes in the policy

To do this, we created a config schema in our Amazon Redshift cluster, which currently stores all the configurations.

As part of our implementation, we store the mapped (translated) information in Amazon Redshift. This allows us to incrementally update table grants as Lake Formation tags or policies changed. The following diagram illustrates this schema.

schema of configuration stored in Redshift

Business impact and value

The solution we put together has created key business impacts and values out of the current implementation and allows us greater flexibility in the future.

It allows us to get the data to our users faster with the tag policies applied in Lake Formation and translated directly to permissions in Amazon Redshift with immediate effect. It also allows us to have consistency in permissions applied in both Lake Formation and Amazon Redshift, based on the effective permissions derived from tag policies. And all this happens via a single source that grants and revokes permissions across the board, instead of managing them separately.

If we translate this into the business impact and business value that we generate, the solution improves the time to market of our data, but at the same time provides consistent entitlements across the business-driven categories that we define as tags.

The solution also opens up solutions to add more impact as our product scales both horizontally and vertically. There are potential solutions we could implement in terms of automation, users self-servicing their permissions, auditing, dashboards, and more. As our business scales, we expect to take advantage of these capabilities.

Conclusion

In this post, we shared how Morningstar utilized tag-based access control in our data lake with Lake Formation and enabled similar controls in Amazon Redshift. We developed two components that handle mapping of the tag-based access controls to Amazon Redshift permissions. This solution has improved the time to market for our data and provides consistent entitlements across different business-driven categories.

If you have any questions or comments, please leave them in the comments section.


About the Authors

Ashish Prabhu is a Senior Manager of Software Engineering at Morningstar, Inc. He focuses on solutioning and delivering the different aspects of the data lake and data warehouse for Morningstar’s Enterprise Data and Platform Team. In his spare time he enjoys playing basketball, painting, and spending time with his family.

Stephen Johnston is a Distinguished Software Architect at Morningstar, Inc. His focus is on data lake and data warehousing technologies for Morningstar’s Enterprise Data Platform team.

Colin Ingarfield is a Lead Software Engineer at Morningstar, Inc. Based in Austin, Colin focuses on access control and data entitlements on Morningstar’s growing Data Lake platform.

Don Drake is a Senior Analytics Specialist Solutions Architect at AWS. Based in Chicago, Don helps Financial Services customers migrate workloads to AWS.

Implement column-level encryption to protect sensitive data in Amazon Redshift with AWS Glue and AWS Lambda user-defined functions

Post Syndicated from Aaron Chong original https://aws.amazon.com/blogs/big-data/implement-column-level-encryption-to-protect-sensitive-data-in-amazon-redshift-with-aws-glue-and-aws-lambda-user-defined-functions/

Amazon Redshift is a massively parallel processing (MPP), fully managed petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using existing business intelligence tools.

When businesses are modernizing their data warehousing solutions to Amazon Redshift, implementing additional data protection mechanisms for sensitive data, such as personally identifiable information (PII) or protected health information (PHI), is a common requirement, especially for those in highly regulated industries with strict data security and privacy mandates. Amazon Redshift provides role-based access control, row-level security, column-level security, and dynamic data masking, along with other database security features to enable organizations to enforce fine-grained data security.

Security-sensitive applications often require column-level (or field-level) encryption to enforce fine-grained protection of sensitive data on top of the default server-side encryption (namely data encryption at rest). In other words, sensitive data should be always encrypted on disk and remain encrypted in memory, until users with proper permissions request to decrypt the data. Column-level encryption provides an additional layer of security to protect your sensitive data throughout system processing so that only certain users or applications can access it. This encryption ensures that only authorized principals that need the data, and have the required credentials to decrypt it, are able to do so.

In this post, we demonstrate how you can implement your own column-level encryption mechanism in Amazon Redshift using AWS Glue to encrypt sensitive data before loading data into Amazon Redshift, and using AWS Lambda as a user-defined function (UDF) in Amazon Redshift to decrypt the data using standard SQL statements. Lambda UDFs can be written in any of the programming languages supported by Lambda, such as Java, Go, PowerShell, Node.js, C#, Python, Ruby, or a custom runtime. You can use Lambda UDFs in any SQL statement such as SELECT, UPDATE, INSERT, or DELETE, and in any clause of the SQL statements where scalar functions are allowed.

Solution overview

The following diagram describes the solution architecture.

Architecture Diagram

To illustrate how to set up this architecture, we walk you through the following steps:

  1. We upload a sample data file containing synthetic PII data to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. A sample 256-bit data encryption key is generated and securely stored using AWS Secrets Manager.
  3. An AWS Glue job reads the data file from the S3 bucket, retrieves the data encryption key from Secrets Manager, performs data encryption for the PII columns, and loads the processed dataset into an Amazon Redshift table.
  4. We create a Lambda function to reference the same data encryption key from Secrets Manager, and implement data decryption logic for the received payload data.
  5. The Lambda function is registered as a Lambda UDF with a proper AWS Identity and Access Management (IAM) role that the Amazon Redshift cluster is authorized to assume.
  6. We can validate the data decryption functionality by issuing sample queries using Amazon Redshift Query Editor v2.0. You may optionally choose to test it with your own SQL client or business intelligence tools.

Prerequisites

To deploy the solution, make sure to complete the following prerequisites:

  • Have an AWS account. For this post, you configure the required AWS resources using AWS CloudFormation in the us-east-2 Region.
  • Have an IAM user with permissions to manage AWS resources including Amazon S3, AWS Glue, Amazon Redshift, Secrets Manager, Lambda, and AWS Cloud9.

Deploy the solution using AWS CloudFormation

Provision the required AWS resources using a CloudFormation template by completing the following steps:

  1. Sign in to your AWS account.
  2. Choose Launch Stack:
    Launch Button
  3. Navigate to an AWS Region (for example, us-east-2).
  4. For Stack name, enter a name for the stack or leave as default (aws-blog-redshift-column-level-encryption).
  5. For RedshiftMasterUsername, enter a user name for the admin user account of the Amazon Redshift cluster or leave as default (master).
  6. For RedshiftMasterUserPassword, enter a strong password for the admin user account of the Amazon Redshift cluster.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.
    Create CloudFormation stack

The CloudFormation stack creation process takes around 5–10 minutes to complete.

  1. When the stack creation is complete, on the stack Outputs tab, record the values of the following:
    1. AWSCloud9IDE
    2. AmazonS3BucketForDataUpload
    3. IAMRoleForRedshiftLambdaUDF
    4. LambdaFunctionName

CloudFormation stack output

Upload the sample data file to Amazon S3

To test the column-level encryption capability, you can download the sample synthetic data generated by Mockaroo. The sample dataset contains synthetic PII and sensitive fields such as phone number, email address, and credit card number. In this post, we demonstrate how to encrypt the credit card number field, but you can apply the same method to other PII fields according to your own requirements.

Sample synthetic data

An AWS Cloud9 instance is provisioned for you during the CloudFormation stack setup. You may access the instance from the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.

CloudFormation stack output for AWSCloud9IDE

On the AWS Cloud9 terminal, copy the sample dataset to your S3 bucket by running the following command:

S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2274/pii-sample-dataset.csv s3://$S3_BUCKET/

Upload sample dataset to S3

Generate a secret and secure it using Secrets Manager

We generate a 256-bit secret to be used as the data encryption key. Complete the following steps:

  1. Create a new file in the AWS Cloud9 environment.
    Create new file in Cloud9
  2. Enter the following code snippet. We use the cryptography package to create a secret, and use the AWS SDK for Python (Boto3) to securely store the secret value with Secrets Manager:
    from cryptography.fernet import Fernet
    import boto3
    import base64
    
    key = Fernet.generate_key()
    client = boto3.client('secretsmanager')
    
    response = client.create_secret(
        Name='data-encryption-key',
        SecretBinary=base64.urlsafe_b64decode(key)
    )
    
    print(response['ARN'])

  3. Save the file with the file name generate_secret.py (or any desired name ending with .py).
    Save file in Cloud9
  4. Install the required packages by running the following pip install command in the terminal:
    pip install --user boto3
    pip install --user cryptography

  5. Run the Python script via the following command to generate the secret:
    python generate_secret.py

    Run Python script

Create a target table in Amazon Redshift

A single-node Amazon Redshift cluster is provisioned for you during the CloudFormation stack setup. To create the target table for storing the dataset with encrypted PII columns, complete the following steps:

  1. On the Amazon Redshift console, navigate to the list of provisioned clusters, and choose your cluster.
    Amazon Redshift console
  2. To connect to the cluster, on the Query data drop-down menu, choose Query in query editor v2.
    Connect with Query Editor v2
  3. If this is the first time you’re using the Amazon Redshift Query Editor V2, accept the default setting by choosing Configure account.
    Configure account
  4. To connect to the cluster, choose the cluster name.
    Connect to Amazon Redshift cluster
  5. For Database, enter demodb.
  6. For User name, enter master.
  7. For Password, enter your password.

You may need to change the user name and password according to your CloudFormation settings.

  1. Choose Create connection.
    Create Amazon Redshift connection
  2. In the query editor, run the following DDL command to create a table named pii_table:
    CREATE TABLE pii_table(
      id BIGINT,
      full_name VARCHAR(50),
      gender VARCHAR(10),
      job_title VARCHAR(50),
      spoken_language VARCHAR(50),
      contact_phone_number VARCHAR(20),
      email_address VARCHAR(50),
      registered_credit_card VARCHAR(50)
    );

We recommend using the smallest possible column size as a best practice, and you may need to modify these table definitions per your specific use case. Creating columns much larger than necessary will have an impact on the size of data tables and affect query performance.

Create Amazon Redshift table

Create the source and destination Data Catalog tables in AWS Glue

The CloudFormation stack provisioned two AWS Glue data crawlers: one for the Amazon S3 data source and one for the Amazon Redshift data source. To run the crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
    AWS Glue Crawlers
  2. Select the crawler named glue-s3-crawler, then choose Run crawler to trigger the crawler job.
    Run Amazon S3 crawler job
  3. Select the crawler named glue-redshift-crawler, then choose Run crawler.
    Run Amazon Redshift crawler job

When the crawlers are complete, navigate to the Tables page to verify your results. You should see two tables registered under the demodb database.

AWS Glue database tables

Author an AWS Glue ETL job to perform data encryption

An AWS Glue job is provisioned for you as part of the CloudFormation stack setup, but the extract, transform, and load (ETL) script has not been created. We create and upload the ETL script to the /glue-script folder under the provisioned S3 bucket in order to run the AWS Glue job.

  1. Return to your AWS Cloud9 environment either via the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.
    CloudFormation stack output for AWSCloud9IDE

We use the Miscreant package to implement deterministic encryption using the AES-SIV encryption algorithm, which means that for any given plain text value, the generated encrypted value will always be the same. The benefit of using this encryption approach is to allow for point lookups, equality joins, grouping, and indexing on encrypted columns. However, you should also be aware of the potential security implication when applying deterministic encryption to low-cardinality data, such as gender, boolean values, and status flags.

  1. Create a new file in the AWS Cloud9 environment and enter the following code snippet:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrameCollection
    from awsglue.dynamicframe import DynamicFrame
    
    import boto3
    import base64
    from miscreant.aes.siv import SIV
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "SecretName", "InputTable"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # retrieve the data encryption key from Secrets Manager
    secret_name = args["SecretName"]
    
    sm_client = boto3.client('secretsmanager')
    get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
    data_encryption_key = get_secret_value_response['SecretBinary']
    siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic
    
    # define the data encryption function
    def pii_encrypt(value):
        if value is None:
            value = ""
        ciphertext = siv.seal(value.encode())
        return base64.b64encode(ciphertext).decode('utf-8')
    
    # register the data encryption function as Spark SQL UDF   
    udf_pii_encrypt = udf(lambda z: pii_encrypt(z), StringType())
    
    # define the Glue Custom Transform function
    def Encrypt_PII (glueContext, dfc) -> DynamicFrameCollection:
        newdf = dfc.select(list(dfc.keys())[0]).toDF()
        
        # PII fields to be encrypted
        pii_col_list = ["registered_credit_card"]
    
        for pii_col_name in pii_col_list:
            newdf = newdf.withColumn(pii_col_name, udf_pii_encrypt(col(pii_col_name)))
    
        encrypteddyc = DynamicFrame.fromDF(newdf, glueContext, "encrypted_data")
        return (DynamicFrameCollection({"CustomTransform0": encrypteddyc}, glueContext))
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="demodb",
        table_name=args["InputTable"],
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("id", "long", "id", "long"),
            ("full_name", "string", "full_name", "string"),
            ("gender", "string", "gender", "string"),
            ("job_title", "string", "job_title", "string"),
            ("spoken_language", "string", "spoken_language", "string"),
            ("contact_phone_number", "string", "contact_phone_number", "string"),
            ("email_address", "string", "email_address", "string"),
            ("registered_credit_card", "long", "registered_credit_card", "string"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Custom Transform
    Customtransform_node = Encrypt_PII(glueContext, DynamicFrameCollection({"ApplyMapping_node2": ApplyMapping_node2}, glueContext))
    
    # Script generated for node Redshift Cluster
    RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
        frame=Customtransform_node,
        database="demodb",
        table_name="demodb_public_pii_table",
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="RedshiftCluster_node3",
    )
    
    job.commit()

  2. Save the script with the file name pii-data-encryption.py.
    Save file in Cloud9
  3. Copy the script to the desired S3 bucket location by running the following command:
    S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
    aws s3 cp pii-data-encryption.py s3://$S3_BUCKET/glue-script/pii-data-encryption.py

    Upload AWS Glue script to S3

  4. To verify the script is uploaded successfully, navigate to the Jobs page on the AWS Glue console. You should be able to find a job named pii-data-encryption-job.
    AWS Glue console
  5. Choose Run to trigger the AWS Glue job. It will first read the source data from the S3 bucket registered in the AWS Glue Data Catalog, then apply column mappings to transform data into the expected data types, followed by performing PII field encryption, and finally loading the encrypted data into the target Redshift table. The whole process should be completed within 5 minutes for this sample dataset.
    AWS Glue job script
    You can switch to the Runs tab to monitor the job status.
    Monitor AWS Glue job

Configure a Lambda function to perform data decryption

A Lambda function with the data decryption logic is deployed for you during the CloudFormation stack setup. You can find the function on the Lambda console.

AWS Lambda console

The following is the Python code used in the Lambda function:

import boto3
import os
import json
import base64
import logging
from miscreant.aes.siv import SIV

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secret_name = os.environ['DATA_ENCRYPT_KEY']

sm_client = boto3.client('secretsmanager')
get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
data_encryption_key = get_secret_value_response['SecretBinary']

siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic

# define lambda function logic
def lambda_handler(event, context):
    ret = dict()
    res = []
    for argument in event['arguments']:
        encrypted_value = argument[0]
        try:
            # perform decryption and decode the plain text bytes
            de_val = siv.open(base64.b64decode(encrypted_value)).decode('utf-8')
        except:
            # fall back to the original value (already a string) if decryption fails
            de_val = encrypted_value
            logger.warning('Decryption for value failed: ' + str(encrypted_value))
        res.append(json.dumps(de_val))

    ret['success'] = True
    ret['results'] = res

    return json.dumps(ret) # return decrypted results

If you want to deploy the Lambda function on your own, make sure to include the Miscreant package in your deployment package.

Register a Lambda UDF in Amazon Redshift

You can create Lambda UDFs that use custom functions defined in Lambda as part of your SQL queries. Lambda UDFs are managed in Lambda, and you can control the access privileges to invoke these UDFs in Amazon Redshift.

  1. Navigate back to the Amazon Redshift Query Editor V2 to register the Lambda UDF.
  2. Use the CREATE EXTERNAL FUNCTION command and provide an IAM role that the Amazon Redshift cluster is authorized to assume and make calls to Lambda:
    CREATE OR REPLACE EXTERNAL FUNCTION pii_decrypt (value varchar(max))
    RETURNS varchar STABLE
    LAMBDA '<--Replace-with-your-lambda-function-name-->'
    IAM_ROLE '<--Replace-with-your-redshift-lambda-iam-role-arn-->';

You can find the Lambda name and Amazon Redshift IAM role on the CloudFormation stack Outputs tab:

  • LambdaFunctionName
  • IAMRoleForRedshiftLambdaUDF

CloudFormation stack output
Create External Function in Amazon Redshift

Validate the column-level encryption functionality in Amazon Redshift

By default, permission to run new Lambda UDFs is granted to PUBLIC. To restrict usage of the newly created UDF, revoke the permission from PUBLIC and then grant the privilege to specific users or groups. To learn more about Lambda UDF security and privileges, see Managing Lambda UDF security and privileges.

You must be a superuser or have the sys:secadmin role to run the following SQL statements:

GRANT SELECT ON "demodb"."public"."pii_table" TO PUBLIC;
CREATE USER regular_user WITH PASSWORD '1234Test!';
CREATE USER privileged_user WITH PASSWORD '1234Test!';
REVOKE EXECUTE ON FUNCTION pii_decrypt(varchar) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pii_decrypt(varchar) TO privileged_user;

First, we run a SELECT statement to verify that our highly sensitive data field, in this case the registered_credit_card column, is now encrypted in the Amazon Redshift table:

SELECT * FROM "demodb"."public"."pii_table";

Select statement

Regular database users who have not been granted permission to use the Lambda UDF will see a permission denied error when they try to use the pii_decrypt() function:

SET SESSION AUTHORIZATION regular_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

Permission denied

Privileged database users who have been granted permission to use the Lambda UDF for decrypting the data can issue a SQL statement using the pii_decrypt() function:

SET SESSION AUTHORIZATION privileged_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

The original registered_credit_card values can be successfully retrieved, as shown in the decrypted_credit_card column.

Decrypted results

Cleaning up

To avoid incurring future charges, make sure to clean up all the AWS resources that you created as part of this post.

You can delete the CloudFormation stack on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI). The default stack name is aws-blog-redshift-column-level-encryption.
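
For example, the following Boto3 call deletes the stack, assuming you kept the default stack name and the us-east-2 Region used in this post:

import boto3

# Delete the CloudFormation stack created for this post
boto3.client('cloudformation', region_name='us-east-2').delete_stack(
    StackName='aws-blog-redshift-column-level-encryption'
)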

Conclusion

In this post, we demonstrated how to implement a custom column-level encryption solution for Amazon Redshift, which provides an additional layer of protection for sensitive data stored on the cloud data warehouse. The CloudFormation template gives you an easy way to set up the data pipeline, which you can further customize for your specific business scenarios. You can also modify the AWS Glue ETL code to encrypt multiple data fields at the same time, and to use different data encryption keys for different columns for enhanced data security. With this solution, you can limit the occasions where human actors can access sensitive data stored in plain text on the data warehouse.

You can learn more about this solution and the source code by visiting the GitHub repository. To learn more about how to use Amazon Redshift UDFs to solve different business problems, refer to Example uses of user-defined functions (UDFs) and Amazon Redshift UDFs.


About the Author

Aaron Chong is an Enterprise Solutions Architect at Amazon Web Services Hong Kong. He specializes in the data analytics domain, and works with a wide range of customers to build big data analytics platforms, modernize data engineering practices, and advocate AI/ML democratization.

Implement slowly changing dimensions in a data lake using AWS Glue and Delta

Post Syndicated from Nith Govindasivan original https://aws.amazon.com/blogs/big-data/implement-slowly-changing-dimensions-in-a-data-lake-using-aws-glue-and-delta/

In a data warehouse, a dimension is a structure that categorizes facts and measures in order to enable users to answer business questions. To illustrate an example, in a typical sales domain, customer, time, or product are dimensions and sales transactions are facts. Attributes within the dimension can change over time—a customer can change their address, an employee can move from a contractor position to a full-time position, or a product can have multiple revisions to it. A slowly changing dimension (SCD) is a data warehousing concept that contains relatively static data that can change slowly over a period of time. There are three major types of SCDs maintained in data warehousing: Type 1 (no history), Type 2 (full history), and Type 3 (limited history). Change data capture (CDC) is a characteristic of a database that provides an ability to identify the data that changed between two database loads, so that an action can be performed on the changed data.

As organizations across the globe are modernizing their data platforms with data lakes on Amazon Simple Storage Service (Amazon S3), handling SCDs in data lakes can be challenging. It becomes even more challenging when source systems don’t provide a mechanism to identify the changed data for processing within the data lake and makes the data processing highly complex if the data source happens to be semi-structured instead of a database. The key objective while handling Type 2 SCDs is to define the start and end dates to the dataset accurately to track the changes within the data lake, because this provides the point-in-time reporting capability for the consuming applications.
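
To make this concrete, the following small illustration (with made-up values) shows how Type 2 versions of a single employee record can be tracked with start and end dates plus a delete flag, and how a point-in-time lookup works against that history:

from datetime import date

# Two Type 2 versions of the same employee record (illustrative values only):
# the old address is closed out with an end date, and the current version stays open
history = [
    {'emp_id': 101, 'address': '12 Oak St', 'start_date': date(2022, 1, 1), 'end_date': date(2022, 6, 30), 'delete_flag': 'N'},
    {'emp_id': 101, 'address': '98 Pine Ave', 'start_date': date(2022, 7, 1), 'end_date': date(9999, 12, 31), 'delete_flag': 'N'},
]

def as_of(records, emp_id, point_in_time):
    # Point-in-time lookup: return the version whose validity window covers the date
    return [r for r in records
            if r['emp_id'] == emp_id
            and r['start_date'] <= point_in_time <= r['end_date']
            and r['delete_flag'] == 'N']

print(as_of(history, 101, date(2022, 3, 15)))  # returns the '12 Oak St' version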

In this post, we focus on demonstrating how to identify the changed data for a semi-structured source (JSON) and capture the full historical data changes (SCD Type 2) and store them in an S3 data lake, using AWS Glue and open data lake format Delta.io. This implementation supports the following use cases:

  • Track Type 2 SCDs with start and end dates to identify the current and full historical records and a flag to identify the deleted records in the data lake (logical deletes)
  • Use consumption tools such as Amazon Athena to query historical records seamlessly

Solution overview

This post demonstrates the solution with an end-to-end use case using a sample employee dataset. The dataset represents employee details such as ID, name, address, phone number, contractor or not, and more. To demonstrate the SCD implementation, consider the following assumptions:

  • The data engineering team receives daily files that are full snapshots of records and don’t contain any mechanism to identify source record changes
  • The team is tasked with implementing SCD Type 2 functionality for identifying new, updated, and deleted records from the source, and to preserve the historical changes in the data lake
  • Because the source systems don’t provide the CDC capability, a mechanism needs to be developed to identify the new, updated, and deleted records and persist them in the data lake layer

The architecture is implemented as follows:

  • Source systems ingest files in the S3 landing bucket (this step is mimicked by generating the sample records using the provided AWS Lambda function into the landing bucket)
  • An AWS Glue job (Delta job) picks the source data file and processes the changed data from the previous file load (new inserts, updates to the existing records, and deleted records from the source) into the S3 data lake (processed layer bucket)
  • The architecture uses the open data lake format (Delta), and builds the S3 data lake as a Delta Lake, which is mutable, because the new changes can be updated, new inserts can be appended, and source deletions can be identified accurately and marked with a delete_flag value
  • An AWS Glue crawler catalogs the data, which can be queried by Athena

The following diagram illustrates our architecture.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments. This template creates the following resources:

  • Two S3 buckets: a landing bucket for storing sample employee data and a processed layer bucket for the mutable data lake (Delta Lake)
  • A Lambda function to generate sample records
  • An AWS Glue extract, transform, and load (ETL) job to process the source data from the landing bucket to the processed bucket

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack:

  1. Enter a stack name.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console and note the following resources on the Outputs tab:

  • Data lake resources – The S3 buckets scd-blog-landing-xxxx and scd-blog-processed-xxxx (referred to as scd-blog-landing and scd-blog-processed in the subsequent sections in this post)
  • Sample records generator Lambda function – SampleDataGenaratorLambda-<CloudFormation Stack Name> (referred to as SampleDataGeneratorLambda)
  • AWS Glue Data Catalog database – deltalake_xxxxxx (referred to as deltalake)
  • AWS Glue Delta job – <CloudFormation-Stack-Name>-src-to-processed (referred to as src-to-processed)

Note that deploying the CloudFormation stack in your account incurs AWS usage charges.

Test SCD Type 2 implementation

With the infrastructure in place, you’re ready to test out the overall solution design and query historical records from the employee dataset. This post is designed to be implemented for a real customer use case, where you get full snapshot data on a daily basis. We test the following aspects of SCD implementation:

  • Run an AWS Glue job for the initial load
  • Simulate a scenario where there are no changes to the source
  • Simulate insert, update, and delete scenarios by adding new records, and modifying and deleting existing records
  • Simulate a scenario where the deleted record comes back as a new insert

Generate a sample employee dataset

To test the solution, and before you can start your initial data ingestion, the data source needs to be identified. To simplify that step, a Lambda function is included in the CloudFormation stack you just deployed.

Open the function and configure a test event, with the default hello-world template event JSON as seen in the following screenshot. Provide an event name without any changes to the template and save the test event.

Choose Test to invoke a test event, which invokes the Lambda function to generate the sample records.

When the Lambda function completes its invocation, you will be able to see the following sample employee dataset in the landing bucket.
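
If you prefer to invoke the generator from a script instead of the console, the following is a minimal boto3 sketch; the exact function and bucket names come from your CloudFormation stack outputs, so treat the values here as placeholders:

import json
import boto3

lambda_client = boto3.client("lambda")
s3 = boto3.client("s3")

# Invoke the sample data generator with an empty payload (mirrors the hello-world test event)
response = lambda_client.invoke(
    FunctionName="SampleDataGenaratorLambda-<CloudFormation Stack Name>",
    InvocationType="RequestResponse",
    Payload=json.dumps({}),
)
print(response["StatusCode"])

# List the generated sample file(s) in the landing bucket
listing = s3.list_objects_v2(Bucket="scd-blog-landing-xxxx", Prefix="dataset/employee/")
for obj in listing.get("Contents", []):
    print(obj["Key"])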

Run the AWS Glue job

Confirm that you see the employee dataset in the path s3://scd-blog-landing/dataset/employee/. You can download the dataset and open it in a code editor such as VS Code. The following is an example of the dataset:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737\nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717\nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas Springs\nDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011\nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson Tunnel\nMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748\nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert Views\nWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813\nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537\nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189\nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229\nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997\nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050\nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks Estates\nEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500\nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004\nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah Creek\nWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659\nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930\nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732\nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049\nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297\nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes Plain\nHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Download the dataset and keep it ready, because you will modify the dataset for future use cases to simulate the inserts, updates, and deletes. The sample dataset generated for you will be entirely different than what you see in the preceding example.

To run the job, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job src-to-processed.
  3. On the Runs tab, choose Run.

When the AWS Glue job is run for the first time, the job reads the employee dataset from the landing bucket path and ingests the data to the processed bucket as a Delta table.
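
To make the first run easier to reason about, the following is a minimal PySpark sketch of what the initial load logically does; the job deployed by the CloudFormation stack is the source of truth, and the paths, column defaults, and session setup shown here are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws, col, lit, current_date

# Assumes an AWS Glue 4.0 job configured with the job parameter --datalake-formats set to delta
spark = SparkSession.builder.getOrCreate()

landing_path = "s3://scd-blog-landing/dataset/employee/"      # landing bucket (name suffix omitted)
processed_path = "s3://scd-blog-processed/dataset/employee/"  # processed bucket (name suffix omitted)

employees = spark.read.json(landing_path)

initial_load = (employees
    # Hash all business attributes so that any change to a record produces a new emp_key
    .withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"),
        col("Address"), col("phone_number"), col("isContractor")), 256))
    .withColumn("start_date", current_date())
    .withColumn("end_date", lit(None).cast("date"))
    .withColumn("isCurrent", lit(True))
    .withColumn("delete_flag", lit(False)))

initial_load.write.format("delta").mode("overwrite").save(processed_path)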

When the job is complete, you can create a crawler to see the initial data load. The following screenshot shows the database available on the Databases page.

  1. Choose Crawlers in the navigation pane.
  2. Choose Create crawler.

  1. Name your crawler delta-lake-crawler, then choose Next.

  1. Select Not yet for data already mapped to AWS Glue tables.
  2. Choose Add a data source.

  1. On the Data source drop-down menu, choose Delta Lake.
  2. Enter the path to the Delta table.
  3. Select Create Native tables.
  4. Choose Add a Delta Lake data source.

  1. Choose Next.

  1. Choose the role that was created by the CloudFormation template, then choose Next.

  1. Choose the database that was created by the CloudFormation template, then choose Next.

  1. Choose Create crawler.

  1. Select your crawler and choose Run.

Query the data

After the crawler is complete, you can see the table it created.

To query the data, complete the following steps:

  1. Choose the employee table and on the Actions menu, choose View data.

You're redirected to the Athena console. If your workgroup doesn't use the latest Athena engine, create a new Athena workgroup with the latest engine.

  1. Under Administration in the navigation pane, choose Workgroups.

  1. Choose Create workgroup.

  1. Provide a name for the workgroup, such as DeltaWorkgroup.
  2. Select Athena SQL as the engine, and choose Athena engine version 3 for Query engine version.

  1. Choose Create workgroup.

  1. After you create the workgroup, select the workgroup (DeltaWorkgroup) on the drop-down menu in the Athena query editor.

  1. Run the following query on the employee table:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You can observe that the employee table has 25 records. The following screenshot shows the total employee records with some sample records.

The Delta table is stored with an emp_key column, which is unique to every change and is used to track the changes. An emp_key is created for every insert, update, and delete, and can be used to find all the changes pertaining to a single emp_id.

The emp_key is created using the SHA256 hashing algorithm, as shown in the following code:

from pyspark.sql.functions import sha2, concat_ws, col
# Hash the concatenation of all business attributes so that any change to a record produces a new emp_key
df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"),
            col("phone_number"), col("isContractor")), 256))

Perform inserts, updates, and deletes

Before making changes to the dataset, let's run the same job one more time. Assuming that the current load from the source is the same as the initial load with no changes, the AWS Glue job shouldn't make any changes to the dataset. After the job is complete, run the previous Select query in the Athena query editor and confirm that there are still 25 active records with the following values (a quick programmatic check of these values is sketched after the list):

  • All 25 records with the column isCurrent=true
  • All 25 records with the column end_date=Null
  • All 25 records with the column delete_flag=false
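
If you prefer to verify these values from an AWS Glue job or notebook instead of Athena, a minimal PySpark check could look like the following sketch; the processed table path is an assumption based on the bucket names in this post, and it assumes a session configured for Delta (for example, a Glue 4.0 job with --datalake-formats delta):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

employee = spark.read.format("delta").load("s3://scd-blog-processed/dataset/employee/")

# All 25 records should still be current, open-ended, and not flagged as deleted
active = employee.where("isCurrent = true AND end_date IS NULL AND delete_flag = false")
print(active.count())  # expected: 25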

After you confirm the previous job run with these values, let’s modify our initial dataset with the following changes:

  1. Change the isContractor flag to false (change it to true if your dataset already shows false) for emp_id=12.
  2. Delete the entire row where emp_id=8 (make sure to save the record in a text editor, because we use this record in another use case).
  3. Copy the row for emp_id=25 and insert a new row. Change the emp_id to be 26, and make sure to change the values for other columns as well.

After we make these changes, the employee source dataset looks like the following code (for readability, we have only included the changed records as described in the preceding three steps):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce Meadows\nLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes Plain\nHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}
  1. Now, upload the changed fake_emp_data.json file to the same source prefix.

  1. After you upload the changed employee dataset to Amazon S3, navigate to the AWS Glue console and run the job.
  2. When the job is complete, run the following query in the Athena query editor and confirm that there are 27 records in total:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

  1. Run another query in the Athena query editor and confirm that there are 4 records returned with the following values:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You will see two records for emp_id=12:

  • One emp_id=12 record with the following values (for the record that was ingested as part of the initial load):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • A second emp_id=12 record with the following values (for the record that was ingested as part of the change to the source):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (or empty string)

The record for emp_id=8 that was deleted in the source as part of this run will still exist but with the following changes to the values:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

The new employee record will be inserted with the following values:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (or empty string)
  • delete_flag=false

Note that the emp_key values in your actual table may be different than what is provided here as an example.

  1. For the deletes, we check the emp_id from the base table against the new source file and inner join on the emp_key.
  2. If that condition evaluates to true, we then check whether the base table emp_key equals the incoming update's emp_key, and retrieve the current, undeleted record (isCurrent=true and delete_flag=false).
  3. We merge the delete changes from the new file into the base table for all rows matching the delete condition and update the following:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

See the following code:

# Join incoming rows to the base table on emp_id and emp_key
delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
# Close out the current base record only when the incoming row is flagged as a delete
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true"

base_tbl.alias("employee")\
        .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond)\
        .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false",
                                                        "end_date": current_date(),
                                                        "delete_flag": "true"}).execute()
  1. For both the updates and the inserts, we check whether the base table employee.emp_id equals the incoming changes.emp_id and the employee.emp_key equals the incoming changes.emp_key, while retrieving only the current records.
  2. If this condition evaluates to true, we then get the current record (isCurrent=true and delete_flag=false).
  3. We merge the changes by updating the following:
    1. If the second condition evaluates to true:
      1. isCurrent=false
      2. end_date=current_date
    2. Or we insert the entire row as follows if the second condition evaluates to false:
      1. emp_id=new record’s emp_id
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (or empty string)
      10. isCurrent=true
      11. delete_flag=false

See the following code:

# Match current base records against incoming rows on emp_id and emp_key
upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
# Expire (close out) a matched current record when the incoming row is not a delete
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false"

base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond)\
    .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false",
                                                            "end_date": current_date()
                                                            }) \
    .whenNotMatchedInsert(  # rows with no matching emp_key (new or changed records) are inserted as the current version
    values={
        "isCurrent": "true",
        "emp_id": "employeeUpdates.emp_id",
        "first_name": "employeeUpdates.first_name",
        "last_name": "employeeUpdates.last_name",
        "Address": "employeeUpdates.Address",
        "phone_number": "employeeUpdates.phone_number",
        "isContractor": "employeeUpdates.isContractor",
        "emp_key": "employeeUpdates.emp_key",
        "start_date": current_date(),
        "delete_flag":  "employeeUpdates.delete_flag",
        "end_date": "null"
    })\
    .execute()

As a last step, let’s bring back the deleted record from the previous change to the source dataset and see how it is reinserted into the employee table in the data lake and observe how the complete history is maintained.

Let’s modify our changed dataset from the previous step and make the following changes.

  1. Add the deleted emp_id=8 back to the dataset.

After making this change, the employee source dataset looks like the following code (for readability, we have only included the added record as described in the preceding step):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott Valley\nGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Upload the changed employee dataset file to the same source prefix.
  2. After you upload the changed fake_emp_data.json dataset to Amazon S3, navigate to the AWS Glue console and run the job again.
  3. When the job is complete, run the following query in the Athena query editor and confirm that there are 28 records in total with the following values:
SELECT * FROM "deltalake_2438fbd0"."employee";

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

  1. Run the following query and confirm there are 5 records:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

You will see two records for emp_id=8:

  • One emp_id=8 record with the following values (the old record that was deleted):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • delete_flag=true
    • end_date=’2023-03-02’
  • Another emp_id=8 record with the following values (the new record that was inserted in the last run):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • delete_flag=false
    • end_date=NULL (or empty string)

The emp_key values in your actual table may be different than what is provided here as an example. Also note that because this is the same deleted record that was reinserted in the subsequent load without any changes, there is no change to the emp_key.

End-user sample queries

The following are some sample end-user queries to demonstrate how the employee change data history can be traversed for reporting:

  • Query 1 – Retrieve a list of all the employees who left the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

The preceding query would return two employee records who left the organization.

  • Query 2 – Retrieve a list of new employees who joined the organization in the current month (for example, March 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

The preceding query would return 23 active employee records who joined the organization.

  • Query 3 – Find the history of any given employee in the organization (in this case employee 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Note: Replace the database name with the value from the CloudFormation outputs before running the preceding query.

In the preceding query, we can observe that employee 18 had two changes to their employee records before they left the organization.

Note that the data results provided in this example are different than what you will see in your specific records based on the sample data generated by the Lambda function.

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent further AWS charges:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use AWS Glue to identify the changed data for a semi-structured data source and preserve the historical changes (SCD Type 2) in an S3-based Delta Lake when source systems are unable to provide change data capture capability. You can further extend this solution to enable downstream applications to build additional customizations from CDC data captured in the data lake.

Additionally, you can extend this solution as part of an orchestration using AWS Step Functions or other commonly used orchestrators your organization is familiar with. You can also extend this solution by adding partitions where appropriate, and maintain the Delta table by compacting small files.


About the authors

Nith Govindasivan is a Data Lake Architect with AWS Professional Services, where he helps onboard customers on their modern data architecture journey by implementing big data and analytics solutions. Outside of work, Nith is an avid cricket fan, watching almost any cricket match during his spare time, and enjoys long drives and traveling internationally.

Vijay Velpula is a Data Architect with AWS Professional Services. He helps customers implement Big Data and Analytics Solutions. Outside of work, he enjoys spending time with family, traveling, hiking and biking.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

AWS Glue crawlers support cross-account crawling to support data mesh architecture

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/aws-glue-crawlers-support-cross-account-crawling-to-support-data-mesh-architecture/

Data lakes have come a long way, and there’s been tremendous innovation in this space. Today’s modern data lakes are cloud native, work with multiple data types, and make this data easily available to diverse stakeholders across the business. As time has gone by, data lakes have grown significantly and have evolved to data meshes as a way to scale. Thoughtworks defines a data mesh as “a shift in a modern distributed architecture that applies platform thinking to create self-serve data infrastructure, treating data as the product.”

Data mesh advocates for decentralized ownership and delivery of enterprise data management systems that benefit several personas. Data producers can use the data mesh platform to create datasets and share them across business teams to ensure data availability, reliability, and interoperability across functions and data subject areas. Data consumers now have better data sharing with data mesh and federation across business units without compromising data security. The data governance team can support distributed data, where all data is accessible to those with the proper authority to access it. With data mesh, data doesn’t have to be consolidated into a single data lake or account and can remain within different databases and data lakes. An essential capability needed in such a data lake architecture is the ability to continuously understand changes in the data lakes in various other domains and make those available to data consumers. Without such a capability, manual work is needed to understand producers’ updates and make them available to consumers and governance.

AWS customers use a modern data architecture to facilitate governance and data sharing across logical or physical governance boundaries to create data domains aligned to lines of business. Each line of business creates and manages their dataset on Amazon Simple Storage Service (Amazon S3) and uses AWS Glue crawlers to discover new datasets and register them to the AWS Glue Data Catalog, add new tables and partitions, and detect schema changes. These datasets are shared with data consumers that access the data using services like Amazon Athena, Amazon Redshift, Amazon EMR, and more.

In the post Introducing AWS Glue crawlers using AWS Lake Formation permission management, we introduced a new set of capabilities in AWS Glue crawlers and AWS Lake Formation that simplifies crawler setup and supports centralized permissions for in-account and cross-account crawling of S3 data lakes. In this post, we demonstrate the same capability for a data mesh architecture in which we establish a central governance layer to catalog the data owned by the data producer and share it with the data consumer for ease of discovery. The AWS Glue crawler cross-account capability allows you to crawl data sources in different producer accounts while still having those changes cataloged in a centralized governance account. Customers prefer this central governance experience over writing bucket policies separately in each producer account that owns buckets in the data mesh. To build a data mesh architecture, you can now author permissions in a single Lake Formation governance account to manage access to data locations and crawlers spanning multiple accounts in the data mesh.

According to the Allstate Corporation:

“By leveraging the power of AWS Lake Formation in our modern data architecture, we will be able to further unlock the potential of our data and empower our analytics community to drive innovation and build data-driven applications. The granular data access and collaboration provided by this architecture will enable us to build a truly unified data and analytics experience, bringing us one step closer to realizing our vision of becoming a fully data-driven enterprise.”

– Prashant Mehrotra, Director – Machine Learning and R&D, Allstate

In this post, we walk through the creation of a simplified data mesh architecture that shows how to use an AWS Glue crawler with Lake Formation to automate bringing changes from data producer domains to data consumers while maintaining centralized governance.

Solution overview

In a data mesh architecture, you have several producer accounts that own S3 buckets, several consumer accounts that want to access shared datasets, and a central governance account to manage data shares between producers and consumers. This central governance account doesn't own any S3 bucket or actual tables.

The following figure shows a simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account. The data mesh producer account hosts the encrypted S3 bucket, which is shared with the central governance account. The central governance account registers the S3 bucket with Lake Formation using an AWS Identity and Access Management (IAM) role, which has permissions to the S3 bucket and AWS Key Management Service (AWS KMS). The central account creates the database for storing the dataset schema and shares it with the producer account. The producer account, as the S3 bucket owner, runs a crawler to crawl the buckets registered with the central account using Lake Formation permissions and populates the database. Now the shared database with new datasets is available to share with consumers in the data mesh. The central governance account can now share the database with a consumer admin, who can delegate access to other personas (such as data analysts) in the consumer account for data access.

shows a simplified data mesh architecture with a single producer account, a centralized governance account, and a single consumer account

In the following sections, we provide AWS CloudFormation templates to set up the resources in each account. Then we provide the steps to configure the crawler, manage permissions and sharing, and validate the solution by running queries with Athena.

Prerequisites

Complete the following steps in each account (producer, central governance, and consumer) to update the Data Catalog settings to use Lake Formation permissions to control catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as admin.
  2. If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
  3. In the navigation pane, under Data catalog, choose Settings.
  4. Uncheck Use only IAM access control for new databases.
  5. Uncheck Use only IAM access control for new tables in new databases.
  6. Keep Version 3 as the current cross-account version.
  7. Choose Save.

Set up resources in the central governance account

The CloudFormation template for the central account creates a CentralDataMeshOwner user assigned as Lake Formation admin. The CentralDataMeshOwner user in the central governance account performs the necessary steps to share the central catalogs with the producer and consumer accounts. The CentralDataMeshOwner user also sets up a custom Lake Formation service role to register the S3 data lake location. Complete the following steps:

  1. Log in to the central governance account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For DataMeshOwnerUserName, keep the default (CentralDataMeshOwner).
  4. For ProducerAWSAccount, enter the producer account ID.
  5. Create the stack.
  6. After the stack launches, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the value of RegisterLocationServiceRole.
  8. Choose the LFUsersPassword value to navigate to the AWS Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user CentralDataMeshOwner.

Set up resources in the producer account

The CloudFormation template for the producer account creates the following resources:

  • IAM user LOBProducerSteward
  • S3 bucket retail-datalake-<producer account id>-<producer region>
  • KMS key used for bucket encryption
  • Required S3 bucket policies to provide access to the central governance account
  • AWS Glue crawler and crawler IAM role with necessary permissions

Complete the following steps:

  1. Log in to the producer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. For CentralAccountID, enter the central account ID.
  4. For CentralAccountLFServiceRole, enter the value of RegisterLocationServiceRole from CloudFormation noted earlier.
  5. Create the stack.
  6. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  7. Note down the AWSGlueServiceRole value.
  8. Choose the ProducerStewardUserCredentials value to navigate to the Secrets Manager console.
  9. In the Secret value section, choose Retrieve secret value.
  10. Note down the secret value for the password for IAM user LOBProducerSteward.
  11. On the Amazon S3 console, check the bucket policies for retail-datalake-<producer account id>-<producer region> and make sure it is shared with the central governance account IAM role.

This is required for registering the bucket with Lake Formation in the central account so that the account can manage the data sharing.

  1. On the AWS KMS console, check that the bucket is encrypted with the customer managed key and the key is shared with the central governance account.

Set up resources in the consumer account

The CloudFormation template for the consumer account creates the following resources:

  • IAM user ConsumerAdminUser assigned to the data lake admin
  • IAM user LFBusinessAnalyst1
  • S3 bucket for Athena output
  • Athena workgroup

Complete the following steps:

  1. Log in to the consumer account console as IAM administrator.
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. Create the stack.
  4. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  5. Choose the AllConsumerUsersCredentials value to navigate to the Secrets Manager console.
  6. In the Secret value section, choose Retrieve secret value.
  7. Note down the secret value for the password for the IAM user ConsumerAdminUser.

Now that all the accounts have been set up, we set up cross-account sharing on AWS with a central governance account to manage sharing of permissions across producers and consumers.

Configure the central governance account to manage sharing with the producer account

Sign in to the central governance account as CentralDataMeshOwner using the password noted earlier through the central governance account CloudFormation stack. Then complete the following steps:

  1. On Lake Formation console, choose Data lake locations under Register and ingest in the navigation pane.
  2. For Amazon S3 path, provide the path retail-datalake-<producer account id>-<region>.
  3. For IAM role, choose the IAM role created using the CloudFormation stack.

This role has permissions for accessing the encrypted S3 bucket and its key. Do not choose the role AWSServiceRoleForLakeFormationDataAccess.

  1. Choose Register location.
  2. In the navigation pane, choose Databases.
  3. Choose Create database.
  4. For Database name, enter datameshtestdatabase.
  5. Choose Create database.
  6. In the navigation pane, choose Data locations and choose Grant.
  7. Select External account and provide the producer account for AWS account ID, AWS organization ID, or IAM principal ARN.
  8. For Storage location, provide the data lake bucket path.
  9. Select Grantable, then choose Grant.
  10. Choose Data lake permissions, then choose Grant.
  11. Select External accounts and provide the producer account number.
  12. For Databases, choose datameshtestdatabase.
  13. For Database permissions and Grantable permissions, select Create table, Alter, and Describe.
  14. Choose Grant.

Configure the crawler in the producer account to populate the schema

Sign in to producer account as LOBProducerSteward with the password noted earlier through the producer account CloudFormation stack, then complete the following steps:

  1. On the AWS RAM console, accept the pending resource share from the central account.
  2. On the Lake Formation console, choose Databases under Data catalog in the navigation pane.
  3. Choose datameshtestdatabase, and on the Action menu, choose Create resource link.
  4. For Resource link name, enter datameshtestdatabaselink.
  5. Choose Create.
  6. On the AWS Glue console, choose Crawlers in the navigation pane.
  7. Choose the crawler CrossAccountCrawler-<accountid>.
  8. Choose Edit, then choose Configure security settings.
  9. Select Use Lake Formation credentials for crawling S3 data source.
  10. Select In a different account and provide the account ID of the central governance account.
  11. Choose Next.
  12. Choose datameshtestdatabaselink as the database and choose Update.
  13. In the navigation pane, choose Data locations and choose Grant.
  14. Select My account, and choose the crawler IAM role for IAM users and roles.
  15. For Storage locations, choose the bucket retail-datalake-<accountid>-<region>.
  16. For Registered account location, enter the central account ID.
  17. Choose Grant.
    Alternatively, you can use the AWS CLI to grant data location permission on the bucket registered in the central account to the crawler role with the following command:

    aws lakeformation grant-permissions \
    --principal DataLakePrincipalIdentifier="<Crawler Role ARN>" \
    --permissions "DATA_LOCATION_ACCESS" \
    --resource '{ "DataLocation": {"ResourceArn":"<S3 bucket arn>", "CatalogId": "<Central Account id>"}}'

    To use the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

  18. In the navigation pane, choose Data lake permissions.
  19. Choose the crawler IAM role for the principal account.
  20. Choose datameshtestdatabase for the database.
  21. For Database permissions, select Create, Describe, and Alter.
  22. Choose Grant.
  23. Choose the crawler IAM role for the principal account.
  24. Choose datameshtestdatabaselink for the database.
  25. For Resource link permissions, select Describe.
  26. Choose Grant.
  27. Run the crawler.

The following screenshot shows the details after a successful run.

When the crawler is complete, you can validate the table created under the database datameshtestdatabaselink.

This table is owned by the producer account and available in the central governance account under the shared database datameshtestdatabase. Now the data lake admin in the central governance account can share the database and populated table with the consumer account.

Configure the central governance account to manage sharing of read-only access with the consumer account

Sign in to the central governance account as CentralDataMeshOwner with the password noted earlier through the central governance account CloudFormation stack, then complete the following steps:

  1. Grant database permissions to the consumer account.
  2. For Principals, choose external account and provide <consumer accountID>
  3. For Databases, select datameshtestdatabase.
  4. For Database permissions, select Describe.
  5. For Grantable permissions, select Describe.
  6. Choose Grant.

  7. Grant table permissions to the consumer account.
  8. For Principals, choose external account and provide <consumer accountID>.
  9. For Databases, select datameshtestdatabase.
  10. For Tables, select retail_datalake_<accountID>_<region>.
  11. For Table permissions, select Select and Describe.
  12. For Grantable permissions, select Select and Describe.
  13. Choose Grant.

Configure the consumer account as the consumer account data lake admin

Sign in to the consumer account as ConsumerAdminUser with the password noted earlier through the consumer account CloudFormation stack. (Note that in the consumer account Lake Formation configuration, both ConsumerAdminUser and LFBusinessAnalyst1 have the same password.)

  1. On the AWS RAM console, accept the resource share from the central account.
  2. On the Lake Formation console, validate that the shared database datameshtestdatabase is available and create the resource link datameshtestdatabaselink using the shared database.

The following screenshot shows the details after the resource link is created.

  1. On the Lake Formation console, choose Grant.
  2. Choose LFBusinessAnalyst1 for IAM users and roles.
  3. Choose datameshtestdatabase for the database under Named data catalog resources.
  4. Select Describe for Database permissions.
  5. On the Lake Formation console, choose Grant.
  6. Choose LFBusinessAnalyst1 for IAM users and roles.
  7. Choose datameshtestdatabaselink for the database under Named data catalog resources.
  8. Select Describe for Resource link permissions.
  9. On the Lake Formation console, choose Grant.
  10. Choose LFBusinessAnalyst1 for IAM users and roles.
  11. Choose retail_datalake_<accountid>_<region> for the table under Named data catalog resources.
  12. Select Select and Describe for Table permissions.

Run queries in the consumer account

Sign in to the consumer account console as LFBusinessAnalyst1 with the password noted earlier through the consumer account CloudFormation stack, then complete the following steps:

  1. On the Athena console, choose lfconsumer-workgroup as the Athena workgroup.
  2. Run the following query to validate access:
select * from datameshtestdatabaselink.retail_datalake_<accountid>_<region>

We have successfully registered the dataset and created a Data Catalog in the central governance account. We crawled the data lake that was registered with the central governance account using Lake Formation permissions from the producer account and populated the schema. We granted Lake Formation permission on the database and table from the central account to the consumer user and validated consumer user access to the data using Athena.

Clean up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the CloudFormation console as the IAM admin used for creating the CloudFormation stack in all three accounts.
  2. Delete the stacks you created.

Conclusion

In this post, we showed how to set up cross-account crawling using a central governance account with the new AWS Glue crawler capability of Lake Formation integration. This capability allows data producers to set up crawling capabilities in their own domain so that changes are seamlessly available to data governance and data consumers. Implementing a data mesh with AWS Glue crawlers, Lake Formation, Athena, and other analytical services provides a well-understood, performant, scalable, and cost-effective solution to integrate, prepare, and serve data.

If you have questions or suggestions, submit them in the comments section.

For more resources, refer to the following:


About the authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Piyali Kamra is a seasoned enterprise architect and a hands-on technologist who believes that building large scale enterprise systems is not an exact science but more like an art, in which tools and technologies must be carefully selected based on the team's culture, strengths, weaknesses and risks, in tandem with having a futuristic vision as to how you want to shape your product a few years down the road.

Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 2: AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-glue-studio-visual-editor-introducing-native-support-for-apache-hudi-delta-lake-and-apache-iceberg-on-aws-glue-for-apache-spark/

In the first post of this series, we described how AWS Glue for Apache Spark works with Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg tables using the native support of those data lake formats. This native support simplifies reading and writing your data for these data lake frameworks so you can more easily build and maintain your data lakes in a transactionally consistent manner. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs.

These data lake frameworks help you store data more efficiently and enable applications to access your data faster. Unlike simpler data file formats such as Apache Parquet, CSV, and JSON, which can store big data, data lake frameworks organize distributed big data files into tabular structures that enable basic constructs of databases on data lakes.

Expanding on the functionality we announced at AWS re:Invent 2022, AWS Glue now natively supports Hudi, Delta Lake and Iceberg through the AWS Glue Studio visual editor. If you prefer authoring AWS Glue for Apache Spark jobs using a visual tool, you can now choose any of these three data lake frameworks as a source or target through a graphical user interface (GUI) without any custom code.

Even without prior experience using Hudi, Delta Lake or Iceberg, you can easily achieve typical use cases. In this post, we demonstrate how to ingest data stored in Hudi using the AWS Glue Studio visual editor.

Example scenario

To demonstrate the visual editor experience, this post introduces the Global Historical Climatology Network Daily (GHCN-D) dataset. The data is publicly accessible through an Amazon Simple Storage Service (Amazon S3) bucket. For more information, see the Registry of Open Data on AWS. You can also learn more in Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight.

The Amazon S3 location s3://noaa-ghcn-pds/csv/by_year/ has all the observations from 1763 to the present organized in CSV files, one file for each year. The following block shows an example of what the records look like:

ID,DATE,ELEMENT,DATA_VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AE000041196,20220102,TAVG,226,H,,S,
...
AE000041196,20221231,TMAX,243,,,S,
AE000041196,20221231,PRCP,0,D,,S,
AE000041196,20221231,TAVG,202,H,,S,

The records have fields including ID, DATE, ELEMENT, and more. Each combination of ID, DATE, and ELEMENT represents a unique record in this dataset. For example, the record with ID as AE000041196, ELEMENT as TAVG, and DATE as 20220101 is unique.

In this tutorial, we assume that the files are updated with new records every day, and want to store only the latest record per the primary key (ID and ELEMENT) to make the latest snapshot data queryable. One typical approach is to do an INSERT for all the historical data, and calculate the latest records in queries; however, this can introduce additional overhead in all the queries. When you want to analyze only the latest records, it’s better to do an UPSERT (update and insert) based on the primary key and DATE field rather than just an INSERT in order to avoid duplicates and maintain a single updated row of data.
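
Although this post uses the AWS Glue Studio visual editor rather than custom code, it can help to see how the UPSERT strategy described above maps to Hudi write options. The following is an illustrative sketch only and is not required for the tutorial; the target S3 path is a placeholder, and it assumes a Glue 4.0 job with the --datalake-formats hudi job parameter:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

hudi_options = {
    "hoodie.table.name": "ghcn",
    "hoodie.datasource.write.operation": "upsert",             # update-or-insert instead of plain INSERT
    "hoodie.datasource.write.recordkey.field": "ID",           # Hudi record key
    "hoodie.datasource.write.precombine.field": "DATE",        # the largest DATE wins for a given key
    "hoodie.datasource.write.partitionpath.field": "ELEMENT",  # partition key
}

df = spark.read.option("header", "true").csv("s3://noaa-ghcn-pds/csv/by_year/2022.csv")

(df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")  # Hudi upserts are issued with append mode
    .save("s3://<your-bucket>/<prefix>/hudi_native/ghcn/"))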

Prerequisites

To continue this tutorial, you need to create the following AWS resources in advance:

Process a Hudi dataset on the AWS Glue Studio visual editor

Let's author an AWS Glue job to read daily records in 2022, and write the latest snapshot into the Hudi table on your S3 bucket using UPSERT. Complete the following steps:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source and Target, choose Amazon S3, then choose Create.

A new visual job configuration appears. The next step is to configure the data source to read an example dataset:

  1. Under Visual, choose Data source – S3 bucket.
  2. Under Node properties, for S3 source type, select S3 location.
  3. For S3 URL, enter s3://noaa-ghcn-pds/csv/by_year/2022.csv.

The data source is configured.

data-source

The next step is to configure the data target to ingest data in Apache Hudi on your S3 bucket:

  1. Choose Data target – S3 bucket.
  2. Under Data target properties- S3, for Format, choose Apache Hudi.
  3. For Hudi Table Name, enter ghcn.
  4. For Hudi Storage Type, choose Copy on write.
  5. For Hudi Write Operation, choose Upsert.
  6. For Hudi Record Key Fields, choose ID.
  7. For Hudi Precombine Key Field, choose DATE.
  8. For Compression Type, choose GZIP.
  9. For S3 Target location, enter s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn/. (Provide your S3 bucket name and prefix.)

To make it easy to discover the sample data, and also make it queryable from Athena, configure the job to create a table definition on the AWS Glue Data Catalog:

  1. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  2. For Database, choose hudi_native.
  3. For Table name, enter ghcn.
  4. For Partition keys – optional, choose ELEMENT.

Now your data integration job is fully authored in the visual editor. Let's add one remaining setting for the IAM role, then run the job:

  1. Under Job details, for IAM Role, choose your IAM role.
  2. Choose Save, then choose Run.

data-target

  1. Navigate to the Runs tab to track the job progress and wait for it to complete.

job-run

Query the table with Athena

Now that the job has successfully created the Hudi table, you can query the table through different engines, including Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, in addition to AWS Glue for Apache Spark.

To query through Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. In the query editor, enter the following SQL and choose Run:
SELECT * FROM "hudi_native"."ghcn" limit 10;

The following screenshot shows the query result.
athena-query1

Let’s dive deep into the table to understand how the data is ingested and focus on the records with ID=’AE000041196′.

  1. Run the following query to focus on the very specific example records with ID='AE000041196':
SELECT * FROM "hudi_native"."ghcn" WHERE ID='AE000041196';

The following screenshot shows the query result.
athena-query2

The original source file 2022.csv has historical records for record ID='AE000041196' from 20220101 to 20221231; however, the query result shows only four records, one record per ELEMENT at the latest snapshot of the day 20221230 or 20221231. Because we used the UPSERT write option when writing data, we configured the ID field as a Hudi record key field, the DATE field as a Hudi precombine field, and the ELEMENT field as a partition key field. When two records have the same key value, Hudi picks the one with the largest value for the precombine field. When the job ingested data, it compared all the values in the DATE field for each pair of ID and ELEMENT, and then picked the record with the largest value in the DATE field.

According to the preceding result, we were able to ingest the latest snapshot from all the 2022 data. Now let’s do an UPSERT of the new 2023 data to overwrite the records on the target Hudi table.

  1. Go back to AWS Glue Studio console, modify the source S3 location to s3://noaa-ghcn-pds/csv/by_year/2023.csv, then save and run the job.

upsert-data-source

  1. Run the same Athena query from the Athena console.

athena-query3
Now you see that the four records have been updated with the new records in 2023.

If you receive further records in the future, this approach works well to upsert the new records based on the Hudi record key and Hudi precombine key.

Clean up

Now to the final step, cleaning up the resources:

  1. Delete the AWS Glue database hudi_native.
  2. Delete the AWS Glue table ghcn.
  3. Delete the S3 objects under s3://<Your S3 bucket name>/<Your S3 bucket prefix>/hudi_native/ghcn/.

Conclusion

This post demonstrated how to process Hudi datasets using the AWS Glue Studio visual editor. The AWS Glue Studio visual editor enables you to author jobs while taking advantage of data lake formats and without needing expertise in them. If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Scott Long is a Front End Engineer on the AWS Glue team. He is responsible for implementing new features in AWS Glue Studio. In his spare time, he enjoys socializing with friends and participating in various outdoor activities.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

How Infomedia built a serverless data pipeline with change data capture using AWS Glue and Apache Hudi

Post Syndicated from Gowtham Dandu original https://aws.amazon.com/blogs/big-data/how-infomedia-built-a-serverless-data-pipeline-with-change-data-capture-using-aws-glue-and-apache-hudi/

This is a guest post co-written with Gowtham Dandu from Infomedia.

Infomedia Ltd (ASX:IFM) is a leading global provider of DaaS and SaaS solutions that empowers the data-driven automotive ecosystem. Infomedia's solutions help OEMs, NSCs, dealerships, and third-party partners manage the vehicle and customer lifecycle. They are used by over 250,000 industry professionals across 50 OEM brands and in 186 countries to create a convenient customer journey, drive dealer efficiencies, and grow sales.

In this post, we share how Infomedia built a serverless data pipeline with change data capture (CDC) using AWS Glue and Apache Hudi.

Infomedia was looking to build a cloud-based data platform to take advantage of highly scalable data storage with flexible and cloud-native processing tools to ingest, transform, and deliver datasets to their SaaS applications. The team wanted to set up a serverless architecture with scale-out capabilities that would allow them to optimize time, cost, and performance of the data pipelines and eliminate most of the infrastructure management.

To serve data to their end-users, the team wanted to develop an API interface to retrieve various product attributes on demand. Performance and scalability of both the data pipeline and API endpoint were key success criteria. The data pipeline needed to have sufficient performance to allow for fast turnaround in the event that data issues needed to be corrected. Finally, the API endpoint performance was important for end-user experience and customer satisfaction. When designing the data processing pipeline for the attribute API, the Infomedia team wanted to use a flexible and open-source solution for processing data workloads with minimal operational overhead.

They saw an opportunity to use AWS Glue, which offers Apache Spark, a popular open-source big data processing framework, in a serverless environment for end-to-end pipeline development and deployment.

Solution overview

The solution involved ingesting data from various third-party sources in different formats, processing to create a semantic layer, and then exposing the processed dataset as a REST API to end-users. The API retrieves data at runtime from an Amazon Aurora PostgreSQL-Compatible Edition database for end-user consumption. To populate the database, the Infomedia team developed a data pipeline using Amazon Simple Storage Service (Amazon S3) for data storage, AWS Glue for data transformations, and Apache Hudi for CDC and record-level updates. They wanted to develop a simple incremental data processing pipeline without having to update the entire database each time the pipeline ran. The Apache Hudi framework allowed the Infomedia team to maintain a golden reference dataset and capture changes so that the downstream database could be incrementally updated in a short timeframe.

To implement this modern data processing solution, Infomedia’s team chose a layered architecture with the following steps:

  1. The raw data originates from various third-party sources and is a collection of flat files with a fixed-width column structure. The raw input data is stored in Amazon S3 in JSON format (called the bronze dataset layer).
  2. The raw data is converted to an optimized Parquet format using AWS Glue. The Parquet data is stored in a separate Amazon S3 location and serves as the staging area during the CDC process (called the silver dataset layer). The Parquet format results in improved query performance and cost savings for downstream processing.
  3. AWS Glue reads the Parquet file from the staging area and updates Apache Hudi tables stored in Amazon S3 (the golden dataset layer) as part of incremental data processing. This process helps create mutable datasets on Amazon S3 to store the versioned and latest set of records.
  4. Finally, AWS Glue is used to populate Amazon Aurora PostgreSQL-Compatible Edition with the latest version of the records. This dataset is used to serve the API endpoint. The API itself is a Spring Java application deployed as a Docker container in an Amazon Elastic Container Service (Amazon ECS) AWS Fargate environment.

The following diagram illustrates this architecture.

arch diag

AWS Glue and Apache Hudi overview

AWS Glue is a serverless data integration service that makes it easy to prepare and process data at scale from a wide variety of data sources. With AWS Glue, you can ingest data from multiple data sources, extract and infer schema, populate metadata in a centralized data catalog, and prepare and transform data for analytics and machine learning. AWS Glue has a pay-as-you-go model with no upfront costs, and you only pay for resources that you consume.

Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. It allows you to comply with data privacy laws, manage CDC operations, reinstate late-arriving data, and roll back to a particular point in time. You can use AWS Glue to build a serverless Apache Spark-based data pipeline and take advantage of the AWS Glue native connector for Apache Hudi at no cost to manage CDC operations with record-level insert, updates, and deletes.
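
The following Spark SQL sketch illustrates this record-level upsert pattern on a Hudi table. The database, table, and Amazon S3 names are illustrative rather than Infomedia’s actual schema, and it assumes an AWS Glue job with Hudi support enabled:

-- golden-layer Hudi table; names and S3 location are illustrative
create table if not exists golden_db.product_attributes (
  product_id bigint,
  product_name string,
  attribute_value string,
  last_updated_ts timestamp
) using hudi
tblproperties (
  type = 'cow',                        -- copy-on-write storage
  primaryKey = 'product_id',           -- record key used for upserts
  preCombineField = 'last_updated_ts'  -- latest change wins when duplicates arrive
)
location 's3://example-bucket/golden/product_attributes/';

-- apply staged (silver-layer) changes as record-level upserts
merge into golden_db.product_attributes as target
using silver_db.product_attributes_changes as source
on target.product_id = source.product_id
when matched then update set *
when not matched then insert *;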

Solution benefits

Since the start of Infomedia’s journey with AWS Glue, the Infomedia team has experienced several benefits over the self-managed extract, transform, and load (ETL) tooling. With the horizontal scaling of AWS Glue, they were able to seamlessly scale the compute capacity of their data pipeline workloads by a factor of 5. This allowed them to increase both the volume of records and the number of datasets they could process for downstream consumption. They were also able to take advantage of AWS Glue built-in optimizations, such as pre-filtering using pushdown predicates, which allowed the team to save valuable engineering time tuning the performance of data processing jobs.

In addition, Apache Spark-based AWS Glue enabled developers to author jobs using concise Spark SQL and dataset APIs. This allowed for rapid upskilling of developers who are already familiar with database programming. Because developers are working with higher-level constructs across entire datasets, they spend less time solving for low-level technical implementation details.

Also, the AWS Glue platform has been cost-effective compared with running self-managed Apache Spark infrastructure. The team’s initial analysis showed an estimated savings of 70% over running a dedicated Spark infrastructure on Amazon EC2 for their workload. Furthermore, the AWS Glue Studio job monitoring dashboard provides the Infomedia team with detailed job-level visibility that makes it easy to get a summary of the job runs and understand data processing costs.

Conclusion and next steps

Infomedia will continue to modernize their complex data pipelines using the AWS Glue platform and other AWS Analytics services. Through integration with services such as AWS Lake Formation and the AWS Glue Data Catalog, the Infomedia team plans to maintain reference primary datasets and democratize access to high-value datasets, allowing for further innovation.

If you would like to learn more, please visit AWS Glue and AWS Lake Formation to get started on your data integration journey.


About the Authors

Gowtham Dandu is an Engineering Lead at Infomedia Ltd with a passion for building efficient and effective solutions on the cloud, especially involving data, APIs, and modern SaaS applications. He specializes in building microservices and data platforms that are cost-effective and highly scalable.

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interest are serverless technology, streaming applications, and modern cloud data warehouses.

Simplify data loading into Type 2 slowly changing dimensions in Amazon Redshift

Post Syndicated from Vaidy Kalpathy original https://aws.amazon.com/blogs/big-data/simplify-data-loading-into-type-2-slowly-changing-dimensions-in-amazon-redshift/

Thousands of customers rely on Amazon Redshift to build data warehouses to accelerate time to insights with fast, simple, and secure analytics at scale and analyze data from terabytes to petabytes by running complex analytical queries. Organizations create data marts, which are subsets of the data warehouse and usually oriented for gaining analytical insights specific to a business unit or team. The star schema is a popular data model for building data marts.

In this post, we show how to simplify data loading into a Type 2 slowly changing dimension in Amazon Redshift.

Star schema and slowly changing dimension overview

A star schema is the simplest type of dimensional model, in which the center of the star can have one fact table and a number of associated dimension tables. A dimension is a structure that captures reference data along with associated hierarchies, while a fact table captures different values and metrics that can be aggregated by dimensions. Dimensions provide answers to exploratory business questions by allowing end-users to slice and dice data in a variety of ways using familiar SQL commands.

Whereas operational source systems contain only the latest version of master data, the star schema enables time travel queries to reproduce dimension attribute values on past dates when the fact transaction or event actually happened. The star schema data model allows analytical users to query historical data tying metrics to corresponding dimensional attribute values over time. Time travel is possible because dimension tables contain the exact version of the associated attributes at different time ranges. Relative to the metrics data that changes on a daily or even hourly basis, the dimension attributes change less frequently. Therefore, dimensions in a star schema that keep track of changes over time are referred to as slowly changing dimensions (SCDs).
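
For example, a time travel query typically joins the fact table to the dimension on the natural key and restricts the match to the dimension row whose validity range covers the fact’s transaction date. The following sketch uses a hypothetical fact_orders table together with the dim_customer column names introduced later in this post:

select 
  f.order_id, 
  f.order_amount, 
  d.city, 
  d.country 
from 
  fact_orders as f 
  join dim_customer as d on f.customer_id = d.customer_id 
  and f.order_date between d.rec_eff_dt and d.rec_exp_dt;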

Data loading is one of the key aspects of maintaining a data warehouse. In a star schema data model, the central fact table is dependent on the surrounding dimension tables. This is captured in the form of primary key-foreign key relationships, where the dimension table primary keys are referenced by foreign keys in the fact table. In the case of Amazon Redshift, uniqueness, primary key, and foreign key constraints are not enforced. However, declaring them will help the optimizer arrive at optimal query plans, provided that the data loading processes enforce their integrity. As part of data loading, the dimension tables, including SCD tables, get loaded first, followed by the fact tables.
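
For example, these informational constraints can be declared with standard DDL; Amazon Redshift doesn’t enforce them, but the planner can use them. The following sketch assumes a hypothetical fact_orders table alongside the dim_customer dimension described later in this post:

-- declare the surrogate key as an informational primary key
alter table dim_customer 
  add constraint pk_dim_customer primary key (customer_sk);

-- declare the informational foreign key from the fact table to the dimension
alter table fact_orders 
  add constraint fk_orders_customer foreign key (customer_sk) 
  references dim_customer (customer_sk);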

SCD population challenge

Populating an SCD dimension table involves merging data from multiple source tables, which are usually normalized. SCD tables contain a pair of date columns (effective and expiry dates) that represent the record’s validity date range. Changes are inserted as new active records effective from the date of data loading, while the current active record is simultaneously expired as of the previous day. During each data load, incoming change records are matched against existing active records, comparing each attribute value to determine whether an existing record has changed or been deleted, or whether the incoming record is new.

In this post, we demonstrate how to simplify data loading into a dimension table with the following methods:

  • Using Amazon Simple Storage Service (Amazon S3) to host the initial and incremental data files from source system tables
  • Accessing S3 objects using Amazon Redshift Spectrum to carry out data processing to load native tables within Amazon Redshift
  • Creating views with window functions to replicate the source system version of each table within Amazon Redshift
  • Joining source table views to project attributes matching with dimension table schema
  • Applying incremental data to the dimension table, bringing it up to date with source-side changes

Solution overview

In a real-world scenario, records from source system tables are ingested on a periodic basis to an Amazon S3 location before being loaded into star schema tables in Amazon Redshift.

For this demonstration, data from two source tables, customer_master and customer_address, are combined to populate the target dimension table dim_customer, which is the customer dimension table.

The source tables customer_master and customer_address share the same primary key, customer_id, and will be joined on that key to fetch one record per customer_id along with attributes from both tables. row_audit_ts contains the latest timestamp at which the particular source record was inserted or last updated. This column helps identify the change records since the last data extraction.

rec_source_status is an optional column that indicates if the corresponding source record was inserted, updated, or deleted. This is applicable in cases where the source system itself provides the changes and populates rec_source_status appropriately.

The following figure provides the schema of the source and target tables.

Let’s look closer at the schema of the target table, dim_customer. It contains different categories of columns:

  • Keys – It contains two types of keys:
    • customer_sk is the primary key of this table. It is also called the surrogate key and has a unique value that is monotonically increasing.
    • customer_id is the source primary key and provides a reference back to the source system record.
  • SCD2 metadata – rec_eff_dt and rec_exp_dt indicate the state of the record. These two columns together define the validity of the record. The value in rec_exp_dt will be set as ‘9999-12-31’ for presently active records.
  • Attributes – Includes first_name, last_name, employer_name, email_id, city, and country.

Data loading into a SCD table involves a first-time bulk data loading, referred to as the initial data load. This is followed by continuous or regular data loading, referred to as an incremental data load, to keep the records up to date with changes in the source tables.

To demonstrate the solution, we walk through the following steps for initial data load (1–7) and incremental data load (8–12):

  1. Land the source data files in an Amazon S3 location, using one subfolder per source table.
  2. Use an AWS Glue crawler to parse the data files and register tables in the AWS Glue Data Catalog.
  3. Create an external schema in Amazon Redshift to point to the AWS Glue database containing these tables.
  4. In Amazon Redshift, create one view per source table to fetch the latest version of the record for each primary key (customer_id) value.
  5. Create the dim_customer table in Amazon Redshift, which contains attributes from all relevant source tables.
  6. Create a view in Amazon Redshift joining the source table views from Step 4 to project the attributes modeled in the dimension table.
  7. Populate the initial data from the view created in Step 6 into the dim_customer table, generating customer_sk.
  8. Land the incremental data files for each source table in their respective Amazon S3 location.
  9. In Amazon Redshift, create a temporary table to accommodate the change-only records.
  10. Join the view from Step 6 and dim_customer and identify change records comparing the combined hash value of attributes. Populate the change records into the temporary table with an I, U, or D indicator.
  11. Update rec_exp_dt in dim_customer for all U and D records from the temporary table.
  12. Insert records into dim_customer, querying all I and U records from the temporary table.

Prerequisites

Before you get started, make sure you meet the following prerequisites:

Land data from source tables

Create separate subfolders for each source table in an S3 bucket and place the initial data files within the respective subfolder. In the following image, the initial data files for customer_master and customer_address are made available within two different subfolders. To try out the solution, you can use customer_master_with_ts.csv and customer_address_with_ts.csv as initial data files.

It’s important to include an audit timestamp (row_audit_ts) column that indicates when each record was inserted or last updated. As part of incremental data loading, rows with the same primary key value (customer_id) can arrive more than once. The row_audit_ts column helps identify the latest version of such records for a given customer_id to be used for further processing.

Register source tables in the AWS Glue Data Catalog

We use an AWS Glue crawler to infer metadata from delimited data files like the CSV files used in this post. For instructions on getting started with an AWS Glue crawler, refer to Tutorial: Adding an AWS Glue crawler.

Create an AWS Glue crawler and point it to the Amazon S3 location that contains the source table subfolders, within which the associated data files are placed. When you’re creating the AWS Glue crawler, create a new database named rs-dimension-blog. The following screenshots show the AWS Glue crawler configuration chosen for our data files.

Note that for the Set output and scheduling section, the advanced options are left unchanged.

Running this crawler should create the following tables within the rs-dimension-blog database:

  • customer_address
  • customer_master

Create schemas in Amazon Redshift

First, create an AWS Identity and Access Management (IAM) role named rs-dim-blog-spectrum-role. For instructions, refer to Create an IAM role for Amazon Redshift.

The IAM role has Amazon Redshift as the trusted entity, and the permissions policy includes AmazonS3ReadOnlyAccess and AWSGlueConsoleFullAccess, because we’re using the AWS Glue Data Catalog. Then associate the IAM role with the Amazon Redshift cluster or endpoint.

Alternatively, you can set the IAM role as the default for your Amazon Redshift cluster or endpoint. If you do so, pass the iam_role parameter in the create external schema command as iam_role default.

Now, open Amazon Redshift Query Editor V2 and create an external schema passing the newly created IAM role and specifying the database as rs-dimension-blog. The database name rs-dimension-blog is the one created in the Data Catalog as part of configuring the crawler in the preceding section. See the following code:

create external schema spectrum_dim_blog 
from data catalog 
database 'rs-dimension-blog' 
iam_role 'arn:aws:iam::<accountid>:role/rs-dim-blog-spectrum-role';
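
If you instead set the IAM role as the default for your cluster or endpoint as described earlier, the same statement takes iam_role default:

create external schema spectrum_dim_blog 
from data catalog 
database 'rs-dimension-blog' 
iam_role default;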

Check if the tables registered in the Data Catalog in the preceding section are visible from within Amazon Redshift:

select * 
from spectrum_dim_blog.customer_master 
limit 10;

select * 
from spectrum_dim_blog.customer_address 
limit 10;

Each of these queries will return 10 rows from the respective Data Catalog tables.

Create another schema in Amazon Redshift to host the table, dim_customer:

create schema rs_dim_blog;

Create views to fetch the latest records from each source table

Create a view for the customer_master table, naming it vw_cust_mstr_latest:

create view rs_dim_blog.vw_cust_mstr_latest as with rows_numbered as (
  select 
    customer_id, 
    first_name, 
    last_name, 
    employer_name, 
    row_audit_ts, 
    row_number() over(
      partition by customer_id 
      order by 
        row_audit_ts desc
    ) as rnum 
  from 
    spectrum_dim_blog.customer_master
) 
select 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  row_audit_ts, 
  rnum 
from 
  rows_numbered 
where 
  rnum = 1 with no schema binding;

The preceding query uses row_number, which is a window function provided by Amazon Redshift. Using window functions enables you to create analytic business queries more efficiently. Window functions operate on a partition of a result set, and return a value for every row in that window. The row_number window function determines the ordinal number of the current row within a group of rows, counting from 1, based on the ORDER BY expression in the OVER clause. By including the PARTITION BY clause as customer_id, groups are created for each value of customer_id and ordinal numbers are reset for each group.

Create a view for the customer_address table, naming it vw_cust_addr_latest:

create view rs_dim_blog.vw_cust_addr_latest as with rows_numbered as (
  select 
    customer_id, 
    email_id, 
    city, 
    country, 
    row_audit_ts, 
    row_number() over(
      partition by customer_id 
      order by 
        row_audit_ts desc
    ) as rnum 
  from 
    spectrum_dim_blog.customer_address
) 
select 
  customer_id, 
  email_id, 
  city, 
  country, 
  row_audit_ts, 
  rnum 
from 
  rows_numbered 
where 
  rnum = 1 with no schema binding;

Both view definitions use the row_number window function of Amazon Redshift, ordering the records by descending order of the row_audit_ts column (the audit timestamp column). The condition rnum=1 fetches the latest record for each customer_id value.

Create the dim_customer table in Amazon Redshift

Create dim_customer as an internal table in Amazon Redshift within the rs_dim_blog schema. The dimension table includes the column customer_sk, which acts as the surrogate key and enables us to capture a time-sensitive version of each customer record. The validity period for each record is defined by the columns rec_eff_dt and rec_exp_dt, representing the record effective date and record expiry date, respectively. See the following code:

create table rs_dim_blog.dim_customer (
  customer_sk bigint, 
  customer_id bigint, 
  first_name varchar(100), 
  last_name varchar(100), 
  employer_name varchar(100), 
  email_id varchar(100), 
  city varchar(100), 
  country varchar(100), 
  rec_eff_dt date, 
  rec_exp_dt date
) diststyle auto;

Create a view to consolidate the latest version of source records

Create the view vw_dim_customer_src, which consolidates the latest records from both source tables using a left outer join, keeping them ready to be populated into the Amazon Redshift dimension table. This view fetches data from the latest views defined in the section “Create views to fetch the latest records from each source table”:

create view rs_dim_blog.vw_dim_customer_src as 
select 
  m.customer_id, 
  m.first_name, 
  m.last_name, 
  m.employer_name, 
  a.email_id, 
  a.city, 
  a.country 
from 
  rs_dim_blog.vw_cust_mstr_latest as m 
  left join rs_dim_blog.vw_cust_addr_latest as a on m.customer_id = a.customer_id 
order by 
  m.customer_id with no schema binding;

At this point, this view fetches the initial data for loading into the dim_customer table created earlier. In your use case, use a similar approach to create and join the required source table views to populate your target dimension table.
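
Optionally, you can sanity-check the consolidated view before loading by previewing a few rows:

select * 
from 
  rs_dim_blog.vw_dim_customer_src 
limit 10;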

Populate initial data into dim_customer

Populate the initial data into the dim_customer table by querying the view vw_dim_customer_src. Because this is the initial data load, running row numbers generated by the row_number window function will suffice to populate a unique value in the customer_sk column starting from 1:

insert into rs_dim_blog.dim_customer 
select 
  row_number() over() as customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  cast('2022-07-01' as date) rec_eff_dt, 
  cast('9999-12-31' as date) rec_exp_dt 
from 
  rs_dim_blog.vw_dim_customer_src;

In this query, we specified '2022-07-01' as the value in rec_eff_dt for all initial data records. For your use case, you can modify this date value as appropriate.

The preceding steps complete the initial data loading into the dim_customer table. In the next steps, we proceed with populating incremental data.

Land ongoing change data files in Amazon S3

After the initial load, the source systems provide data files on an ongoing basis, either containing only new and change records or a full extract containing all records for a particular table.

You can use the sample files customer_master_with_ts_incr.csv and customer_address_with_ts_incr.csv, which contain changed as well as new records. Place these incremental files in the same Amazon S3 location where the initial data files were placed (see the section “Land data from source tables”). The corresponding Redshift Spectrum tables will then automatically read the additional rows.

If you used the sample file for customer_master, after adding the incremental files, the following query shows the initial as well as incremental records:

select 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  row_audit_ts 
from 
  spectrum_dim_blog.customer_master 
order by 
  customer_id;

In the case of full extracts, we can identify deletes occurring in the source system tables by comparing the previous and current versions and looking for missing records. In the case of change-only extracts where the rec_source_status column is present, its value helps us identify deleted records. In either case, land the ongoing change data files in the respective Amazon S3 locations.

For this example, we have uploaded the incremental data for the customer_master and customer_address source tables with a few customer_id records receiving updates and a few new records being added.

Create a temporary table to capture change records

Create the temporary table temp_dim_customer to store all changes that need to be applied to the target dim_customer table:

create temp table temp_dim_customer (
  customer_sk bigint, 
  customer_id bigint, 
  first_name varchar(100), 
  last_name varchar(100), 
  employer_name varchar(100), 
  email_id varchar(100), 
  city varchar(100), 
  country varchar(100), 
  rec_eff_dt date, 
  rec_exp_dt date, 
  iud_operation character(1)
);

Populate the temporary table with new and changed records

This is a multi-step process that can be combined into a single complex SQL statement. Complete the following steps:

  1. Fetch the latest version of all customer attributes by querying the view vw_dim_customer_src:
select 
  customer_id, 
  sha2(
    coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
  ) as hash_value, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  current_date rec_eff_dt, 
  cast('9999-12-31' as date) rec_exp_dt 
from 
  rs_dim_blog.vw_dim_customer_src;

Amazon Redshift offers hashing functions such as sha2, which converts a variable-length string input into a fixed-length character output. The output string is a text representation of the hexadecimal value of the checksum with the specified number of bits. In this case, we pass a concatenated set of customer attributes whose change we want to track, specifying the number of bits as 512. We’ll use the output of the hash function to determine if any of the attributes have undergone a change. This dataset will be called newver (new version).

Because we landed the ongoing change data in the same location as the initial data files, the records retrieved from the preceding query (in newver) include all records, even the unchanged ones. But because of the definition of the view vw_dim_customer_src, we get only one record per customer_id, which is its latest version based on row_audit_ts.

  2. In a similar manner, retrieve the latest version of all customer records from dim_customer, which are identified by rec_exp_dt='9999-12-31'. While doing so, also retrieve the sha2 value of all customer attributes available in dim_customer:
select 
  customer_id, 
  sha2(
    coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
  ) as hash_value, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country 
from 
  rs_dim_blog.dim_customer 
where 
  rec_exp_dt = '9999-12-31';

This dataset will be called oldver (old or existing version).

  3. Identify the current maximum surrogate key value from the dim_customer table:
select 
  max(customer_sk) as maxval 
from 
  rs_dim_blog.dim_customer;

This value (maxval) will be added to the row_number before being used as the customer_sk value for the change records that need to be inserted.

  4. Perform a full outer join of the old version of records (oldver) and the new version (newver) of records on the customer_id column. Then compare the old and new hash values generated by the sha2 function to determine if the change record is an insert, update, or delete:
case when oldver.customer_id is null then 'I'
when newver.customer_id is null then 'D'
when oldver.hash_value != newver.hash_value then 'U'
else 'N' end as iud_op

We tag the records as follows:

  • If the customer_id is non-existent in the oldver dataset (oldver.customer_id is null), it’s tagged as an insert ('I').
  • Otherwise, if the customer_id is non-existent in the newver dataset (newver.customer_id is null), it’s tagged as a delete ('D').
  • Otherwise, if the old hash_value and new hash_value are different, these records represent an update ('U').
  • Otherwise, it indicates that the record has not undergone any change and therefore can be ignored or marked as not-to-be-processed ('N').

Make sure to modify the preceding logic if the source extract contains rec_source_status to identify deleted records.
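
For example, if rec_source_status is carried through the source views into the newver dataset, the tagging expression could be extended along the following lines. This is only a sketch that assumes the source flags deleted records with the value 'D'; adjust it to your source system's convention:

-- assumes rec_source_status is projected through vw_dim_customer_src into newver
case when oldver.customer_id is null then 'I'
when newver.customer_id is null then 'D'
when newver.rec_source_status = 'D' then 'D'
when oldver.hash_value != newver.hash_value then 'U'
else 'N' end as iud_op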

Although sha2 maps a possibly infinite set of input strings to a finite set of output strings, the chance of a hash collision between the original and changed row values is very low. Instead of individually comparing each column value before and after, we compare the hash values generated by sha2 to conclude whether any of the attributes of the customer record have changed. For your use case, we recommend that you choose a hash function that works for your data conditions after adequate testing. Alternatively, you can compare individual column values if none of the hash functions satisfactorily meet your expectations.
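
A sketch of such a column-by-column comparison for the attributes tracked in this post follows; it is functionally comparable to the hash check:

-- compare each tracked attribute directly instead of the sha2 hash values
case when oldver.customer_id is null then 'I'
when newver.customer_id is null then 'D'
when coalesce(oldver.first_name, '') != coalesce(newver.first_name, '')
  or coalesce(oldver.last_name, '') != coalesce(newver.last_name, '')
  or coalesce(oldver.employer_name, '') != coalesce(newver.employer_name, '')
  or coalesce(oldver.email_id, '') != coalesce(newver.email_id, '')
  or coalesce(oldver.city, '') != coalesce(newver.city, '')
  or coalesce(oldver.country, '') != coalesce(newver.country, '')
then 'U'
else 'N' end as iud_op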

  5. Combining the outputs from the preceding steps, let’s create the INSERT statement that captures only change records to populate the temporary table:
insert into temp_dim_customer (
  customer_sk, customer_id, first_name, 
  last_name, employer_name, email_id, 
  city, country, rec_eff_dt, rec_exp_dt, 
  iud_operation
) with newver as (
  select 
    customer_id, 
    sha2(
      coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
    ) as hash_value, 
    first_name, 
    last_name, 
    employer_name, 
    email_id, 
    city, 
    country, 
    current_date rec_eff_dt, 
    cast('9999-12-31' as date) rec_exp_dt 
  from 
    rs_dim_blog.vw_dim_customer_src
), 
oldver as (
  select 
    customer_id, 
    sha2(
      coalesce(first_name, '') || coalesce(last_name, '') || coalesce(employer_name, '') || coalesce(email_id, '') || coalesce(city, '') || coalesce(country, ''), 512
    ) as hash_value, 
    first_name, 
    last_name, 
    employer_name, 
    email_id, 
    city, 
    country 
  from 
    rs_dim_blog.dim_customer 
  where 
    rec_exp_dt = '9999-12-31'
), 
maxsk as (
  select 
    max(customer_sk) as maxval 
  from 
    rs_dim_blog.dim_customer
), 
allrecs as (
  select 
    coalesce(oldver.customer_id, newver.customer_id) as customer_id, 
    case when oldver.customer_id is null then 'I' when newver.customer_id is null then 'D' when oldver.hash_value != newver.hash_value then 'U' else 'N' end as iud_op, 
    newver.first_name, 
    newver.last_name, 
    newver.employer_name, 
    newver.email_id, 
    newver.city, 
    newver.country, 
    newver.rec_eff_dt, 
    newver.rec_exp_dt 
  from 
    oldver full 
    outer join newver on oldver.customer_id = newver.customer_id
) 
select 
  (maxval + (row_number() over())) as customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  rec_eff_dt, 
  rec_exp_dt, 
  iud_op 
from 
  allrecs, 
  maxsk 
where 
  iud_op != 'N';

Expire updated customer records

With the temp_dim_customer table now containing only the change records (either 'I', 'U', or 'D'), these changes can be applied to the target dim_customer table.

Let's first fetch all records with values 'U' or 'D' in the iud_operation column. These are records that have either been updated or deleted in the source system. Because dim_customer is a slowly changing dimension, it needs to reflect the validity period of each customer record. In this case, we expire the presently active records that have been updated or deleted. We expire these records as of yesterday (by setting rec_exp_dt = current_date - 1), matching on the customer_id column:

update 
  rs_dim_blog.dim_customer 
set 
  rec_exp_dt = current_date - 1 
where 
  customer_id in (
    select 
      customer_id 
    from 
      temp_dim_customer as t 
    where 
      iud_operation in ('U', 'D')
  ) 
  and rec_exp_dt = '9999-12-31';

Insert new and changed records

As the last step, we need to insert the newer version of updated records along with all first-time inserts. These are indicated by 'U' and 'I', respectively, in the iud_operation column of the temp_dim_customer table:

insert into rs_dim_blog.dim_customer (
  customer_sk, customer_id, first_name, 
  last_name, employer_name, email_id, 
  city, country, rec_eff_dt, rec_exp_dt
) 
select 
  customer_sk, 
  customer_id, 
  first_name, 
  last_name, 
  employer_name, 
  email_id, 
  city, 
  country, 
  rec_eff_dt, 
  rec_exp_dt 
from 
  temp_dim_customer 
where 
  iud_operation in ('I', 'U');

Depending on the SQL client settings, you might need to run a commit transaction; command to make sure that the preceding changes are persisted successfully in Amazon Redshift.

Check the final output

You can run the following query and see that the dim_customer table now contains both the initial data records and the incremental data records, capturing multiple versions for those customer_id values that changed as part of incremental data loading. The output also indicates that each record has been populated with appropriate values in rec_eff_dt and rec_exp_dt corresponding to the record validity period.

select 
  * 
from 
  rs_dim_blog.dim_customer 
order by 
  customer_id, 
  customer_sk;

For the sample data files provided in this post, the preceding query returns the following records. Note that the values in customer_sk may not exactly match what is shown in the following table.

In this post, we only show the important SQL statements; the complete SQL code is available in load_scd2_sample_dim_customer.sql.

Clean up

If you no longer need the resources you created, you can delete them to prevent incurring additional charges.

Conclusion

In this post, you learned how to simplify data loading into Type 2 SCD tables in Amazon Redshift, covering both initial data loading and incremental data loading. The approach deals with multiple source tables populating a target dimension table, capturing the latest version of source records as of each run.

Refer to Amazon Redshift data loading best practices for further materials and additional best practices, and see Updating and inserting new data for instructions to implement updates and inserts.


About the Author

Vaidy Kalpathy is a Senior Data Lab Solution Architect at AWS, where he helps customers modernize their data platforms and define an end-to-end data strategy, including data ingestion, transformation, security, and visualization. He is passionate about working backwards from business use cases, creating scalable and custom-fit architectures to help customers innovate using data analytics services on AWS.

Build an end-to-end change data capture with Amazon MSK Connect and AWS Glue Schema Registry

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/

The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we use a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the database and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, the schema will be registered. If the schema exists but the serializer is using a new version, the schema registry checks the compatibility mode of the schema before updating the schema. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to the dead-letter queue.

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.

Example schema

For this post, we use the following schema as the first version of the table:

{ 
    "Database Name": "sampledatabase", 
    "Table Name": "movies", 
    "Fields": [
         { 
            "name": "movie_id", 
            "type": "INTEGER" 
         },
         { 
            "name": "title", 
            "type": "STRING" 
         },
         { 
            "name": "release_year",
            "type": "INTEGER" 
         }
     ] 
}

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group.
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK will install the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community-licensed Amazon S3 sink connector JARs. Both plugins also include the AWS Glue Schema Registry libraries for Avro serialization and deserialization. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin and then pick the custom plugin with name starting msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we use the value of the string constant properties in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  8. Next, we configure Connector capacity. We can choose Provisioned and leave the other properties as default.
  9. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  10. For Access permissions, use the AWS Identity and Access Management (IAM) role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector changes to running status.

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin with name starting msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNECTOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false
  8. Next, we configure Connector capacity. We can choose Provisioned and leave the other properties as default.
  9. For Worker configuration, choose the custom worker configuration with name starting msk-gsr-blog created as part of the CloudFormation template.
  10. For Access permissions, use the IAM role generated by the CloudFormation template MSKConnectRole.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template msk-connector-logs.
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and that it contains the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
  8. When prompted for a password, enter the password from the CloudFormation template parameters.
  9. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
  10. When prompted for a password, enter the password from the CloudFormation template parameters.
  11. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  12. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.

  13. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command (a hypothetical sketch of what alter_table.sql and insert_data_with_new_column.sql might contain appears at the end of this section):
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
  2. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
  3. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  4. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the COUNTRY column that you added:
{
  "type": "record",
  "name": "Value",
  "namespace": "db1.sampledatabase.movies",
  "fields": [
    {
      "name": "movie_id",
      "type": "int"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "COUNTRY",
      "type": "string"
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "db1.sampledatabase.movies.Value"
}
  5. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.
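
For reference, the alter_table.sql and insert_data_with_new_column.sql scripts used in the preceding steps might look something like the following sketch. The actual scripts are provided on the EC2 client instance created by the CloudFormation template, so the column definition and sample values here are only illustrative:

-- alter_table.sql (hypothetical sketch): add the new column to the source table
alter table sampledatabase.movies add column COUNTRY varchar(50) not null default 'UNKNOWN';

-- insert_data_with_new_column.sql (hypothetical sketch): insert a row that populates the new column
insert into sampledatabase.movies (movie_id, title, release_year, COUNTRY)
values (1001, 'Example Movie', 2023, 'USA');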

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post:

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is a Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.