Tag Archives: AWS Glue

Use AWS Glue Data Catalog views to analyze data

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/use-aws-glue-data-catalog-views-to-analyze-data/

In this post, we show you how to use the new views feature of the AWS Glue Data Catalog. SQL views are a powerful object used across relational databases. You can use views to shorten the time to insight by tailoring the data that is queried. Additionally, you can use the power of SQL in a view to express complex boundaries in data across multiple tables that can’t be expressed with simpler permissions. Data lakes provide customers the flexibility required to derive useful insights from data across many sources and many use cases. Data consumers can consume data where they need to across lines of business, increasing the velocity of insights generation.

Customers use many different processing engines in their data lakes, each of which has its own version of views with different capabilities. The AWS Glue Data Catalog and AWS Lake Formation provide a central location to manage your data across data lake engines.

AWS Glue has released a new feature, SQL views, which allows you to manage a single view object in the Data Catalog that can be queried from SQL engines. You can create a single view object with a different SQL version for each engine you want to query, such as Amazon Athena, Amazon Redshift, and Spark SQL on Amazon EMR. You can then manage access to these resources using the same Lake Formation permissions that are used to control tables in the data lake.

Solution overview

For this post, we use the Women’s E-Commerce Clothing Review dataset. The objective is to create views in the Data Catalog so you can create a single common view schema and metadata object to use across engines (in this case, Athena). Doing so lets you use the same views across your data lakes to fit your use case. We create a view that masks the customer_id column in this dataset, then share this view with another user so that they can query the masked view.

Prerequisites

Before you can create a view in the AWS Glue Data Catalog, make sure that you have an AWS Identity and Access Management (IAM) role with the following configuration:

  • The following trust policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
               "glue.amazonaws.com",
               "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  • The following pass role policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Stmt1",
          "Action": [
            "iam:PassRole"
          ],
          "Effect": "Allow",
          "Resource": "*",
          "Condition": {
             "StringEquals": {
               "iam:PassedToService": [
                 "glue.amazonaws.com",
                 "lakeformation.amazonaws.com"
               ]
             }
           }
         }
       ]
    }

  • Finally, you will also need the following permissions:
    • glue:GetDatabase
    • glue:GetDatabases
    • glue:CreateTable
    • glue:GetTable
    • glue:UpdateTable
    • glue:DeleteTable
    • glue:GetTables
    • glue:SearchTables
    • glue:BatchGetPartition
    • glue:GetPartitions
    • glue:GetPartition
    • glue:GetTableVersion
    • glue:GetTableVersions
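The permissions listed above can be collected into a single identity-based policy document. The following is a minimal sketch rather than an official policy: the function name, the statement ID, and the wildcard resource are assumptions made for illustration, and you would normally scope the resource down to specific catalogs, databases, and tables.

```python
import json

# Actions taken from the list above. IAM treats the service prefix as
# case-insensitive, so "glue:" and "Glue:" are equivalent.
GLUE_ACTIONS = [
    "glue:GetDatabase",
    "glue:GetDatabases",
    "glue:CreateTable",
    "glue:GetTable",
    "glue:UpdateTable",
    "glue:DeleteTable",
    "glue:GetTables",
    "glue:SearchTables",
    "glue:BatchGetPartition",
    "glue:GetPartitions",
    "glue:GetPartition",
    "glue:GetTableVersion",
    "glue:GetTableVersions",
]


def build_glue_view_policy(resource="*"):
    """Return an IAM policy document (as a dict) that allows the listed actions.

    The "*" default for resource is an assumption for brevity only.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "GlueCatalogViewAccess",  # hypothetical statement ID
                "Effect": "Allow",
                "Action": GLUE_ACTIONS,
                "Resource": resource,
            }
        ],
    }


# Print the JSON that could be pasted into the IAM policy editor.
print(json.dumps(build_glue_view_policy(), indent=2))
```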

Run the AWS CloudFormation template

You can deploy the AWS CloudFormation template glueviewsblog.yaml to create the Lake Formation database and table. The dataset will be loaded into an Amazon Simple Storage Service (Amazon S3) bucket.

For step-by-step instructions, refer to Creating a stack on the AWS CloudFormation console.

When the stack is complete, you can see a table called clothing_parquet on the Lake Formation console, as shown in the following screenshot.

Create a view on the Athena console

Now that you have your Lake Formation managed table, you can open the Athena console and create a Data Catalog view. Complete the following steps:

  1. In the Athena query editor, run the following query on the Parquet dataset:
SELECT * FROM "clothing_reviews"."clothing_parquet" limit 10;

In the query results, the customer_id column is currently visible.

Next, you create a view called hidden_customerid and mask the customer_id column.

  2. Create a view called hidden_customerid:
CREATE PROTECTED MULTI DIALECT VIEW clothing_reviews.hidden_customerid SECURITY DEFINER AS 
SELECT * FROM clothing_reviews.clothing_parquet

In the following screenshot, you can see that a view called hidden_customerid was successfully created.

  3. Run the following query to update the view so that it masks the first four characters of the customer_id column:
ALTER VIEW clothing_reviews.hidden_customerid UPDATE DIALECT AS
SELECT '****' || substring(customer_id, 5) AS customer_id, clothing_id, age, title, review_text, rating, recommend_ind, positive_feedback, division_name, department_name, class_name
FROM clothing_reviews.clothing_parquet

You can see in the following screenshot that the view hidden_customerid has the first four characters of the customer_id column masked.

The original table clothing_parquet remains unchanged and unmasked.
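To make the masking expression concrete, the following plain Python sketch mirrors what the SQL does to a single value. It assumes that substring() in Athena counts positions from 1, so the offset passed to it is one more than the number of characters to hide. The helper name and the sample ID are made up for illustration.

```python
def mask_prefix(value: str, n_masked: int = 4, mask_char: str = "*") -> str:
    """Replace the first n_masked characters of value with mask_char.

    Equivalent in spirit to the SQL expression
        '****' || substring(value, n_masked + 1)
    where substring() is 1-based, so position n_masked + 1 is the first
    character that stays visible.
    """
    return mask_char * n_masked + value[n_masked:]


customer_id = "AB12345678"  # made-up customer ID
masked = mask_prefix(customer_id)
print(masked)
```

Because the number of mask characters equals the number of characters removed, the masked value keeps the same length as the original.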

Grant another user access to query the view

Data Catalog views allow you to use Lake Formation to control access. In this step, you grant access to this view to another user called amazon_business_analyst and then query the view as that user.

  1. Sign in to the Lake Formation console as admin.
  2. In the navigation pane, choose Views.

As shown in the following screenshot, you can see the hidden_customerid view.

  3. Sign in as the amazon_business_analyst user and navigate to the Views page.

This user cannot see the view yet.

  4. As the data lake admin, grant permission on the view to the amazon_business_analyst user.
  5. Sign in again as amazon_business_analyst and navigate to the Views page.

  6. On the Athena console, query the hidden_customerid view.

You have successfully shared a view with the user and queried it from the Athena console.
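The grant step can also be scripted instead of performed on the console. The following is a hedged sketch that only builds the request for the Lake Formation GrantPermissions API. It assumes that the view is addressed as a table resource and that amazon_business_analyst is an IAM user; the account ID is a placeholder and the helper names are hypothetical.

```python
def build_view_grant(principal_arn, database, view_name,
                     permissions=("SELECT", "DESCRIBE")):
    """Build keyword arguments for a Lake Formation grant_permissions call."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        # Assumption: the Data Catalog view is referenced as a Table resource.
        "Resource": {"Table": {"DatabaseName": database, "Name": view_name}},
        "Permissions": list(permissions),
    }


def apply_grant(grant_args):
    """Send the grant using boto3 (requires data lake admin credentials)."""
    import boto3  # imported here so the builder above works without boto3

    boto3.client("lakeformation").grant_permissions(**grant_args)


grant_args = build_view_grant(
    # Placeholder account ID; replace with your own principal ARN.
    principal_arn="arn:aws:iam::111122223333:user/amazon_business_analyst",
    database="clothing_reviews",
    view_name="hidden_customerid",
)
print(grant_args)
```

Calling apply_grant(grant_args) as the data lake admin would then correspond to the console step in which the admin grants permission to the analyst.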

Clean up

To avoid incurring future charges, delete the CloudFormation stack. For instructions, refer to Deleting a stack on the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use the AWS Glue Data Catalog to create views. We then showed how to alter the views and mask the data. You can share the view with different users to query using Athena. For more information about this new feature, refer to Using AWS Glue Data Catalog views.


About the Authors

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Michael Chess is a Product Manager on the AWS Lake Formation team based out of Palo Alto, CA. He specializes in permissions and data catalog features in the data lake.

Derek Liu is a Senior Solutions Architect based out of Vancouver, BC. He enjoys helping customers solve big data challenges through AWS analytic services.

Detect and handle data skew on AWS Glue

Post Syndicated from Salim Tutuncu original https://aws.amazon.com/blogs/big-data/detect-and-handle-data-skew-on-aws-glue/

AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns.

The most visible symptom is often nodes running out of disk space during shuffling, which can cause nodes to fail one after another and the job to fail. However, data skew can also stay hidden. It often goes undetected because monitoring tools might not flag an uneven distribution as a critical issue, and logs don’t always make it evident. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate data skew.

How to detect data skew

When an AWS Glue job has issues with local disks (split disk issues), doesn’t scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This option is enabled by default on the AWS Glue console. Once it is enabled, Spark event log files are created during the job run and stored in your S3 bucket. Those logs are then parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blog post for more details. For jobs whose event logs exceed the 512 MB limit of the AWS Glue serverless Spark UI, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and if the data distribution among partitions is balanced or not (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew.

Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.

These metrics represent the task-related metrics below which a certain percentage of tasks completed. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see very biased values in each percentile. In the preceding example, it didn’t write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, it wrote 460 MiB, roughly 10 times the 75th percentile. It means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the task in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.
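If you export per-task durations (for example, by copying them from the Spark UI), you can reproduce this kind of summary yourself. The following plain Python sketch computes the same five summary points and a simple max-to-median ratio. The helper names and the sample durations are made up for illustration, and the ratio is only a rough indicator, not an official metric.

```python
from statistics import quantiles


def summarize(durations):
    """Return min, 25th percentile, median, 75th percentile, and max."""
    p25, median, p75 = quantiles(durations, n=4, method="inclusive")
    return {
        "min": min(durations),
        "p25": p25,
        "median": median,
        "p75": p75,
        "max": max(durations),
    }


def skew_ratio(durations):
    """Max divided by median; values far above 1 suggest a straggler task."""
    summary = summarize(durations)
    return summary["max"] / summary["median"]


# Made-up task durations in seconds: most tasks are fast, one is very slow.
task_seconds = [2, 2, 2, 2, 2, 3, 3, 46]
print(summarize(task_seconds))
print(skew_ratio(task_seconds))
```

With evenly distributed tasks, the ratio stays close to 1. A ratio an order of magnitude higher, as in this sample, is a reason to inspect the partition sizes and join keys of that stage.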

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog, or use Spark methods to load the files (such as Parquet or CSV) that you want to analyze. You can use a script similar to the following to detect data skew in terms of partition size. Note that the more significant problem is usually skew that appears during shuffling, and this script does not detect that kind of skew:

# abs is imported under an alias so that it does not shadow the Python built-in
from pyspark.sql.functions import spark_partition_id, asc, abs as sql_abs
# input_dataframe is the DataFrame that you want to check for data skew
partition_sizes_df = input_dataframe\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .withColumnRenamed("count", "partition_size")
# calculate the average and standard deviation of the partition sizes
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0]

""" 
 the code calculates the absolute difference between each value in the "partition_size" column and the calculated average (avg_size).
 then, it calculates twice the standard deviation (std_dev_size) and uses
 that as a boolean mask where the condition checks if the absolute difference is greater than twice the standard deviation
 in order to mark a partition 'skewed'
"""
skewed_partitions_df = partition_sizes_df.filter(sql_abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size)
if skewed_partitions_df.count() > 0:
    # the column was named "partitionId" above, so read it back under that name
    skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

You can calculate the average and standard deviation of partition sizes using the agg() function and identify partitions with significantly different sizes using the filter() function, and you can print their indexes if any skewed partitions are detected. Otherwise, the output prints that no data skew is detected.

This code assumes that your data is structured, and you may need to modify it if your data is of a different type.

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you’re using the latest AWS Glue version. For example, AWS Glue 4.0, which is based on Spark 3.3, enables by default some configurations, such as Adaptive Query Execution (AQE), that can help improve performance when data skew is present.

The following are some of the techniques that you can employ to handle data skew:

  • Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
  • Implementing incremental aggregation – In large datasets, a single aggregation operation (like sum, average, or count) can be resource-intensive. If you are performing a large aggregation, you can break it up into smaller stages that perform intermediate steps such as filtering, grouping, or partial aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
  • Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
  • Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you have the option to either provide a hint to use broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
  • Salting – This involves adding a random prefix or suffix (the salt) to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the salt to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
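To illustrate why a broadcast join avoids the shuffle that makes skew painful, the following plain Python sketch imitates the idea on in-memory lists. It is a conceptual model rather than Spark code: the function name and sample rows are made up, and each inner list stands in for one partition of the large dataset.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Join every partition of the large side against a copy of the small side.

    The small side is turned into a lookup table once (the "broadcast").
    Each partition of the large side is then joined where it already lives,
    so rows of the large side are never moved between partitions.
    """
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined_partitions = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**match, **row})
        joined_partitions.append(joined)
    return joined_partitions


# Made-up data: key2 is the hot key and dominates the first partition.
large = [
    [{"key": "key2", "amount": 10}, {"key": "key2", "amount": 5}],
    [{"key": "key1", "amount": 7}],
]
small = [
    {"key": "key1", "segment": "retail"},
    {"key": "key2", "segment": "enterprise"},
]
result = broadcast_hash_join(large, small, "key")
print(result)
```

In PySpark, the corresponding hint is the broadcast() function from pyspark.sql.functions, used as large_df.join(broadcast(small_df), "key"), where large_df and small_df are placeholder DataFrame names.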

The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, floor, rand, concat, col

spark = SparkSession.builder.getOrCreate()

# Inputs assumed to exist before this point:
#   skewed_data      - the large DataFrame with a skewed "key" column
#   non_skewed_data  - the DataFrame it is joined with (shares only the "key" column)
#   skew_threshold   - row count above which a key is treated as skewed

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts['count'] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Splitting the dataset
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data: a random integer salt in [0, num_salts), with a fixed seed
skewed_data_subset = skewed_data_subset.withColumn("salt", floor(rand(seed=42) * num_salts).cast("string"))
skewed_data_subset = skewed_data_subset.withColumn("salted_key", concat(col("key"), lit("_"), col("salt"))).drop("salt")

# Replicate the rows that match skewed keys in the other dataset, once per salt value (0 to multiplier - 1)
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
    # Drop "key" as well so that the joined result has a single key column
    return replicated_df.drop("salt", "key")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, "salted_key").drop("salted_key")

# Perform regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results (both sides now have the same columns)
final_result = result_skewed.unionByName(result_non_skewed)

In this code, we first define the number of salt values (num_salts) and identify the skewed keys. We then add a salt column to the skewed subset using the withColumn() function, setting it to a random integer derived from the rand() function with a fixed seed, and build a salted_key column from the key and the salt. The function replicate_skewed_rows replicates each row of the other dataset (non_skewed_data) that matches a skewed key num_salts times, once for each salt value. This ensures that every salted key in the skewed data has a matching salted key on the other side of the join. Finally, a join operation is performed on the salted_key column between the two datasets. This join is more balanced compared to a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0 and 1 for each row, so it’s important to pass a fixed seed to get consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back, so that the final output is consistent with the unsalted key values.
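The effect of salting on an aggregation, including the final step of aggregating back to the original keys, can be imitated in plain Python. The following sketch is a conceptual model, not Spark code: the function name, the seed, and the row counts are made up, and each salted key stands in for a group that one task would process.

```python
import random
from collections import Counter


def salted_sum(rows, num_salts, seed=42):
    """Sum values per key in two stages, using salted keys in between.

    Stage 1 spreads each key over num_salts salted keys (key_0, key_1, ...)
    and computes partial sums. Stage 2 strips the salt and combines the
    partial sums, so the final totals match an unsalted aggregation.
    """
    rng = random.Random(seed)  # fixed seed for repeatable runs
    partial = Counter()
    for key, value in rows:
        salt = rng.randrange(num_salts)
        partial[f"{key}_{salt}"] += value

    final = Counter()
    for salted_key, subtotal in partial.items():
        original_key = salted_key.rsplit("_", 1)[0]
        final[original_key] += subtotal
    return partial, final


# Made-up data: key2 is heavily skewed compared to key1 and key3.
rows = [("key1", 1)] * 10 + [("key2", 1)] * 900 + [("key3", 1)] * 20
partial, final = salted_sum(rows, num_salts=3)
print(dict(partial))
print(dict(final))
```

The largest group in the first stage is much smaller than the 900 rows that key2 would contribute without salting, while the second stage restores one total per original key.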

Other techniques to use on skewed data during the join operation

When you’re performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, Adaptive Query Execution includes automatic optimizations for data skew in joins. You can tune them through dedicated Apache Spark configuration properties.
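As a starting point for tuning, the following sketch lists the Adaptive Query Execution properties related to skewed joins. The values shown are, to the best of our knowledge, the Spark 3.3 defaults and are included only for illustration; check the Spark documentation for your version before changing them. The helper name is hypothetical.

```python
# AQE properties related to skewed joins. The values are illustrative and
# are believed to match the Spark 3.3 defaults (an assumption to verify).
AQE_SKEW_JOIN_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # A partition is considered skewed if it is this many times larger than
    # the median partition size...
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    # ...and also larger than this absolute size.
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
}


def apply_settings(set_conf, settings=AQE_SKEW_JOIN_SETTINGS):
    """Apply each property through a setter such as spark.conf.set."""
    for name, value in settings.items():
        set_conf(name, value)


# Dry run without a Spark session: record what would be set.
recorded = {}
apply_settings(recorded.__setitem__)
print(recorded)
```

In a Spark session you would call apply_settings(spark.conf.set) before running the join. Lowering the factor or the threshold makes Spark split skewed partitions more aggressively.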

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.


About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.

How Fujitsu implemented a global data mesh architecture and democratized data

Post Syndicated from Kanehito Miyake original https://aws.amazon.com/blogs/big-data/how-fujitsu-implemented-a-global-data-mesh-architecture-and-democratized-data/

This is a guest post co-authored with Kanehito Miyake, Engineer at Fujitsu Japan. 

Fujitsu Limited was established in Japan in 1935. Currently, we have approximately 120,000 employees worldwide (as of March 2023), including group companies. We develop business in various regions around the world, starting with Japan, and provide digital services globally. To provide a variety of products, services, and solutions that are better suited to customers and society in each region, we have built business processes and systems that are optimized for each region and its market.

However, in recent years, the IT market environment has changed drastically, and it has become difficult for the entire group to respond flexibly to the individual market situation. Moreover, we are challenged not only to revisit individual products, services, and solutions, but also to reinvent entire business processes and operations.

To transform Fujitsu from an IT company to a digital transformation (DX) company, and to become a world-leading DX partner, Fujitsu has declared a shift to data-driven management. We built the OneFujitsu program, which standardizes business projects and systems throughout the company, including the domestic and overseas group companies, and tackles the major transformation of the entire company under the program.

To achieve data-driven management, we built OneData, a data utilization platform used in the four global AWS Regions, which started operation in April 2022. As of November 2023, more than 200 projects and 37,000 users were onboarded. The platform consists of approximately 370 dashboards, 360 tables registered in the data catalog, and 40 linked systems. The data size stored in Amazon Simple Storage Service (Amazon S3) exceeds 100 TB, including data processed for use in each project.

In this post, we introduce our OneData initiative. We explain how Fujitsu worked to solve the aforementioned issues and introduce an overview of the OneData design concept and its implementation. We hope this post will provide some guidance for architects and engineers.

Challenges

Like many other companies struggling with data utilization, Fujitsu faced some challenges, which we discuss in this section.

Siloed data

In Fujitsu’s long history, we restructured organizations by merging affiliated companies into Fujitsu. Although organizational integration has progressed, there are still many systems and mechanisms customized for individual context. There are also many systems and mechanisms overlapping across different organizations. For this reason, it takes a lot of time and effort to discover, search, and integrate data when analyzing the entire company using a common standard. This situation makes it difficult for management to grasp business trends and make decisions in a timely manner.

Under these circumstances, the OneFujitsu program is designed to have one system per business globally. Core systems such as ERP and CRM are being integrated and unified to eliminate silos. This will make it easier for users to utilize data across different organizations for specific business areas.

However, to spread a culture of data-driven decision-making not only in management but also in every organization, it is necessary to have a mechanism that enables users to easily discover various types of data in organizations, and then analyze the data quickly and flexibly when needed.

Excel-based data utilization

Microsoft Excel is available on almost everyone’s PC in the company, and it helps lower the hurdles when starting to utilize data. However, Excel is mainly designed for spreadsheets; it’s not designed for large-scale data analytics and automation. Excel files tend to contain a mixture of data and procedures (functions, macros), and many users casually copy files for one-time use cases. It introduces complexity to keep both data and procedures up to date. Furthermore, it tends to require domain-specific knowledge to manage the Excel files for individual context.

For those reasons, it was extremely difficult for Fujitsu to manage and utilize data at scale with Excel.

Solution overview

OneData defines three personas:

  • Publisher – This role includes the organizational and management team of systems that serve as data sources. Responsibilities include:
    • Load raw data from the data source system at the appropriate frequency.
    • Provide technical metadata for loaded data and keep it up to date.
    • Perform the cleansing process and format conversion of raw data as needed.
    • Grant access permissions to data based on the requests from data users.
  • Consumer – Consumers are organizations and projects that use the data. Responsibilities include:
    • Look for the data to be used from the technical data catalog and request access to the data.
    • Handle the process and conversion of data into a format suitable for their own use (such as fact-dimension) with granted referencing permissions.
    • Configure business intelligence (BI) dashboards to provide data-driven insights to end-users targeted by the consumer’s project.
    • Use the latest data published by the publisher to update data as needed.
    • Promote and expand the use of databases.
  • Foundation – This role encompasses the data steward and governance team. Responsibilities include:
    • Provide a preprocessed, generic dataset of data commonly used by many consumers.
    • Manage and guide metrics for the quality of data published by each publisher.

Each role has sub-roles. For example, the consumer role has the following sub-roles with different responsibilities:

  • Data engineer – Create data process for analysis
  • Dashboard developer – Create a BI dashboard
  • Dashboard viewer – Monitor the BI dashboard

The following diagram describes how the OneData platform works with those roles.

Let’s look at the key components of this architecture in more detail.

Publisher and consumer

In the OneData platform, a publisher is defined for each data source system, and a consumer is defined for each data utilization project. OneData provides an AWS account for each.

This enables the publisher to cleanse data and the consumer to process and analyze data at scale. In addition, by properly separating data and processing, it becomes effortless for the teams and organizations to share, manage, and inherit processes that were traditionally confined to individual PCs.

Foundation

When the teams don’t have a robust enough skillset, it can require more time to model and process data, and cause longer latency and lower data quality. It can also contribute to lower utilization by end-users. To address this, the foundation role provides an already processed dataset as a generic data model for use cases that are common to many consumers. This makes high-quality data available to each consumer. Here, the foundation role takes the lead in compiling the knowledge of domain experts and making data suitable for analysis. This approach also eliminates duplicated effort among consumers. In addition, the foundation role monitors the state of the metadata, data quality indicators, data permissions, information classification labels, and so on. It is crucial in data governance and data management.

BI and visualization

Individual consumers have a dedicated space in a BI tool. In the past, if users wanted to go beyond simple data visualization using Excel, they had to build and maintain their own BI tools, which caused silos. By unifying these BI tools, OneData lowers the difficulty for consumers to use BI tools, and centralizes operation and maintenance, achieving optimization on a company-wide scale.

Additionally, to keep portability between BI tools, OneData recommends that users transform data within the consumer AWS account instead of in the BI tool. With this approach, the BI tool loads data from AWS Glue Data Catalog tables through an Amazon Athena JDBC/ODBC driver without any further transformations.

Deployment and operational excellence

To provide OneData as a common service for Fujitsu and group companies around the world, Regional OneData has been deployed in several locations. Regional OneData represents a unit of system configurations, and is designed to provide lower network latency for platform users, and be optimized for local languages, working hours for system operations and support, and region-specific legal restrictions, such as data residency and personal information protection.

The Regional Operations Unit (ROU), a virtual organization that brings together members from each region, is responsible for operating regional OneData in each of these regions. OneData HQ is responsible for supervising these ROUs, as well as planning and managing the entire OneData.

In addition, we have a specially positioned OneData called Global OneData, where global data utilization spans each region. Only the properly cleansed and sanitized data is transferred between each Regional OneData and Global OneData.

Systems such as ERP and CRM are accumulating data as a publisher for Global OneData, and the dashboards for executives in various regions to monitor business conditions with global metrics are also acting as a consumer for Global OneData.

Technical concepts

In this section, we discuss some of the technical concepts of the solution.

Large scale multi-account

We have adopted a multi-account strategy to provide AWS accounts for each project. Many publishers and consumers are already onboarded into OneData, and the number is expected to increase in the future. With this strategy, future usage expansion at scale can be achieved without affecting the users.

Also, this strategy allowed us to have clear boundaries in security, costs, and service quotas for each AWS service.

All the AWS accounts are deployed and managed through AWS Organizations and AWS Control Tower.

Serverless

Although we provide independent AWS accounts for each publisher and consumer, both operational costs and resource costs would be enormous if we accommodated individual user requests, such as, “I want a virtual machine or RDBMS to run specific tools for data processing.” To avoid such continuous operational and resource costs, we have adopted AWS serverless services for all the computing resources necessary for our activities as a publisher and consumer.

We use AWS Glue to preprocess, cleanse, and enrich data. Optionally, AWS Lambda or Amazon Elastic Container Service (Amazon ECS) with AWS Fargate can also be used based on preferences. We allow users to set up AWS Step Functions for orchestration and Amazon CloudWatch for monitoring. In addition, we provide Amazon Aurora Serverless PostgreSQL as standard for consumers, to meet their needs for data processing with extract, load, and transform (ELT) jobs. With this approach, only the consumer who requires those services will incur charges based on usage. We are able to take advantage of lower operational and resource costs thanks to the unique benefit of serverless (or more accurately, pay-as-you-go) services.

AWS provides many serverless services, and OneData has integrated them to provide scalability: active users can quickly provision the capabilities they need, while costs stay minimal for infrequent users.

Data ownership and access control

In OneData, we have adopted a data mesh architecture where each publisher maintains ownership of data in a distributed and decentralized manner. When the consumer discovers the data they want to use, they request access from the publisher. The publisher accepts the request and grants permissions only when the request meets their own criteria. With the AWS Glue Data Catalog and AWS Lake Formation, there is no need to update S3 bucket policies or AWS Identity and Access Management (IAM) policies every time we allow access for individual data on an S3 data lake, and we can effortlessly grant the necessary permissions for the databases, tables, columns, and rows when needed.
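As a rough illustration of the column-level grants described above, the following sketch builds a request for the Lake Formation grant_permissions API as exposed by boto3. The account ID, role, database, table, and column names are hypothetical placeholders rather than OneData's actual configuration, and the client is passed in as a parameter so the request shape can be inspected without calling AWS.

```python
from typing import Any, Dict, List


def build_column_grant(
    consumer_principal_arn: str,
    database: str,
    table: str,
    columns: List[str],
) -> Dict[str, Any]:
    """Build keyword arguments for Lake Formation's grant_permissions call.

    The grant covers SELECT on the listed columns only.
    """
    return {
        "Principal": {"DataLakePrincipalIdentifier": consumer_principal_arn},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": columns,
            }
        },
        "Permissions": ["SELECT"],
    }


def grant_columns(lakeformation_client: Any, **kwargs: Any) -> Dict[str, Any]:
    """Send the grant. The client would be boto3.client("lakeformation")."""
    request = build_column_grant(**kwargs)
    lakeformation_client.grant_permissions(**request)
    return request


# Hypothetical consumer role and table, used only to show the request shape.
example_request = build_column_grant(
    consumer_principal_arn="arn:aws:iam::111122223333:role/consumer-analyst",
    database="sales_db",
    table="orders",
    columns=["order_id", "order_date", "region"],
)
```

Because the publisher issues the grant against Data Catalog resources, no S3 bucket policy or IAM policy has to change when a new consumer is approved.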

Conclusion

Since the launch of OneData in April 2022, we have been persistently carrying out educational activities to expand the number of users and introducing success stories on our portal site. As a result, we have been promoting change management within the company and are actively utilizing data in each department. Regional OneData is being rolled out gradually, and we plan to further expand the scale of use in the future.

With its global expansion, the development of basic functions as a data utilization platform will reach a milestone. As we move forward, it will be important to make sure that the OneData platform is used effectively throughout Fujitsu, while incorporating new technologies related to data analysis as appropriate. For example, we are preparing to provide more advanced machine learning functions to OneData users using Amazon SageMaker Studio and investigating the applicability of AWS Glue Data Quality to reduce manual quality monitoring efforts. Furthermore, we are in the process of implementing Amazon DataZone through various initiatives, such as verifying its functionality and examining how it can operate while bridging the gap between OneData’s existing processes and the ideal process we are aiming for.

We have had the opportunity to discuss data utilization with various partners and customers, and although individual challenges may differ in size and context, the issues that we are currently trying to solve with OneData are common to many of them.

This post describes only a small portion of how Fujitsu tackled challenges using the AWS Cloud, but we hope the post will give you some inspiration to solve your own challenges.


About the Author


Kanehito Miyake is an engineer at Fujitsu Japan and in charge of OneData’s solution and cloud architecture. He spearheaded the architectural study of the OneData project and contributed greatly to promoting data utilization at Fujitsu with his expertise. He loves rockfish fishing.

Junpei Ozono is a Go-to-market Data & AI solutions architect at AWS in Japan. Junpei supports customers’ journeys on the AWS Cloud from Data & AI aspects and guides them to design and develop data-driven architectures powered by AWS services.

Introducing Amazon Q data integration in AWS Glue

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-amazon-q-data-integration-in-aws-glue/

Today, we’re excited to announce general availability of Amazon Q data integration in AWS Glue. Amazon Q data integration, a new generative AI-powered capability of Amazon Q Developer, enables you to build data integration pipelines using natural language. This reduces the time and effort you need to learn, build, and run data integration jobs using AWS Glue data integration engines.

Tell Amazon Q Developer what you need in English, and it will return a complete job for you. For example, you can ask Amazon Q Developer to generate a complete extract, transform, and load (ETL) script or code snippet for individual ETL operations. You can troubleshoot your jobs by asking Amazon Q Developer to explain errors and propose solutions. Amazon Q Developer provides detailed guidance throughout the entire data integration workflow. Amazon Q Developer helps you learn and build data integration jobs using AWS Glue efficiently by generating the required AWS Glue code based on your natural language descriptions. You can create jobs that extract, transform, and load data that is stored in Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon DynamoDB. Amazon Q Developer can also help you connect to third-party, software as a service (SaaS), and custom sources.

With general availability, we added new capabilities for you to author jobs using natural language. Amazon Q Developer can now generate complex data integration jobs with multiple sources, destinations, and data transformations. It can generate data integration jobs for extracts and loads to S3 data lakes including file formats like CSV, JSON, and Parquet, and ingestion into open table formats like Apache Hudi, Delta, and Apache Iceberg. It generates jobs for connecting to over 20 data sources, including relational databases like PostgreSQL, MySQL and Oracle; data warehouses like Amazon Redshift, Snowflake, and Google BigQuery; NoSQL databases like DynamoDB, MongoDB and OpenSearch; tables defined in the AWS Glue Data Catalog; and custom user-supplied JDBC and Spark connectors. Generated jobs can use a variety of data transformations, including filter, project, union, join, and custom user-supplied SQL.

Amazon Q data integration in AWS Glue helps you through two different experiences: the Amazon Q chat experience, and AWS Glue Studio notebook experience. This post describes the end-to-end user experiences to demonstrate how Amazon Q data integration in AWS Glue simplifies your data integration and data engineering tasks.

Amazon Q chat experience

Amazon Q Developer provides a conversational Q&A capability and a code generation capability for data integration. To start using the conversational Q&A capability, choose the Amazon Q icon on the right side of the AWS Management Console.

For example, you can ask, “How do I use AWS Glue for my ETL workloads?” and Amazon Q provides concise explanations along with references you can use to follow up on your questions and validate the guidance.

To start using the AWS Glue code generation capability, use the same window. On the AWS Glue console, start authoring a new job, and ask Amazon Q, “Please provide a Glue script that reads from Snowflake, renames the fields, and writes to Redshift.”

You will notice that the code is generated. With this response, you can learn and understand how you can author AWS Glue code for your purpose. You can copy/paste the generated code to the script editor and configure placeholders. After you configure an AWS Identity and Access Management (IAM) role and AWS Glue connections on the job, save and run the job. When the job is complete, you can start querying the table exported from Snowflake in Amazon Redshift.

Let’s try another prompt that reads data from two different sources, filters and projects them individually, combines the two datasets with a union, and writes the output to a third target. Ask Amazon Q: “I want to read data from S3 in Parquet format, and select some fields. I also want to read data from DynamoDB, select some fields, and filter some rows. I want to union these two datasets and write the results to OpenSearch.”

The code is generated. When the job is complete, your index is available in OpenSearch and can be used by your downstream workloads.

AWS Glue Studio notebook experience

Amazon Q data integration in AWS Glue helps you author code in an AWS Glue notebook to speed up development of new data integration applications. In this section, we walk you through how to set up the notebook and run a notebook job.

Prerequisites

Before going forward with this tutorial, complete the following prerequisites:

  1. Set up AWS Glue Studio.
  2. Configure an IAM role to interact with Amazon Q. Attach the following policy to your IAM role for the AWS Glue Studio notebook:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CodeWhispererPermissions",
                "Effect": "Allow",
                "Action": [
                    "codewhisperer:GenerateRecommendations"
                ],
                "Resource": "*"
            }
        ]
    }

Create a new AWS Glue Studio notebook job

Create a new AWS Glue Studio notebook job by completing the following steps:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.
  3. For Engine, select Spark (Python).
  4. For Options, select Start fresh.
  5. For IAM role, choose the IAM role you configured as a prerequisite.
  6. Choose Create notebook.

A new notebook is created with sample cells. Let’s try recommendations using the Amazon Q data integration in AWS Glue to auto-generate code based on your intent. Amazon Q helps you with each step as you express an intent in a notebook cell.

Add a new cell and enter your comment to describe what you want to achieve. After you press Tab and Enter, the recommended code is shown. The first intent is to extract the data: “Give me code that reads a Glue Data Catalog table”, followed by “Give me code to apply a filter transform with star_rating>3” and “Give me code that writes the frame into S3 as Parquet”.

Similar to the Amazon Q chat experience, the code is recommended. If you press Tab, then the recommended code is chosen. You can learn more in User actions.

You can run each cell by simply filling in the appropriate options for your sources in the generated code. At any point in the runs, you can also preview a sample of your dataset by simply using the show() method.

Let’s now try to generate a full script with a single complex prompt: “I have JSON data in S3 and data in Oracle that needs combining. Please provide a Glue script that reads from both sources, does a join, and then writes results to Redshift.”

You may notice that, on the notebook, the Amazon Q data integration in AWS Glue generated the same code snippet that was generated in the Amazon Q chat.

You can also run the notebook as a job, either by choosing Run or programmatically.
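For the programmatic route, a minimal sketch using the AWS Glue start_job_run and get_job_run API operations through boto3 might look like the following. The job name is a hypothetical placeholder, and the Glue client is passed in as a parameter (for example, boto3.client("glue")) so that the functions can be exercised without AWS credentials.

```python
from typing import Any, Dict, Optional


def start_notebook_job(
    glue_client: Any,
    job_name: str,
    arguments: Optional[Dict[str, str]] = None,
) -> str:
    """Start an AWS Glue job run and return its run ID."""
    response = glue_client.start_job_run(
        JobName=job_name,
        Arguments=arguments or {},
    )
    return response["JobRunId"]


def job_run_state(glue_client: Any, job_name: str, run_id: str) -> str:
    """Return the current state of a job run, such as RUNNING or SUCCEEDED."""
    response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
    return response["JobRun"]["JobRunState"]


# Example usage (assumes boto3 is installed and credentials are configured):
#   import boto3
#   glue = boto3.client("glue")
#   run_id = start_notebook_job(glue, "my-notebook-job")  # hypothetical job name
#   print(job_run_state(glue, "my-notebook-job", run_id))
```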

Conclusion

With Amazon Q data integration, you have an artificial intelligence (AI) expert by your side to integrate data efficiently without deep data engineering expertise. These capabilities simplify and accelerate data processing and integration on AWS. Amazon Q data integration in AWS Glue is available in every AWS Region where Amazon Q is available. To learn more, visit the product page, our documentation, and the Amazon Q pricing page.

A special thanks to everyone who contributed to the launch of Amazon Q data integration in AWS Glue: Alexandra Tello, Divya Gaitonde, Andrew Kim, Andrew King, Anshul Sharma, Anshi Shrivastava, Chuhan Liu, Daniel Obi, Hirva Patel, Henry Caballero Corzo, Jake Zych, Jeremy Samuel, Jessica Cheng, Keerthi Chadalavada, Layth Yassin, Maheedhar Reddy Chappidi, Maya Patwardhan, Neil Gupta, Raghavendhar Vidyasagar Thiruvoipadi, Rajendra Gujja, Rupak Ravi, Shaoying Dong, Vaibhav Naik, Wei Tang, William Jones, Daiyan Alamgir, Japson Jeyasekaran, Matt Sampson, Kartik Panjabi, Ranu Shah, Chuan Lei, Huzefa Rangwala, Jiani Zhang, Xiao Qin, Mukul Prasad, Alon Halevy, Brian Ross, Alona Nadler, Omer Zaki, Rick Sears, Bratin Saha, G2 Krishnamoorthy, Kinshuk Pahare, Nitin Bahadur, and Santosh Chandrachood.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.


Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers’ data integration needs. In his spare time, he enjoys spending time with family and friends.


Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.


XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.


Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems & new interfaces for data integration and efficiently managing data lakes on AWS.


Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to keep the raw data isolated in that central account for increased security and control, and to perform the data processing and transformation work in their own account (Account B in the diagram) to compartmentalize duties and prevent any unintended changes to the source raw data in Account A. This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. It also keeps the raw and processed data securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with Hive-style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
  • A sample dataset called products.csv, which we use in this post.
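To make the date-based folder layout concrete, here is a small sketch that builds the daily prefix for a given date. The function name and the bucket name in the example are hypothetical; only the products/YYYY/MM/DD/ layout comes from this post.

```python
from datetime import date


def daily_prefix(bucket: str, dataset: str, day: date) -> str:
    """Return the S3 prefix for one day of data, for example
    s3://<bucket>/products/YYYY/MM/DD/ with zero-padded month and day."""
    return f"s3://{bucket}/{dataset}/{day.strftime('%Y/%m/%d')}/"


# Hypothetical bucket name, used only for illustration.
example_prefix = daily_prefix("sample-inp-bucket-etl-demo", "products", date(2024, 5, 7))
print(example_prefix)  # s3://sample-inp-bucket-etl-demo/products/2024/05/07/
```

Keeping one folder per day lets a pipeline look for new files by checking only the prefix for the date it is processing.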

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The file for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is earlier than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the default bundled provider version predates Redshift Serverless support, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the constraints-3.11-updated.txt file to the /dags folder. The file name must match the one referenced in the --constraint line of requirements.txt.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

  4. Navigate to the Amazon MWAA environment and choose Edit.
  5. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  6. Choose Save.

This will update the environment and new providers will be in effect.

  7. To verify the providers version, go to Providers under the Admin table.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.
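If you prefer to check the version from code, for example in a small test DAG or a Python shell on a machine with the same packages, the comparison can be sketched as follows. The helper names are hypothetical, and the parser assumes plain X.Y.Z version strings without pre-release suffixes.

```python
from importlib import metadata
from typing import Optional, Tuple

PROVIDER_PACKAGE = "apache-airflow-providers-amazon"


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version such as 8.4.0 into a tuple that compares numerically."""
    return tuple(int(part) for part in version.split("."))


def supports_redshift_serverless(installed: str, minimum: str = "8.4.0") -> bool:
    """Return True when the installed provider version is at least the minimum."""
    return parse_version(installed) >= parse_version(minimum)


def installed_provider_version() -> Optional[str]:
    """Return the installed Amazon provider version, or None if it is not installed."""
    try:
        return metadata.version(PROVIDER_PACKAGE)
    except metadata.PackageNotFoundError:
        return None


print(supports_redshift_serverless("8.2.0"))  # False: the bundled default is too old
print(supports_redshift_serverless("8.4.0"))  # True
```

Comparing tuples of integers rather than raw strings avoids ordering mistakes such as treating 8.10.0 as older than 8.4.0.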

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which allows the role it is attached to to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
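Because the two bucket policies in the preceding steps differ only in the bucket name and the list of principals, they can be generated from one template. The following sketch shows one way to do that; the function names, the account ID, and the role and bucket names in the example are hypothetical placeholders. Applying the policy uses the Amazon S3 put_bucket_policy operation through a client that is passed in (for example, boto3.client("s3") with credentials for Account A).

```python
import json
from typing import Any, Dict, List


def service_role_arn(account_id: str, role_name: str) -> str:
    """Build the ARN of a role under the /service-role/ path."""
    return f"arn:aws:iam::{account_id}:role/service-role/{role_name}"


def cross_account_bucket_policy(bucket: str, role_arns: List[str]) -> Dict[str, Any]:
    """Return a bucket policy that lets the given roles read and write objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": role_arns},
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket",
                ],
                "Resource": [
                    f"arn:aws:s3:::{bucket}/*",
                    f"arn:aws:s3:::{bucket}",
                ],
            }
        ],
    }


def apply_bucket_policy(s3_client: Any, bucket: str, policy: Dict[str, Any]) -> None:
    """Attach the policy to the bucket; the API expects the policy as a JSON string."""
    s3_client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))


# Hypothetical Account B ID and role names, for illustration only.
account_b = "111122223333"
input_policy = cross_account_bucket_policy(
    "sample-inp-bucket-etl-demo",
    [
        service_role_arn(account_b, "demo-glue-role"),
        service_role_arn(account_b, "demo-mwaa-role"),
    ],
)
print(json.dumps(input_policy, indent=4))
```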

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
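When troubleshooting such a timeout, it can help to sanity-check the address ranges offline before looking at route tables. The following sketch uses only the Python standard library. The CIDR blocks and the IP address are hypothetical examples; the check reflects the requirement that peered VPCs must not have overlapping CIDR blocks.

```python
import ipaddress
from typing import List


def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share any addresses."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))


def is_allowed(source_ip: str, allowed_cidrs: List[str]) -> bool:
    """Return True if source_ip falls inside one of the CIDR ranges
    allowed by a security group rule."""
    address = ipaddress.ip_address(source_ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)


# Hypothetical address ranges for the two VPCs.
mwaa_vpc_cidr = "10.192.0.0/16"
redshift_vpc_cidr = "10.0.0.0/16"

print(cidrs_overlap(mwaa_vpc_cidr, redshift_vpc_cidr))  # False: peering is possible
print(is_allowed("10.192.10.5", [mwaa_vpc_cidr]))       # True: a worker IP in the MWAA VPC
```

If the second check returns False for the range configured in the Redshift security group, traffic from Amazon MWAA is dropped, which surfaces as the timeout described above.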

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
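The lookup order can be pictured with a small simulation. This is a simplified sketch of the behavior described above, not Airflow's implementation: plain dictionaries stand in for Secrets Manager and the metadata database, and the function names are hypothetical. The secret name is formed by joining the configured prefix and the connection ID with a slash.

```python
from typing import Dict, Optional


def secret_name(prefix: str, conn_id: str, sep: str = "/") -> str:
    """Build the Secrets Manager name for a connection, such as
    airflow/connections/secrets_redshift_connection."""
    return f"{prefix}{sep}{conn_id}"


def resolve_connection(
    conn_id: str,
    secrets_backend: Dict[str, str],
    metadata_db: Dict[str, str],
    connections_prefix: str = "airflow/connections",
) -> Optional[str]:
    """Return the connection URI, preferring the secrets backend over the metadata database."""
    name = secret_name(connections_prefix, conn_id)
    if name in secrets_backend:
        return secrets_backend[name]
    return metadata_db.get(conn_id)


# Stand-ins for the two stores; the URIs are shortened placeholders.
backend = {"airflow/connections/secrets_redshift_connection": "redshift://from-secrets-manager"}
database = {
    "secrets_redshift_connection": "redshift://from-metadata-db",
    "other_connection": "postgres://from-metadata-db",
}

print(resolve_connection("secrets_redshift_connection", backend, database))
print(resolve_connection("other_connection", backend, database))
```

The first call returns the value held in the secrets backend even though the metadata database has an entry with the same ID; the second falls through to the metadata database.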

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  1. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  2. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  1. To generate an Airflow connection URI string, go to AWS CloudShell and start a Python shell.
  2. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = urllib.parse.quote_plus('<password>') #Specify the password to use for authentication with Amazon Redshift; URL-encoded so special characters don't break the URI
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>
  1. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

You can also do this on the Secrets Manager console. The connection string is stored in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection stored at airflow/connections/secrets_redshift_connection in the DAG. When the DAG runs, it looks for this connection and retrieves the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.
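The DAG later in this post refers to this connection by its ID (secrets_redshift_connection), while the secret itself is stored under the configured prefix. The following sketch shows that mapping, assuming the backend joins connections_prefix and the connection ID with a "/" separator, as the secret names in this post suggest. The helper secret_name_for is hypothetical and is not part of the Airflow API.

```python
def secret_name_for(
    conn_id: str,
    connections_prefix: str = "airflow/connections",
    sep: str = "/",
) -> str:
    """Build the Secrets Manager secret name that holds a given Airflow connection."""
    return f"{connections_prefix}{sep}{conn_id}"


print(secret_name_for("secrets_redshift_connection"))
# airflow/connections/secrets_redshift_connection
```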

You can also add secrets using the Secrets Manager console as key-value pairs.

  1. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
    • We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and it’s rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to an S3 bucket sample-opt-bucket-etl in Account B. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
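The wildcard key that the S3KeySensor bullet describes can be sketched with the standard library. Python's fnmatch is used here as a stand-in for the sensor's Unix-style matching, the object keys are made up for illustration, and build_key_pattern is a hypothetical helper that mirrors the get_s3_loc task in the DAG code.

```python
import fnmatch
from datetime import date


def build_key_pattern(s3_folder: str, day: date) -> str:
    """Build the wildcard key for one day's folder, mirroring get_s3_loc in the DAG."""
    return f"{s3_folder}/year={day:%Y}/month={day:%m}/day={day:%d}/*.csv"


pattern = build_key_pattern("products", date(2024, 5, 1))

# Hypothetical object keys in the source bucket.
keys = [
    "products/year=2024/month=05/day=01/part-0001.csv",
    "products/year=2024/month=04/day=30/part-0001.csv",
    "products/year=2024/month=05/day=01/_SUCCESS",
]

# Only CSV files in the folder for the requested day satisfy the pattern.
matches = [key for key in keys if fnmatch.fnmatchcase(key, pattern)]
print(matches)
# ['products/year=2024/month=05/day=01/part-0001.csv']
```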

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete the data from the Redshift staging table using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
    )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the source S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

Optimize data layout by bucketing with Amazon Athena and AWS Glue to accelerate downstream queries

Post Syndicated from Takeshi Nakatani original https://aws.amazon.com/blogs/big-data/optimize-data-layout-by-bucketing-with-amazon-athena-and-aws-glue-to-accelerate-downstream-queries/

In the era of data, organizations are increasingly using data lakes to store and analyze vast amounts of structured and unstructured data. Data lakes provide a centralized repository for data from various sources, enabling organizations to unlock valuable insights and drive data-driven decision-making. However, as data volumes continue to grow, optimizing data layout and organization becomes crucial for efficient querying and analysis.

One of the key challenges in data lakes is the potential for slow query performance, especially when dealing with large datasets. This can be attributed to factors such as inefficient data layout, resulting in excessive data scanning and inefficient use of compute resources. To address this challenge, common practices like partitioning and bucketing can significantly improve query performance and reduce computation costs.

Partitioning is a technique that divides a large dataset into smaller, more manageable parts based on specific criteria, such as date, region, or product category. By partitioning data, downstream analytical queries can skip irrelevant partitions, reducing the amount of data that needs to be scanned and processed. You can use partition columns in the WHERE clause in queries to scan only the specific partitions that your query needs. This can lead to faster query runtimes and more efficient resource utilization. It especially works well when columns with low cardinality are chosen as the key.

What if you sometimes need to filter by a high-cardinality column, for example to look up specific VIP customers? Each customer is usually identified by an ID, and there can be millions of them. Partitioning isn’t suitable for such high-cardinality columns because you end up with small files, slow partition filtering, and high Amazon Simple Storage Service (Amazon S3) API cost (one S3 prefix is created per value of the partition column). Although you can use partitioning with a natural key such as city or state to narrow down your dataset to some degree, it is still necessary to query across date-based partitions if your data is time series.

This is where bucketing comes into play. Bucketing makes sure that all rows with the same values of one or more columns end up in the same file. Instead of one file per value, like partitioning, a hash function is used to distribute values evenly across a fixed number of files. By organizing data this way, you can perform efficient filtering, because only the relevant buckets need to be processed, further reducing computational overhead.
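To make the idea concrete, the following sketch assigns values to a fixed number of buckets with a hash function. It only illustrates the principle: it uses CRC32 from the Python standard library, which is not the hash function that Hive, Athena, or Spark use, and the function names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List


def bucket_for(value: str, bucket_count: int) -> int:
    """Map a value to a bucket index in the range [0, bucket_count)."""
    # crc32 is deterministic across runs, unlike Python's built-in hash() for str.
    return zlib.crc32(value.encode("utf-8")) % bucket_count


def assign_buckets(values: Iterable[str], bucket_count: int) -> Dict[int, List[str]]:
    """Group values by bucket; equal values always land in the same bucket."""
    buckets: Dict[int, List[str]] = defaultdict(list)
    for value in values:
        buckets[bucket_for(value, bucket_count)].append(value)
    return dict(buckets)


stations = ["99999904237", "99999953132", "99999903061", "99999963856", "99999994644"]
print(assign_buckets(stations, bucket_count=16))
```

Because the bucket index depends only on the value, a reader that knows the hash function and the bucket count can work out which file to open for a given value and skip the rest.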

There are multiple options for implementing bucketing on AWS. One approach is to use the Amazon Athena CREATE TABLE AS SELECT (CTAS) statement, which allows you to create a bucketed table directly from a query. Alternatively, you can use AWS Glue for Apache Spark, which provides built-in support for bucketing configurations during the data transformation process. AWS Glue allows you to define bucketing parameters, such as the number of buckets and the columns to bucket on, providing an optimized data layout for efficient querying with Athena.

In this post, we discuss how to implement bucketing on AWS data lakes, including using the Athena CTAS statement and AWS Glue for Apache Spark. We also cover bucketing for Apache Iceberg tables.

Example use case

In this post, you use a public dataset, the NOAA Integrated Surface Database. Data analysts run one-time queries for data during the past 5 years through Athena. Most of the queries are for specific stations with specific report types. The queries need to complete in 10 seconds, and the cost needs to be optimized carefully. In this scenario, you’re a data engineer responsible for optimizing query performance and cost.

For example, if an analyst wants to retrieve data for a specific station (for example, station ID 123456) with a particular report type (for example, CRN01), the query might look like the following:

SELECT station, report_type, columnA, columnB, ...
FROM table_name
WHERE
report_type = 'CRN01'
AND station = '123456'

In the case of the NOAA Integrated Surface Database, the station column is likely to have a high cardinality, with numerous unique station identifiers. On the other hand, the report_type column may have a relatively low cardinality, with a limited set of report types. Given this scenario, it would be a good idea to partition the data by report_type and bucket it by station.

With this partitioning and bucketing strategy, Athena can first eliminate partitions for irrelevant report types, and then scan only the buckets within the relevant partition that match the specified station ID, significantly reducing the amount of data processed and accelerating query runtimes. This approach not only meets the query performance requirement, but also helps optimize costs by minimizing the amount of data scanned and billed for each query.
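The choice of keys above rests on a cardinality assumption that is easy to check on a sample. The following sketch counts distinct values per column. The rows are made up for illustration (the station IDs and report types are borrowed from the example queries in this post), and on the real table you would more likely run a COUNT(DISTINCT ...) query in Athena.

```python
from typing import Dict, List


def cardinality(rows: List[Dict[str, str]], column: str) -> int:
    """Count the distinct values of one column."""
    return len({row[column] for row in rows})


# Made-up sample rows, not real NOAA records.
sample = [
    {"station": "99999904237", "report_type": "CRN05"},
    {"station": "99999953132", "report_type": "CRN05"},
    {"station": "99999903061", "report_type": "CRN05"},
    {"station": "123456", "report_type": "CRN01"},
]

print(cardinality(sample, "station"), cardinality(sample, "report_type"))
# 4 2
```

A column whose distinct count grows with the data is a bucketing candidate, and a column with a small, stable set of values is a partitioning candidate.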

In this post, we examine how query performance is affected by data layout, in particular, bucketing. We also compare three different ways to achieve bucketing. The following table represents conditions for the tables to be created.

| Condition | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
|---|---|---|---|---|---|
| Format | CSV | Parquet | Parquet | Parquet | Parquet |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Is partitioned? | Yes, but in a different way | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |

noaa_remote_original is partitioned by the year column, but not by the report_type column. The "Is partitioned?" row indicates whether the table is partitioned at all; only the tables created in this post are partitioned by a column that the queries actually filter on.

Baseline table

For this post, you create several tables with different conditions: some without bucketing and some with bucketing, to showcase the performance characteristics of bucketing. First, let’s create an original table using the NOAA data. In subsequent steps, you ingest data from this table to create test tables.

There are multiple ways to create a table definition: running DDL, an AWS Glue crawler, the AWS Glue Data Catalog API, and so on. In this step, you run DDL via the Athena console.

Complete the following steps to create the "bucketing_blog"."noaa_remote_original" table in the Data Catalog:

  1. Open the Athena console.
  2. In the query editor, run the following DDL to create a new AWS Glue database:
    -- Create Glue database
    CREATE DATABASE bucketing_blog;

  3. For Database under Data, choose bucketing_blog to set the current database.
  4. Run the following DDL to create the original table:
    -- Create original table
    CREATE EXTERNAL TABLE `bucketing_blog`.`noaa_remote_original`(
      `station` STRING, 
      `date` STRING, 
      `source` STRING, 
      `latitude` STRING, 
      `longitude` STRING, 
      `elevation` STRING, 
      `name` STRING, 
      `report_type` STRING, 
      `call_sign` STRING, 
      `quality_control` STRING, 
      `wnd` STRING, 
      `cig` STRING, 
      `vis` STRING, 
      `tmp` STRING, 
      `dew` STRING, 
      `slp` STRING, 
      `aj1` STRING, 
      `gf1` STRING, 
      `mw1` STRING)
    PARTITIONED BY (
        year STRING)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
    WITH SERDEPROPERTIES ( 
      'escapeChar'='\\',
      'quoteChar'='\"',
      'separatorChar'=',') 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://noaa-global-hourly-pds/'
    TBLPROPERTIES (
      'skip.header.line.count'='1'
    )

Because the source data has quoted fields, we use OpenCSVSerde instead of the default LazySimpleSerde.

These CSV files have a header row, which we tell Athena to skip by adding skip.header.line.count and setting the value to 1.

For more details, refer to OpenCSVSerDe for processing CSV.

  1. Run the following DDL to add partitions. We add partitions only for 5 years out of 124 years based on the use case requirement:
    -- Load partitions
    ALTER TABLE `bucketing_blog`.`noaa_remote_original` ADD
      PARTITION (year = '2024') LOCATION 's3://noaa-global-hourly-pds/2024/'
      PARTITION (year = '2023') LOCATION 's3://noaa-global-hourly-pds/2023/'
      PARTITION (year = '2022') LOCATION 's3://noaa-global-hourly-pds/2022/'
      PARTITION (year = '2021') LOCATION 's3://noaa-global-hourly-pds/2021/'
      PARTITION (year = '2020') LOCATION 's3://noaa-global-hourly-pds/2020/';

  2. Run the following DML to verify that you can successfully query the data:
    -- Check data 
    SELECT * FROM "bucketing_blog"."noaa_remote_original" LIMIT 10;

Now you’re ready to start querying the original table to examine the baseline performance.

  1. Run a query against the original table to evaluate the query performance as a baseline. The following query selects records for five specific stations with report type CRN05:
    -- Baseline
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."noaa_remote_original"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );

We ran this query 10 times. The average query runtime for the 10 queries is 27.6 seconds, which is far longer than our target of 10 seconds, and 155.75 GB of data is scanned to return 1.65 million records. This is the baseline performance of the original raw table. It’s time to start optimizing data layout from this baseline.

Next, you create tables with different conditions from the original: one without bucketing and one with bucketing, and compare them.

Optimize data layout using Athena CTAS

In this section, we use an Athena CTAS query to optimize data layout and its format.

First, let’s create a table with partitioning but without bucketing. The new table is partitioned by the column report_type because most of the expected queries use this column in the WHERE clause, and objects are stored as Parquet with Snappy compression.

  1. Open the Athena query editor.
  2. Run the following query, providing your own S3 bucket and prefix:
    --CTAS, non-bucketed
    CREATE TABLE "bucketing_blog"."athena_non_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-non-bucketed/',
        partitioned_by = ARRAY['report_type'],
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshots.


There are 30 files under the partition.

Next, you create a table with Hive style bucketing. The number of buckets needs to be carefully tuned through experiments for your own use case. Generally speaking, the more buckets you have, the smaller the granularity, which might result in better performance. On the other hand, too many small files may introduce inefficiency in query planning and processing. Also, bucketing only works if you are querying a few values of the bucketing key. The more values you add to your query, the more likely that you will end up reading all buckets.

The following is the baseline query to optimize:

-- Baseline
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
FROM "bucketing_blog"."noaa_remote_original"
WHERE
    report_type = 'CRN05'
    AND ( station = '99999904237'
        OR station = '99999953132'
        OR station = '99999903061'
        OR station = '99999963856'
        OR station = '99999994644'
    );

In this example, the table is going to be bucketed into 16 buckets by a high-cardinality column (station), which is expected to be used in the WHERE clause of the query. All other conditions remain the same. The baseline query filters on five station IDs, and you expect queries to have around that number at most, which is comfortably fewer than the number of buckets, so 16 should work well. It is possible to specify a larger number of buckets, but CTAS can’t be used if the total number of partitions exceeds 100.

  1. Run the following query:
    -- CTAS, Hive-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-bucketed/',
        partitioned_by = ARRAY['report_type'],
        bucketed_by = ARRAY['station'],
        bucket_count = 16,
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

The query creates S3 objects organized as shown in the following screenshots.


The table-level layout looks exactly the same between athena_non_bucketed and athena_bucketed: there are 13 partitions in each table. The difference is the number of objects under the partitions. There are 16 objects (buckets) per partition, of roughly 10–25 MB each in this case. The number of buckets is constant at the specified value regardless of the amount of data, but the bucket size depends on the amount of data.

Now you’re ready to query against each table to evaluate query performance. The query will select records for five specific stations and report type CRN05 for the past 5 years. Although you can’t see which station’s data is located in which bucket, Athena has calculated the bucket for each row and placed it correctly.

  1. Query the non-bucketed table with the following statement:
    -- No bucketing 
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_non_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 10.95 seconds, and 358 MB of data is scanned to return 2.21 million records. Both the runtime and scan size have decreased significantly because you’ve partitioned the data and can now read only one partition; the other 12 of the 13 partitions are skipped. In addition, the amount of data scanned has gone down from 206 GB to 360 MB, which is a reduction of 99.8%. This is due not just to the partitioning, but also to the change of format to Parquet and compression with Snappy.

  1. Query the bucketed table with the following statement:
    -- Hive bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 7.82 seconds, and 69 MB of data is scanned to return 2.21 million records. This means a reduction of average runtime from 10.95 to 7.82 seconds (-29%), and a dramatic reduction of data scanned from 358 MB to 69 MB (-81%) to return the same number of records compared with the non-bucketed table. In this case, both runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.
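The percentages quoted above follow directly from the measured averages. The following short calculation reproduces them; the helper name percent_reduction is ours, and the inputs are the figures reported in this section.

```python
def percent_reduction(before: float, after: float) -> float:
    """Return how much smaller `after` is than `before`, as a percentage."""
    return (before - after) / before * 100


runtime = percent_reduction(10.95, 7.82)  # average runtime in seconds
scanned = percent_reduction(358, 69)      # data scanned in MB

print(f"runtime: -{runtime:.0f}%, data scanned: -{scanned:.0f}%")
# runtime: -29%, data scanned: -81%
```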

Considerations

As stated earlier, size your bucket carefully to maximize performance of your query. Bucketing only works if you are querying a few values of the bucketing key. Consider creating more buckets than the number of values expected in the actual query.

Additionally, an Athena CTAS query can create a maximum of 100 partitions at one time. If you need a large number of partitions, you may want to use AWS Glue extract, transform, and load (ETL), although there is a workaround that splits the work into multiple SQL statements.
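The multi-statement workaround can be sketched as follows: one CTAS statement creates the table with at most 100 partition values, and follow-up INSERT INTO statements append the remaining values in batches of at most 100. The helper below only builds SQL strings. Its name, the table names, the S3 location, and the partition values are hypothetical, and it assumes a single string partition column that is the last column of the source table (Athena expects partition columns last in the SELECT list).

```python
from typing import List


def batched_ctas_statements(
    source: str,
    target: str,
    partition_col: str,
    partition_values: List[str],
    location: str,
    batch_size: int = 100,
) -> List[str]:
    """Build one CTAS plus INSERT INTO statements, each touching
    at most `batch_size` partition values."""
    if not partition_values:
        raise ValueError("at least one partition value is required")
    batches = [
        partition_values[i:i + batch_size]
        for i in range(0, len(partition_values), batch_size)
    ]
    statements = []
    for index, batch in enumerate(batches):
        # Quote each value and escape embedded single quotes.
        in_list = ", ".join("'" + v.replace("'", "''") + "'" for v in batch)
        select = f"SELECT * FROM {source} WHERE {partition_col} IN ({in_list})"
        if index == 0:
            # The first batch creates the table.
            statements.append(
                f"CREATE TABLE {target} "
                f"WITH (format = 'PARQUET', external_location = '{location}', "
                f"partitioned_by = ARRAY['{partition_col}']) AS {select}"
            )
        else:
            # Later batches append to the existing table.
            statements.append(f"INSERT INTO {target} {select}")
    return statements


# Hypothetical example: 250 partition values need 1 CTAS + 2 INSERT INTO.
values = [f"type_{i:03d}" for i in range(250)]
statements = batched_ctas_statements(
    source='"example_db"."source_table"',
    target='"example_db"."target_table"',
    partition_col="report_type",
    partition_values=values,
    location="s3://<your-s3-location>/example/",
)
print(len(statements))
```

Each generated statement would then be run in order in the Athena query editor or through your usual query client.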

Optimize data layout using AWS Glue ETL

Apache Spark is an open source distributed processing framework that enables flexible ETL with PySpark, Scala, and Spark SQL. It allows you to partition and bucket your data based on your requirements. Spark has several tuning options to accelerate jobs. You can effortlessly automate and monitor Spark jobs. In this section, we use AWS Glue ETL jobs to run Spark code to optimize data layout.

Unlike Athena bucketing, AWS Glue ETL uses Spark-based bucketing as its bucketing algorithm. All you need to do is add the following table property to the table: bucketing_format = 'spark'. For details about this table property, see Partitioning and bucketing in Athena.

Complete the following steps to create a table with bucketing through AWS Glue ETL:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Create job and choose Visual ETL.
  3. Under Add nodes, choose AWS Glue Data Catalog for Sources.
  4. For Database, choose bucketing_blog.
  5. For Table, choose noaa_remote_original.
  6. Under Add nodes, choose Change Schema for Transforms.
  7. Under Add nodes, choose Custom Transform for Transforms.
  8. For Name, enter ToS3WithBucketing.
  9. For Node parents, choose Change Schema.
  10. For Code block, enter the following code snippet:
    def ToS3WithBucketing(glueContext, dfc) -> DynamicFrameCollection:
        # Convert DynamicFrame to DataFrame
        df = dfc.select(list(dfc.keys())[0]).toDF()
        
        # Write to S3 with bucketing and partitioning
        df.repartition(1, "report_type") \
            .write.option("path", "s3://<your-s3-location>/glue-bucketed/") \
            .mode("overwrite") \
            .partitionBy("report_type") \
            .bucketBy(16, "station") \
            .format("parquet") \
            .option("compression", "snappy") \
            .saveAsTable("bucketing_blog.glue_bucketed")

The following screenshot shows the job created using AWS Glue Studio to generate a table and data.

Each node represents the following:

  • The AWS Glue Data Catalog node loads the noaa_remote_original table from the Data Catalog
  • The Change Schema node makes sure that it loads columns registered in the Data Catalog
  • The ToS3WithBucketing node writes data to Amazon S3 with both partitioning and Spark-based bucketing

The job has been successfully authored in the visual editor.

  1. Under Job details, for IAM Role, choose your AWS Identity and Access Management (IAM) role for this job.
  2. For Worker type, choose G.8X.
  3. For Requested number of workers, enter 5.
  4. Choose Save, then choose Run.

After these steps, the table glue_bucketed has been created.

  1. Choose Tables in the navigation pane, and choose the table glue_bucketed.
  2. On the Actions menu, choose Edit table under Manage.
  3. In the Table properties section, choose Add.
  4. Add a key-value pair with key bucketing_format and value spark.
  5. Choose Save.
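If you prefer to script the console steps above, one possible approach is to read the table definition, add the property, and write it back with the AWS Glue UpdateTable API. The sketch below shows only the dictionary manipulation, which can be checked locally. The helper name is hypothetical, and the boto3 calls in the trailing comment are an assumption about how you would wire it up, not something this post ran.

```python
import copy

# Keys accepted inside TableInput by the AWS Glue UpdateTable API.
# A get_table() response carries extra read-only fields (for example
# CreateTime) that have to be dropped before sending the definition back.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_bucketing_format(table: dict, bucketing_format: str = "spark") -> dict:
    """Return a TableInput dict with the bucketing_format property set."""
    table_input = {
        key: copy.deepcopy(value)
        for key, value in table.items()
        if key in TABLE_INPUT_KEYS
    }
    table_input.setdefault("Parameters", {})["bucketing_format"] = bucketing_format
    return table_input


# Local check with a trimmed-down, made-up table definition.
sample_table = {
    "Name": "glue_bucketed",
    "DatabaseName": "bucketing_blog",   # read-only field, dropped
    "CreateTime": "2024-01-01",         # read-only field, dropped
    "Parameters": {"classification": "parquet"},
}
table_input = with_bucketing_format(sample_table)
print(table_input)

# Intended use (not executed here; requires boto3 and AWS credentials):
#   import boto3
#   glue = boto3.client("glue")
#   table = glue.get_table(DatabaseName="bucketing_blog", Name="glue_bucketed")["Table"]
#   glue.update_table(DatabaseName="bucketing_blog",
#                     TableInput=with_bucketing_format(table))
```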

Now it’s time to query the tables.

  1. Query the bucketed table with the following statement:
    -- Spark bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."glue_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran the query 10 times. The average runtime of the 10 queries is 7.09 seconds, and 88 MB of data is scanned to return 2.21 million records. In this case, both the runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.

The reason for the larger bytes scanned compared to the Athena CTAS example is that the values were distributed differently in this table. In the AWS Glue bucketed table, the values were distributed over five files. In the Athena CTAS bucketed table, the values were distributed over four files. Remember that rows are distributed into buckets using a hash function. The Spark bucketing algorithm uses a different hash function than Hive, and in this case, it resulted in a different distribution across the files.
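To illustrate why two hash functions can spread the same keys over a different number of files, the following sketch assigns the five station IDs from the query to 16 buckets using two different hashes. Both hashes are stand-ins chosen for simplicity (a Java-style 31-multiplier string hash and CRC32). They are not the exact Hive or Spark implementations, so the bucket numbers will not match the real tables.

```python
import zlib

STATIONS = [
    "99999904237",
    "99999953132",
    "99999903061",
    "99999963856",
    "99999994644",
]
NUM_BUCKETS = 16


def java_style_hash(value: str) -> int:
    """31-multiplier string hash with 32-bit wraparound (stand-in hash A)."""
    h = 0
    for ch in value:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h


def crc32_hash(value: str) -> int:
    """CRC32 of the UTF-8 bytes (stand-in hash B)."""
    return zlib.crc32(value.encode("utf-8"))


def assign_buckets(hash_fn, keys, num_buckets=NUM_BUCKETS):
    """Map each key to a bucket: non-negative hash modulo bucket count."""
    return {key: (hash_fn(key) & 0x7FFFFFFF) % num_buckets for key in keys}


assignments = {
    "stand-in A": assign_buckets(java_style_hash, STATIONS),
    "stand-in B": assign_buckets(crc32_hash, STATIONS),
}
for name, assignment in assignments.items():
    # A query filtering on these keys only needs the distinct buckets.
    distinct = sorted(set(assignment.values()))
    print(name, "buckets to read:", distinct)
```

The fewer distinct buckets the queried keys fall into, the fewer files the engine has to read, which is why the same data can show different scan sizes under different bucketing formats.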

Considerations

Glue DynamicFrame does not support bucketing natively. You need to use Spark DataFrame instead of DynamicFrame to bucket tables.

For information about fine-tuning AWS Glue ETL performance, refer to Best practices for performance tuning AWS Glue for Apache Spark jobs.

Optimize Iceberg data layout with hidden partitioning

Apache Iceberg is a high-performance open table format for huge analytic tables, bringing the reliability and simplicity of SQL tables to big data. Recently, there has been a huge demand to use Apache Iceberg tables to achieve advanced capabilities like ACID transactions, time travel queries, and more.

In Iceberg, bucketing works differently than the Hive table method we’ve seen so far. In Iceberg, bucketing is a subset of partitioning, and can be applied using the bucket partition transform. The way you use it and the end result are similar to bucketing in Hive tables. For more details about Iceberg bucket transforms, refer to Bucket Transform Details.
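As a rough illustration of the bucket partition transform, the sketch below computes a bucket number for a string value. It assumes the definition given in the Iceberg table spec as we understand it: a 32-bit Murmur3 hash (seed 0) of the UTF-8 bytes, with the sign bit masked off, taken modulo the bucket count. The function names are ours, and you should verify the details against Bucket Transform Details before relying on exact bucket numbers.

```python
MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit value left by r bits."""
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 (x86 variant), returned as an unsigned integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    length = len(data)
    rounded = length - (length % 4)
    # Body: process the input four bytes at a time (little-endian).
    for i in range(0, rounded, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & MASK32
        k = _rotl32(k, 15)
        k = (k * c2) & MASK32
        h ^= k
        h = _rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & MASK32
    # Tail: the remaining one to three bytes, if any.
    tail = data[rounded:]
    if tail:
        k = int.from_bytes(tail, "little")
        k = (k * c1) & MASK32
        k = _rotl32(k, 15)
        k = (k * c2) & MASK32
        h ^= k
    # Finalization mix.
    h ^= length
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h


def iceberg_style_bucket(value: str, num_buckets: int) -> int:
    """Bucket number for a string under the assumed transform definition."""
    return (murmur3_32(value.encode("utf-8")) & 0x7FFFFFFF) % num_buckets


bucket = iceberg_style_bucket("99999904237", 16)
print(bucket)
```

With bucket(station, 16) in the partitioning spec, every row whose station value maps to the same number is written under the same second-level partition, which is what lets the engine prune files when a query filters on station.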

Complete the following steps:

  1. Open the Athena query editor.
  2. Run the following query to create an Iceberg table with hidden partitioning along with bucketing:
    -- CTAS, Iceberg-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed_iceberg"
    WITH (table_type = 'ICEBERG',
          location = 's3://<your-s3-location>/athena-bucketed-iceberg/', 
          is_external = false,
          partitioning = ARRAY['report_type', 'bucket(station, 16)'],
          format = 'PARQUET',
          write_compression = 'SNAPPY'
    ) 
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshot.

There are two folders: data and metadata. Drill down to data.

You see random prefixes under the data folder. Choose the first one to view its details.

You see the top-level partition based on the report_type column. Drill down to the next level.

You see the second-level partition, bucketed with the station column.

The Parquet data files exist under these folders.

  1. Query the bucketed table with the following statement:
    -- Iceberg bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed_iceberg"
    WHERE
        report_type = 'CRN05'
        AND
        ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


With the Iceberg-bucketed table, the average runtime of the 10 queries is 8.03 seconds, and 148 MB of data is scanned to return 2.21 million records. This is less efficient than bucketing with AWS Glue or Athena, but considering the benefits of Iceberg’s various features, it is within an acceptable range.

Results

The following table summarizes all the results.

| | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
|---|---|---|---|---|---|
| Format | CSV | Parquet | Parquet | Parquet | Iceberg (Parquet) |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Table size (GB) | 155.8 | 5.0 | 5.0 | 5.8 | 5.0 |
| Number of S3 objects | 53360 | 376 | 192 | 192 | 195 |
| Is partitioned? | Yes, but in a different way | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |
| Bucketing format | n/a | n/a | Hive | Spark | Iceberg |
| Number of buckets | n/a | n/a | 16 | 16 | 16 |
| Average runtime (sec) | 29.178 | 10.950 | 7.815 | 7.089 | 8.030 |
| Scanned size (MB) | 206640.0 | 358.6 | 69.1 | 87.8 | 147.7 |

With athena_bucketed, glue_bucketed, and athena_bucketed_iceberg, you were able to meet the latency goal of 10 seconds. With bucketing, you saw a 25–40% reduction in runtime and a 60–85% reduction in scan size, which can contribute to both latency and cost optimization.
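The reduction ranges can be reproduced from the reported averages. The short calculation below takes athena_non_bucketed as the baseline and uses only the runtime and scanned-size figures from the results table; the variable and function names are ours.

```python
# Baseline: partitioned but not bucketed (athena_non_bucketed).
BASELINE = {"runtime_sec": 10.950, "scanned_mb": 358.6}

BUCKETED = {
    "athena_bucketed": {"runtime_sec": 7.815, "scanned_mb": 69.1},
    "glue_bucketed": {"runtime_sec": 7.089, "scanned_mb": 87.8},
    "athena_bucketed_iceberg": {"runtime_sec": 8.030, "scanned_mb": 147.7},
}


def reduction_pct(before: float, after: float) -> float:
    """Percentage reduction from `before` to `after`, one decimal place."""
    return round((before - after) / before * 100, 1)


reductions = {
    table: {
        metric: reduction_pct(BASELINE[metric], value)
        for metric, value in metrics.items()
    }
    for table, metrics in BUCKETED.items()
}

for table, result in reductions.items():
    print(f"{table}: runtime -{result['runtime_sec']}%, scan -{result['scanned_mb']}%")
```

Computed this way, the runtime reductions fall between roughly 27% and 35%, and the scan reductions between roughly 59% and 81%, in line with the approximate ranges quoted above.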

As you can see from the results, although partitioning contributes significantly to reducing both runtime and scan size, bucketing can reduce them further.

Athena CTAS is straightforward and fast enough to complete the bucketing process. AWS Glue ETL is more flexible and scalable to achieve advanced use cases. You can choose either method based on your requirement and use case, because you can take advantage of bucketing through either option.

Conclusion

In this post, we demonstrated how to optimize your table data layout with partitioning and bucketing through Athena CTAS and AWS Glue ETL. We showed that bucketing contributes to accelerating query latency and reducing scan size to further optimize costs. We also discussed bucketing for Iceberg tables through hidden partitioning.

Bucketing is just one technique to optimize data layout by reducing data scan. To optimize your entire data layout, we recommend considering other options like partitioning, columnar file formats, and compression in conjunction with bucketing. Combining these techniques can further enhance query performance.

Happy bucketing!


About the Authors

Takeshi Nakatani is a Principal Big Data Consultant on the Professional Services team in Tokyo. He has 26 years of experience in the IT industry, with expertise in architecting data infrastructure. On his days off, he can be a rock drummer or a motorcyclist.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

How Salesforce optimized their detection and response platform using AWS managed services

Post Syndicated from Atul Khare original https://aws.amazon.com/blogs/big-data/how-salesforce-optimized-their-detection-and-response-platform-using-aws-managed-services/

This is a guest blog post co-authored with Atul Khare and Bhupender Panwar from Salesforce.

Headquartered in San Francisco, Salesforce, Inc. is a cloud-based customer relationship management (CRM) software company building artificial intelligence (AI)-powered business applications that allow businesses to connect with their customers in new and personalized ways.

The Salesforce Trust Intelligence Platform (TIP) log platform team is responsible for data pipeline and data lake infrastructure, providing log ingestion, normalization, persistence, search, and detection capability to ensure Salesforce is safe from threat actors. It runs miscellaneous services to facilitate investigation, mitigation, and containment for security operations. The TIP team is critical to securing Salesforce’s infrastructure, detecting malicious threat activities, and providing timely responses to security events. This is achieved by collecting and inspecting petabytes of security logs across dozens of organizations, some with thousands of accounts.

In this post, we discuss how the Salesforce TIP team optimized their architecture using Amazon Web Services (AWS) managed services to achieve better scalability, cost, and operational efficiency.

TIP existing architecture bird’s eye view and scale of the platform

The main key performance indicator (KPI) for the TIP platform is its capability to ingest a high volume of security logs from a variety of Salesforce internal systems in real time and process them with high velocity. The platform ingests more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. The platform ingests log files in JSON, text, and Common Event Format (CEF) formats.
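To put these figures in perspective, the following back-of-the-envelope calculation derives the implied sustained throughput and average event size. It assumes a decimal petabyte (10^15 bytes) and treats both quoted figures as exact, although the post states them as lower bounds, so the results are only indicative.

```python
BYTES_PER_DAY = 10**15          # 1 PB per day, decimal petabyte (assumption)
EVENTS_PER_SECOND = 10_000_000  # 10 million events per second
SECONDS_PER_DAY = 24 * 60 * 60

events_per_day = EVENTS_PER_SECOND * SECONDS_PER_DAY
throughput_gb_per_sec = BYTES_PER_DAY / SECONDS_PER_DAY / 10**9
avg_event_bytes = BYTES_PER_DAY / events_per_day

print(f"events per day:       {events_per_day:,}")
print(f"sustained throughput: {throughput_gb_per_sec:.2f} GB/s")
print(f"average event size:   {avg_event_bytes:.0f} bytes")
```

Under these assumptions, the platform handles several hundred billion events per day at an average size of a little over 1 KB per event.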

The message bus in TIP’s existing architecture mainly uses Apache Kafka for ingesting different log types coming from the upstream systems. Kafka had a single topic for all the log types before they were consumed by different downstream applications including Splunk, Streaming Search, and Log Normalizer. The normalized Parquet logs are stored in an Amazon Simple Storage Service (Amazon S3) data lake and cataloged into Hive Metastore (HMS) on an Amazon Relational Database Service (Amazon RDS) instance based on S3 event notifications. The data lake consumers then use Apache Presto running on an Amazon EMR cluster to perform one-time queries. Other teams including the Data Science and Machine Learning teams use the platform to detect, analyze, and control security threats.

Challenges with the existing TIP log platform architecture

Some of the main challenges that TIP’s existing architecture was facing include:

  • Heavy operational overhead and maintenance cost managing the Kafka cluster
  • High cost to serve (CTS) to meet growing business needs
  • Compute threads limited by the number of partitions
  • Difficult to scale out when traffic increases
  • Weekly patching creates lags
  • Challenges with HMS scalability

All these challenges motivated the TIP team to embark on a journey to create a more optimized platform that’s easier to scale with less operational overhead and lower CTS.

New TIP log platform architecture

The Salesforce TIP log platform engineering team, in collaboration with AWS, started building the new architecture to replace the Kafka-based message bus solution with the fully managed AWS messaging and notification solutions Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Notification Service (Amazon SNS). In the new design, the upstream systems send their logs to a central Amazon S3 storage location, which invokes a process to partition the logs and store them in an S3 data lake. Consumer applications such as Splunk get the messages delivered to their system using Amazon SQS. Similarly, the partitioned log data through Amazon SQS events initializes a log normalization process that delivers the normalized log data to open source Delta Lake tables on an S3 data lake. One of the major changes in the new architecture is the use of an AWS Glue Data Catalog to replace the previous Hive Metastore. The one-time analysis applications use Apache Trino on an Amazon EMR cluster to query the Delta Tables cataloged in AWS Glue. Other consumer applications also read the data from S3 data lake files stored in Delta Table format. More details on some of the important processes are as follows:

Log partitioner (Spark structured stream)

This service ingests logs from the central Amazon S3 store through Amazon SNS and Amazon SQS notifications and writes them back to Amazon S3, partitioned by log type, for downstream consumers that subscribe through Amazon SNS and Amazon SQS. This is the bronze layer of the TIP data lake.
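As an illustration of the notification path, the sketch below extracts bucket and key pairs from an SQS message body that carries an S3 event notification wrapped in an SNS envelope. It is not Salesforce’s code: the function name, bucket, and key are made up, and it assumes the standard S3 event notification layout with SNS raw message delivery turned off.

```python
import json
from typing import List, Tuple
from urllib.parse import unquote_plus


def s3_objects_from_sqs_body(body: str) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs found in an SQS message body."""
    envelope = json.loads(body)
    # With an SNS envelope, the S3 event is a JSON string under "Message".
    # Without one (direct S3-to-SQS delivery), the body is the event itself.
    s3_event = json.loads(envelope["Message"]) if "Message" in envelope else envelope
    objects = []
    for record in s3_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Made-up example message.
s3_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "logs/type%3Dauth/part+1.json"},
            },
        }
    ]
}
sqs_body = json.dumps({"Type": "Notification", "Message": json.dumps(s3_event)})
found = s3_objects_from_sqs_body(sqs_body)
print(found)
```

A consumer such as a partitioner could use the decoded key to decide which log type an object belongs to before reading it.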

Log normalizer (Spark structured stream)

The log normalizer is one of the downstream consumers of the log partitioner (the Splunk Ingestor is another). It ingests the data from the partitioned output in Amazon S3, using Amazon SNS and Amazon SQS notifications, and enriches it using Salesforce custom parsers and tags. Finally, this enriched data lands in the data lake on Amazon S3. This is the silver layer of the TIP data lake.

Machine learning and other data analytics consumers (Trino, Flink, and Spark Jobs)

These consumers read from the silver layer of the TIP data lake and run analytics for security detection use cases. The earlier Kafka interface is now converted to Delta streams ingestion, which completes the removal of the Kafka bus from the TIP data pipeline.

Advantages of the new TIP log platform architecture

The main advantages realized by the Salesforce TIP team based on this new architecture using Amazon S3, Amazon SNS, and Amazon SQS include:

  • Cost savings of approximately $400 thousand per month
  • Auto scaling to meet growing business needs
  • Zero DevOps maintenance overhead
  • No mapping of partitions to compute threads
  • Compute resources can be scaled up and down independently
  • Fully managed Data Catalog to reduce the operational overhead of managing HMS

Summary

In this blog post, we discussed how the Salesforce Trust Intelligence Platform (TIP) optimized their data pipeline by replacing the Kafka-based message bus solution with fully managed AWS messaging and notification solutions using Amazon SQS and Amazon SNS. Salesforce and AWS teams worked together to make sure this new platform seamlessly scales to ingest more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. Reach out to your AWS account team if you have similar use cases and you need help architecting your platform to achieve operational efficiencies and scale.


About the authors

Atul Khare is a Director of Engineering at Salesforce Security, where he spearheads the Security Log Platform and Data Lakehouse initiatives. He supports diverse security customers by building a robust big data ETL pipeline that is elastic, resilient, and easy to use, providing uniform and consistent security datasets for threat detection and response operations, AI, forensic analysis, analytics, and compliance needs across all Salesforce clouds. Beyond his professional endeavors, Atul enjoys performing music with his band to raise funds for local charities.

Bhupender Panwar is a Big Data Architect at Salesforce and a seasoned advocate for big data and cloud computing. His background encompasses the development of data-intensive applications and pipelines, solving intricate architectural and scalability challenges, and extracting valuable insights from extensive datasets within the technology industry. Outside of his big data work, Bhupender loves to hike, bike, and travel, and is a great foodie.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Vikas Panghal is the Principal Product Manager leading the product management team for Amazon SNS and Amazon SQS. He has deep expertise in event-driven and messaging applications and brings a wealth of knowledge and experience to his role, shaping the future of messaging services. He is passionate about helping customers build highly scalable, fault-tolerant, and loosely coupled systems. Outside of work, he enjoys spending time with his family outdoors, playing chess, and running.

Amazon DataZone announces integration with AWS Lake Formation hybrid access mode for the AWS Glue Data Catalog

Post Syndicated from Utkarsh Mittal original https://aws.amazon.com/blogs/big-data/amazon-datazone-announces-integration-with-aws-lake-formation-hybrid-access-mode-for-the-aws-glue-data-catalog/

Last week, we announced the general availability of the integration between Amazon DataZone and AWS Lake Formation hybrid access mode. In this post, we share how this new feature helps you simplify the way you use Amazon DataZone to enable secure and governed sharing of your data in the AWS Glue Data Catalog. We also delve into how data producers can share their AWS Glue tables through Amazon DataZone without needing to register them in Lake Formation first.

Overview of the Amazon DataZone integration with Lake Formation hybrid access mode

Amazon DataZone is a fully managed data management service to catalog, discover, analyze, share, and govern data between data producers and consumers in your organization. With Amazon DataZone, data producers populate the business data catalog with data assets from data sources such as the AWS Glue Data Catalog and Amazon Redshift. They also enrich their assets with business context to make it straightforward for data consumers to understand. After the data is available in the catalog, data consumers such as analysts and data scientists can search and access this data by requesting subscriptions. When the request is approved, Amazon DataZone can automatically provision access to the data by managing permissions in Lake Formation or Amazon Redshift so that the data consumer can start querying the data using tools such as Amazon Athena or Amazon Redshift.

To manage the access to data in the AWS Glue Data Catalog, Amazon DataZone uses Lake Formation. Previously, if you wanted to use Amazon DataZone for managing access to your data in the AWS Glue Data Catalog, you had to onboard your data to Lake Formation first. Now, the integration of Amazon DataZone and Lake Formation hybrid access mode simplifies how you can get started with your Amazon DataZone journey by removing the need to onboard your data to Lake Formation first.

Lake Formation hybrid access mode allows you to start managing permissions on your AWS Glue databases and tables through Lake Formation, while continuing to maintain any existing AWS Identity and Access Management (IAM) permissions on these tables and databases. Lake Formation hybrid access mode supports two permission pathways to the same Data Catalog databases and tables:

  • In the first pathway, Lake Formation allows you to select specific principals (opt-in principals) and grant them Lake Formation permissions to access databases and tables by opting in
  • The second pathway allows all other principals (that are not added as opt-in principals) to access these resources through the IAM principal policies for Amazon Simple Storage Service (Amazon S3) and AWS Glue actions
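The two pathways can be pictured with a deliberately simplified model: a principal that has opted in for a resource is evaluated against Lake Formation permissions, and every other principal falls back to IAM policies. The function below is a conceptual sketch, not a Lake Formation API; the names, ARNs, and resource identifiers are hypothetical.

```python
from typing import Set, Tuple

# (principal ARN, resource name) pairs that have opted in to Lake Formation.
OptIn = Tuple[str, str]


def permission_pathway(principal: str, resource: str, opt_ins: Set[OptIn]) -> str:
    """Return which permission model applies in this simplified model."""
    return "lake_formation" if (principal, resource) in opt_ins else "iam"


opt_ins = {
    ("arn:aws:iam::111122223333:role/finance-analyst", "tickitdb.sales"),
}

finance = permission_pathway(
    "arn:aws:iam::111122223333:role/finance-analyst", "tickitdb.sales", opt_ins
)
etl = permission_pathway(
    "arn:aws:iam::111122223333:role/sales-etl", "tickitdb.sales", opt_ins
)
print(finance, etl)
```

In this picture, existing workloads such as the hypothetical ETL role keep working through IAM, while newly onboarded consumers are governed through Lake Formation grants.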

With the integration between Amazon DataZone and Lake Formation hybrid access mode, if you have tables in the AWS Glue Data Catalog that are managed through IAM-based policies, you can publish these tables directly to Amazon DataZone, without registering them in Lake Formation. Amazon DataZone registers the location of these tables in Lake Formation using hybrid access mode, which allows managing permissions on AWS Glue tables through Lake Formation, while continuing to maintain any existing IAM permissions.

Amazon DataZone enables you to publish any type of asset in the business data catalog. For some of these assets, Amazon DataZone can automatically manage access grants. These assets are called managed assets, and include Lake Formation-managed Data Catalog tables and Amazon Redshift tables and views. Prior to this integration, you had to complete the following steps before Amazon DataZone could treat the published Data Catalog table as a managed asset:

  1. Identify the Amazon S3 location associated with the Data Catalog table.
  2. Register the Amazon S3 location with Lake Formation in hybrid access mode using a role with appropriate permissions.
  3. Publish the table metadata to the Amazon DataZone business data catalog.

The following diagram illustrates this workflow.
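For readers who script the manual registration step, the sketch below builds the request parameters for the Lake Formation RegisterResource API with hybrid access mode turned on. Only the parameter construction runs here. The helper name, bucket, and role ARN are hypothetical, and the boto3 call in the trailing comment reflects our understanding of the API, so check it against the current SDK documentation.

```python
def build_register_resource_request(s3_location: str, role_arn: str) -> dict:
    """Build RegisterResource parameters for an S3 location in hybrid access mode."""
    prefix = "s3://"
    if not s3_location.startswith(prefix):
        raise ValueError("expected an s3:// location")
    # Convert s3://bucket/path/ into the ARN form arn:aws:s3:::bucket/path.
    path = s3_location[len(prefix):].rstrip("/")
    return {
        "ResourceArn": f"arn:aws:s3:::{path}",
        "RoleArn": role_arn,
        "HybridAccessEnabled": True,
    }


request = build_register_resource_request(
    "s3://example-bucket/tickit/sales/",
    "arn:aws:iam::111122223333:role/example-registration-role",
)
print(request)

# Intended use (not executed here; requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("lakeformation").register_resource(**request)
```

The table’s S3 location would typically come from the StorageDescriptor of the Data Catalog table identified in the first step.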

With Amazon DataZone’s integration with Lake Formation hybrid access mode, you can publish your AWS Glue tables to Amazon DataZone without registering the Amazon S3 location or adding an opt-in principal in Lake Formation yourself, because these steps are delegated to Amazon DataZone. The administrator of an AWS account can enable the data location registration setting under the DefaultDataLake blueprint on the Amazon DataZone console. Now, a data owner or publisher can publish their AWS Glue table (managed through IAM permissions) to Amazon DataZone without the extra setup steps. When a data consumer subscribes to this table, Amazon DataZone registers the Amazon S3 locations of the table in hybrid access mode, adds the data consumer’s IAM role as an opt-in principal, and grants access to the same IAM role by managing permissions on the table through Lake Formation. This makes sure that IAM permissions on the table can coexist with newly granted Lake Formation permissions, without disrupting any existing workflows. The following diagram illustrates this workflow.

Solution overview

To demonstrate this new capability, we use a sample customer scenario where the finance team wants to access data owned by the sales team for financial analysis and reporting. The sales team has a pipeline that creates a dataset containing valuable information about ticket sales, popular events, venues, and seasons. We call it the tickit dataset. The sales team stores this dataset in Amazon S3 and registers it in a database in the Data Catalog. The access to this table is currently managed through IAM-based permissions. However, the sales team wants to publish this table to Amazon DataZone to facilitate secure and governed data sharing with the finance team.

The steps to configure this solution are as follows:

  1. The Amazon DataZone administrator enables the data lake location registration setting in Amazon DataZone to automatically register the Amazon S3 location of the AWS Glue tables in Lake Formation hybrid access mode.
  2. After the hybrid access mode integration is enabled in Amazon DataZone, the finance team requests a subscription to the sales data asset. The asset shows up as a managed asset, which means Amazon DataZone can manage access to this asset even if the Amazon S3 location of this asset isn’t registered in Lake Formation.
  3. The sales team is notified of a subscription request raised by the finance team. They review and approve the access request. After the request is approved, Amazon DataZone fulfills the subscription request by managing permissions in Lake Formation. It registers the Amazon S3 location of the subscribed table in Lake Formation hybrid access mode.
  4. The finance team gains access to the sales dataset required for their financial reports. They can go to their DataZone environment and start running queries using Athena against their subscribed dataset.

Prerequisites

To follow the steps in this post, you need an AWS account. If you don’t have an account, you can create one. In addition, you must have the following resources configured in your account:

  • An S3 bucket
  • An AWS Glue database and crawler
  • IAM roles for different personas and services
  • An Amazon DataZone domain and project
  • An Amazon DataZone environment profile and environment
  • An Amazon DataZone data source

If you don’t have these resources already configured, you can create them by deploying the following AWS CloudFormation stack:

  1. Choose Launch Stack to deploy a CloudFormation template.
  2. Complete the steps to deploy the template and leave all settings as default.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources, then choose Submit.

After the CloudFormation deployment is complete, you can log in to the Amazon DataZone portal and manually trigger a data source run. This pulls any new or modified metadata from the source and updates the associated assets in the inventory. This data source has been configured to automatically publish the data assets to the catalog.

  1. On the Amazon DataZone console, choose View domains.

Make sure you are logged in with the same role that you used to deploy the CloudFormation stack, and verify that you are in the same AWS Region.

  1. Find the domain blog_dz_domain, then choose Open data portal.
  2. Choose Browse all projects and choose Sales producer project.
  3. On the Data tab, choose Data sources in the navigation pane.
  4. Locate and choose the data source that you want to run.

This opens the data source details page.

  1. Choose the options menu (three vertical dots) next to tickit_datasource and choose Run.

The data source status changes to Running as Amazon DataZone updates the asset metadata.

Enable hybrid mode integration in Amazon DataZone

In this step, the Amazon DataZone administrator goes through the process of enabling the Amazon DataZone integration with Lake Formation hybrid access mode. Complete the following steps:

  1. On a separate browser tab, open the Amazon DataZone console.

Verify that you are in the same Region where you deployed the CloudFormation template.

  1. Choose View domains.
  2. Choose the domain created by AWS CloudFormation, blog_dz_domain.
  3. Scroll down on the domain details page and choose the Blueprints tab.

A blueprint defines what AWS tools and services can be used with the data assets published in Amazon DataZone. The DefaultDataLake blueprint is enabled as part of the CloudFormation stack deployment. This blueprint enables you to create and query AWS Glue tables using Athena. For the steps to enable this in your own deployments, refer to Enable built-in blueprints in the AWS account that owns the Amazon DataZone domain.

  1. Choose the DefaultDataLake blueprint.
  2. On the Provisioning tab, choose Edit.
  3. Select Enable Amazon DataZone to register S3 locations using AWS Lake Formation hybrid access mode.

You have the option of excluding specific Amazon S3 locations if you don’t want Amazon DataZone to automatically register them to Lake Formation hybrid access mode.

  1. Choose Save changes.

Request access

In this step, you log in to Amazon DataZone as the finance team, search for the sales data asset, and subscribe to it. Complete the following steps:

  1. Return to your Amazon DataZone data portal browser tab.
  2. Switch to the finance consumer project by choosing the dropdown menu next to the project name and choosing Finance consumer project.

From this step onwards, you take on the persona of a finance user looking to subscribe to a data asset published in the previous step.

  1. In the search bar, search for and choose the sales data asset.
  2. Choose Subscribe.

The asset shows up as a managed asset. This means that Amazon DataZone can grant access to this data asset to the finance team’s project by managing the permissions in Lake Formation.

  1. Enter a reason for the access request and choose Subscribe.

Approve access request

The sales team gets a notification that an access request from the finance team is submitted. To approve the request, complete the following steps:

  1. Choose the dropdown menu next to the project name and choose Sales producer project.

You now assume the persona of the sales team, who are the owners and stewards of the sales data assets.

  1. Choose the notification icon at the top-right corner of the DataZone portal.
  2. Choose the Subscription Request Created task.
  3. Grant access to the sales data asset to the finance team and choose Approve.

Analyze the data

The finance team has now been granted access to the sales data, and this dataset has been added to their Amazon DataZone environment. They can access the environment and query the sales dataset with Athena, along with any other datasets they currently own. Complete the following steps:

  1. On the dropdown menu, choose Finance consumer project.

On the right pane of the project overview screen, you can find a list of active environments available for use.

  1. Choose the Amazon DataZone environment finance_dz_environment.
  2. In the navigation pane, under Data assets, choose Subscribed.
  3. Verify that your environment now has access to the sales data.

It may take a few minutes for the data asset to be automatically added to your environment.

  1. Choose the new tab icon for Query data.

A new tab opens with the Athena query editor.

  1. For Database, choose finance_consumer_db_tickitdb-<suffix>.

This database will contain your subscribed data assets.

  1. Generate a preview of the sales table by choosing the options menu (three vertical dots) and choosing Preview table.
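The same preview can also be requested programmatically. The following is a minimal sketch, assuming Boto3 is installed and the caller has credentials for the environment. The helper names, the row limit, and the workgroup placeholder are hypothetical; the database suffix is left as a placeholder, as in the step above.

```python
def build_preview_request(database, table, workgroup, limit=10):
    """Build keyword arguments for the Athena StartQueryExecution API."""
    return {
        # Double quotes keep the table identifier intact in Athena SQL.
        "QueryString": f'SELECT * FROM "{table}" LIMIT {int(limit)}',
        # Setting the database in the context avoids quoting its hyphenated name.
        "QueryExecutionContext": {"Database": database},
        "WorkGroup": workgroup,
    }


def run_preview(request, region_name):
    """Submit the query; requires Boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the builder works without the SDK

    athena = boto3.client("athena", region_name=region_name)
    return athena.start_query_execution(**request)["QueryExecutionId"]


request = build_preview_request(
    database="finance_consumer_db_tickitdb-<suffix>",  # placeholder from the step above
    table="sales",
    workgroup="<environment_workgroup>",  # hypothetical placeholder
)
print(request["QueryString"])
```

Only build_preview_request runs without AWS access; run_preview is shown to indicate where the request would be sent.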

Clean up

To clean up your resources, complete the following steps:

  1. Switch back to the administrator role you used to deploy the CloudFormation stack.
  2. On the Amazon DataZone console, delete the projects used in this post. This will delete most project-related objects like data assets and environments.
  3. On the AWS CloudFormation console, delete the stack you deployed in the beginning of this post.
  4. On the Amazon S3 console, delete the S3 buckets containing the tickit dataset.
  5. On the Lake Formation console, delete the Lake Formation admins registered by Amazon DataZone.
  6. On the Lake Formation console, delete tables and databases created by Amazon DataZone.

Conclusion

In this post, we discussed how the integration between Amazon DataZone and Lake Formation hybrid access mode simplifies the process to start using Amazon DataZone for end-to-end governance of your data in the AWS Glue Data Catalog. This integration helps you bypass the manual steps of onboarding to Lake Formation before you can start using Amazon DataZone.

For more information on how to get started with Amazon DataZone, refer to the Getting started guide. Check out the YouTube playlist for some of the latest demos of Amazon DataZone and short descriptions of the capabilities available. For more information about Amazon DataZone, see How Amazon DataZone helps customers find value in oceans of data.


About the Authors

Utkarsh Mittal is a Senior Technical Product Manager for Amazon DataZone at AWS. He is passionate about building innovative products that simplify customers’ end-to-end analytics journeys. Outside of the tech world, Utkarsh loves to play music, with drums being his latest endeavor.

Praveen Kumar is a Principal Analytics Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-centered services. His areas of interests are serverless technology, modern cloud data warehouses, streaming, and generative AI applications.

Paul Villena is a Senior Analytics Solutions Architect in AWS with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interests are infrastructure as code, serverless technologies, and coding in Python.

Amazon DataZone now integrates with AWS Glue Data Quality and external data quality solutions

Post Syndicated from Andrea Filippo La Scola original https://aws.amazon.com/blogs/big-data/amazon-datazone-now-integrates-with-aws-glue-data-quality-and-external-data-quality-solutions/

Today, we are pleased to announce that Amazon DataZone is now able to present data quality information for data assets. This information empowers end-users to make informed decisions as to whether or not to use specific assets.

Many organizations already use AWS Glue Data Quality to define and enforce data quality rules on their data, validate data against predefined rules, track data quality metrics, and monitor data quality over time using artificial intelligence (AI). Other organizations monitor the quality of their data through third-party solutions.

Amazon DataZone now integrates directly with AWS Glue to display data quality scores for AWS Glue Data Catalog assets. Additionally, Amazon DataZone now offers APIs for importing data quality scores from external systems.

In this post, we discuss the latest features of Amazon DataZone for data quality, the integration between Amazon DataZone and AWS Glue Data Quality, and how you can import data quality scores produced by external systems into Amazon DataZone via API.

Challenges

One of the most common questions we get from customers is related to displaying data quality scores in the Amazon DataZone business data catalog to let business users have visibility into the health and reliability of the datasets.

As data becomes increasingly crucial for driving business decisions, Amazon DataZone users are keenly interested in providing the highest standards of data quality. They recognize the importance of accurate, complete, and timely data in enabling informed decision-making and fostering trust in their analytics and reporting processes.

Amazon DataZone data assets can be updated at varying frequencies. As data is refreshed and updated, changes can happen through upstream processes that put it at risk of not maintaining the intended quality. Data quality scores help you understand if data has maintained the expected level of quality for data consumers to use (through analysis or downstream processes).

From a producer’s perspective, data stewards can now set up Amazon DataZone to automatically import the data quality scores from AWS Glue Data Quality (scheduled or on demand) and include this information in the Amazon DataZone catalog to share with business users. Additionally, you can now use new Amazon DataZone APIs to import data quality scores produced by external systems into the data assets.

With the latest enhancement, Amazon DataZone users can now accomplish the following:

  • Access insights about data quality standards directly from the Amazon DataZone web portal
  • View data quality scores on various KPIs, including data completeness, uniqueness, and accuracy
  • Get a holistic view of the quality and trustworthiness of their data

In the first part of this post, we walk through the integration between AWS Glue Data Quality and Amazon DataZone. We discuss how to visualize data quality scores in Amazon DataZone, enable AWS Glue Data Quality when creating a new Amazon DataZone data source, and enable data quality for an existing data asset.

In the second part of this post, we discuss how you can import data quality scores produced by external systems into Amazon DataZone via API. In this example, we use Amazon EMR Serverless in combination with the open source library Pydeequ to act as an external system for data quality.

Visualize AWS Glue Data Quality scores in Amazon DataZone

You can now visualize AWS Glue Data Quality scores in data assets that have been published in the Amazon DataZone business catalog and that are searchable through the Amazon DataZone web portal.

If the asset has AWS Glue Data Quality enabled, you can now quickly visualize the data quality score directly in the catalog search pane.

By selecting the corresponding asset, you can understand its content through the readme, glossary terms, and technical and business metadata. Additionally, the overall quality score indicator is displayed in the Asset Details section.

A data quality score serves as an overall indicator of a dataset’s quality, calculated based on the rules you define.

On the Data quality tab, you can access the details of data quality overview indicators and the results of the data quality runs.

The indicators shown on the Overview tab are calculated based on the results of the rulesets from the data quality runs.

Each rule is assigned an attribute that contributes to the calculation of the indicator. For example, rules that have the Completeness attribute will contribute to the calculation of the corresponding indicator on the Overview tab.

To filter data quality results, choose the Applicable column dropdown menu and choose your desired filter parameter.

You can also visualize column-level data quality starting on the Schema tab.

When data quality is enabled for the asset, the data quality results become available, providing insightful quality scores that reflect the integrity and reliability of each column within the dataset.

When you choose one of the data quality result links, you’re redirected to the data quality detail page, filtered by the selected column.

Data quality historical results in Amazon DataZone

Data quality can change over time for many reasons:

  • Data formats may change because of changes in the source systems
  • As data accumulates over time, it may become outdated or inconsistent
  • Data quality can be affected by human errors in data entry, data processing, or data manipulation

In Amazon DataZone, you can now track data quality over time to confirm reliability and accuracy. By analyzing the historical report snapshot, you can identify areas for improvement, implement changes, and measure the effectiveness of those changes.

Enable AWS Glue Data Quality when creating a new Amazon DataZone data source

In this section, we walk through the steps to enable AWS Glue Data Quality when creating a new Amazon DataZone data source.

Prerequisites

To follow along, you should have a domain for Amazon DataZone, an Amazon DataZone project, and a new Amazon DataZone environment (with a DataLakeProfile). For instructions, refer to Amazon DataZone quickstart with AWS Glue data.

You also need to define and run a ruleset against your data, which is a set of data quality rules in AWS Glue Data Quality. To set up the data quality rules and for more information on the topic, refer to the following posts:

After you create the data quality rules, make sure that Amazon DataZone has the permissions to access the AWS Glue database managed through AWS Lake Formation. For instructions, see Configure Lake Formation permissions for Amazon DataZone.

In our example, we have configured a ruleset against a table containing patient data within a healthcare synthetic dataset generated using Synthea. Synthea is a synthetic patient generator that creates realistic patient data and associated medical records that can be used for testing healthcare software applications.

The ruleset contains 27 individual rules (one of them failing), so the overall data quality score is 96%.

If you use Amazon DataZone managed policies, there is no action needed because these will get automatically updated with the needed actions. Otherwise, you need to allow Amazon DataZone to have the required permissions to list and get AWS Glue Data Quality results, as shown in the Amazon DataZone user guide.

Create a data source with data quality enabled

In this section, we create a data source and enable data quality. You can also update an existing data source to enable data quality. We use this data source to import metadata information related to our datasets. Amazon DataZone will also import data quality information related to the (one or more) assets contained in the data source.

  1. On the Amazon DataZone console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Name, enter a name for your data source.
  4. For Data source type, select AWS Glue.
  5. For Environment, choose your environment.
  6. For Database name, enter a name for the database.
  7. For Table selection criteria, choose your criteria.
  8. Choose Next.
  9. For Data quality, select Enable data quality for this data source.

If data quality is enabled, Amazon DataZone will automatically fetch data quality scores from AWS Glue at each data source run.

  1. Choose Next.

Now you can run the data source.

While running the data source, Amazon DataZone imports the last 100 AWS Glue Data Quality run results. This information is now visible on the asset page and will be visible to all Amazon DataZone users after publishing the asset.

Enable data quality for an existing data asset

In this section, we enable data quality for an existing asset. This might be useful for users that already have data sources in place and want to enable the feature afterwards.

Prerequisites

To follow along, you should have already run the data source and produced an AWS Glue table data asset. Additionally, you should have defined a ruleset in AWS Glue Data Quality over the target table in the Data Catalog.

For this example, we ran the data quality job multiple times against the table, producing the related AWS Glue Data Quality scores, as shown in the following screenshot.

Import data quality scores into the data asset

Complete the following steps to import the existing AWS Glue Data Quality scores into the data asset in Amazon DataZone:

  1. Within the Amazon DataZone project, navigate to the Inventory data pane and choose the data source.

If you choose the Data quality tab, you can see that there’s still no information on data quality because AWS Glue Data Quality integration is not enabled for this data asset yet.

  1. On the Data quality tab, choose Enable data quality.
  2. In the Data quality section, select Enable data quality for this data source.
  3. Choose Save.

Now, back on the Inventory data pane, you can see a new tab: Data quality.

On the Data quality tab, you can see data quality scores imported from AWS Glue Data Quality.

Ingest data quality scores from an external source using Amazon DataZone APIs

Many organizations already use systems that calculate data quality by performing tests and assertions on their datasets. Amazon DataZone now supports importing third-party originated data quality scores via API, allowing users that navigate the web portal to view this information.

In this section, we simulate a third-party system pushing data quality scores into Amazon DataZone via APIs through Boto3 (Python SDK for AWS).

For this example, we use the same synthetic dataset as earlier, generated with Synthea.

The following diagram illustrates the solution architecture.

The workflow consists of the following steps:

  1. Read a dataset of patients in Amazon Simple Storage Service (Amazon S3) directly from Amazon EMR using Spark.

The dataset is created as a generic S3 asset collection in Amazon DataZone.

  1. In Amazon EMR, perform data validation rules against the dataset.
  2. The metrics are saved in Amazon S3 to have a persistent output.
  3. Use Amazon DataZone APIs through Boto3 to push custom data quality metadata.
  4. End-users can see the data quality scores by navigating to the data portal.

Prerequisites

We use Amazon EMR Serverless and Pydeequ to run a fully managed Spark environment. To learn more about Pydeequ as a data testing framework, see Testing Data quality at scale with Pydeequ.

To allow Amazon EMR to send data to the Amazon DataZone domain, make sure that the IAM role used by Amazon EMR has the permissions to do the following:

  • Read from and write to the S3 buckets
  • Call the post_time_series_data_points action for Amazon DataZone:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "datazone:PostTimeSeriesDataPoints"
                ],
                "Resource": [
                    "<datazone_domain_arn>"
                ]
            }
        ]
    }
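The policy above can also be generated and attached from Python. This is a minimal sketch, assuming the policy is attached as an inline policy with the IAM PutRolePolicy API; the helper names, the role name, and the policy name are hypothetical, and the domain ARN stays a placeholder.

```python
import json


def build_datazone_policy(domain_arn):
    """Return the policy document shown above for a given domain ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": ["datazone:PostTimeSeriesDataPoints"],
                "Resource": [domain_arn],
            }
        ],
    }


def attach_policy(role_name, policy_name, policy):
    """Attach the document as an inline policy; requires Boto3 and IAM permissions."""
    import boto3  # imported lazily so the builder works without the SDK

    iam = boto3.client("iam")
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy),
    )


policy = build_datazone_policy("<datazone_domain_arn>")
print(json.dumps(policy, indent=4))
```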

Make sure that you add the EMR role as a project member in the Amazon DataZone project. On the Amazon DataZone console, navigate to the Project members page and choose Add members.

Add the EMR role as a contributor.

Ingest and analyze PySpark code

In this section, we analyze the PySpark code that we use to perform data quality checks and send the results to Amazon DataZone. You can download the complete PySpark script.

To run the script entirely, you can submit a job to EMR Serverless. The service will take care of scheduling the job and automatically allocating the resources needed, enabling you to track the job run statuses throughout the process.

You can submit a job to EMR Serverless from the Amazon EMR console using EMR Studio, or programmatically using the AWS CLI or one of the AWS SDKs.
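As an illustration of the programmatic route, the following is a minimal sketch that submits the script through the Boto3 emr-serverless client. The application ID, role ARN, and script location are hypothetical placeholders, and the helper names are not part of the original script. Packaging of the Pydeequ dependencies is out of scope here.

```python
def build_job_run_request(application_id, execution_role_arn, script_uri,
                          input_path, output_location):
    """Build keyword arguments for the EMR Serverless StartJobRun API."""
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": script_uri,
                # The script reads these two values from sys.argv[1] and sys.argv[2].
                "entryPointArguments": [input_path, output_location],
            }
        },
    }


def submit_job(request, region_name):
    """Start the job run; requires Boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the builder works without the SDK

    client = boto3.client("emr-serverless", region_name=region_name)
    return client.start_job_run(**request)["jobRunId"]


request = build_job_run_request(
    application_id="<emr_serverless_application_id>",
    execution_role_arn="<emr_execution_role_arn>",
    script_uri="s3://<bucket_name>/scripts/<script_name>.py",
    input_path="s3://<bucket_name>/patients/patients.csv",
    output_location="s3://<bucket_name>/output/",
)
print(request["jobDriver"]["sparkSubmit"]["entryPointArguments"])
```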

In Apache Spark, a SparkSession is the entry point for interacting with DataFrames and Spark’s built-in functions. The script starts by initializing a SparkSession:

with SparkSession.builder.appName("PatientsDataValidation") \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .getOrCreate() as spark:

We read a dataset from Amazon S3. For increased modularity, you can use the script input to refer to the S3 path:

s3inputFilepath = sys.argv[1]
s3outputLocation = sys.argv[2]

df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(s3inputFilepath) #s3://<bucket_name>/patients/patients.csv

Next, we set up a metrics repository. This can be helpful to persist the run results in Amazon S3.

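# s3_write_path is not defined in this excerpt; it is assumed to be a metrics file path under s3outputLocation
metricsRepository = FileSystemMetricsRepository(spark, s3_write_path)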

Pydeequ allows you to create data quality rules using the builder pattern, a well-known software engineering design pattern, chaining instructions to instantiate a VerificationSuite object:

key_tags = {'tag': 'patient_df'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(metricsRepository) \
    .addCheck(
        check.hasSize(lambda x: x >= 1000) \
        .isComplete("birthdate")  \
        .isUnique("id")  \
        .isComplete("ssn") \
        .isComplete("first") \
        .isComplete("last") \
        .hasMin("healthcare_coverage", lambda x: x == 1000.0)) \
    .saveOrAppendResult(resultKey) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following is the output for the data validation rules:

+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|check           |check_level|check_status|constraint                                          |constraint_status|constraint_message                                  |
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|Integrity checks|Error      |Error       |SizeConstraint(Size(None))                          |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(birthdate,None))|Success          |                                                    |
|Integrity checks|Error      |Error       |UniquenessConstraint(Uniqueness(List(id),None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(ssn,None))      |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(first,None))    |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(last,None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |MinimumConstraint(Minimum(healthcare_coverage,None))|Failure          |Value: 0.0 does not meet the constraint requirement!|
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+

At this point, we want to insert these data quality values in Amazon DataZone. To do so, we use the post_time_series_data_points function in the Boto3 Amazon DataZone client.

The PostTimeSeriesDataPoints DataZone API allows you to insert new time series data points for a given asset or listing, without creating a new revision.

At this point, you might also want to have more information on which fields are sent as input for the API. You can use the APIs to obtain the specification for Amazon DataZone form types; in our case, it’s amazon.datazone.DataQualityResultFormType.

You can also use the AWS CLI to invoke the API and display the form structure:

aws datazone get-form-type --domain-identifier <your_domain_id> --form-type-identifier amazon.datazone.DataQualityResultFormType --region <domain_region> --output text --query 'model.smithy'

This output helps identify the required API parameters, including fields and value limits:

$version: "2.0"
namespace amazon.datazone
structure DataQualityResultFormType {
    @amazon.datazone#timeSeriesSummary
    @range(min: 0, max: 100)
    passingPercentage: Double
    @amazon.datazone#timeSeriesSummary
    evaluationsCount: Integer
    evaluations: EvaluationResults
}
@length(min: 0, max: 2000)
list EvaluationResults {
    member: EvaluationResult
}

@length(min: 0, max: 20)
list ApplicableFields {
    member: String
}

@length(min: 0, max: 20)
list EvaluationTypes {
    member: String
}

enum EvaluationStatus {
    PASS,
    FAIL
}

string EvaluationDetailType

map EvaluationDetails {
    key: EvaluationDetailType
    value: String
}

structure EvaluationResult {
    description: String
    types: EvaluationTypes
    applicableFields: ApplicableFields
    status: EvaluationStatus
    details: EvaluationDetails
}
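The limits in this model can be checked on the client side before calling the API. The following is a minimal sketch, assuming the payload is a plain dictionary shaped like the form content. validate_form_content is a hypothetical helper, not part of any SDK, and it covers only the constraints visible in the model above.

```python
def validate_form_content(content):
    """Return a list of violations of the DataQualityResultFormType limits."""
    errors = []

    # @range(min: 0, max: 100) on passingPercentage
    score = content.get("passingPercentage")
    if score is not None and not 0 <= score <= 100:
        errors.append("passingPercentage must be between 0 and 100")

    # @length(min: 0, max: 2000) on EvaluationResults
    evaluations = content.get("evaluations", [])
    if len(evaluations) > 2000:
        errors.append("evaluations accepts at most 2000 entries")

    for index, evaluation in enumerate(evaluations):
        # @length(min: 0, max: 20) on ApplicableFields and EvaluationTypes
        if len(evaluation.get("applicableFields", [])) > 20:
            errors.append(f"evaluations[{index}].applicableFields accepts at most 20 entries")
        if len(evaluation.get("types", [])) > 20:
            errors.append(f"evaluations[{index}].types accepts at most 20 entries")
        # enum EvaluationStatus { PASS, FAIL }
        status = evaluation.get("status")
        if status is not None and status not in ("PASS", "FAIL"):
            errors.append(f"evaluations[{index}].status must be PASS or FAIL")

    return errors


sample = {
    "passingPercentage": 120,
    "evaluationsCount": 1,
    "evaluations": [{"status": "Success", "types": ["Completeness_custom"]}],
}
for problem in validate_form_content(sample):
    print(problem)
```

The sample deliberately breaks two limits (a score above 100 and a status outside the enum) to show what the check reports.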

To send the appropriate form data, we need to convert the Pydeequ output to match the DataQualityResultFormType contract. This can be achieved with a Python function that processes the results.

For each DataFrame row, we extract information from the constraint column. For example, take the following constraint value:

CompletenessConstraint(Completeness(birthdate,None))

We convert it to the following:

{
  "constraint": "CompletenessConstraint",
  "statisticName": "Completeness_custom",
  "column": "birthdate"
}

Make sure to send an output that matches the KPIs that you want to track. In our case, we are appending _custom to the statistic name, resulting in the following format for KPIs:

  • Completeness_custom
  • Uniqueness_custom

In a real-world scenario, you might want to set a value that matches with your data quality framework in relation to the KPIs that you want to track in Amazon DataZone.
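The extraction described above can be sketched with a regular expression. This is one possible approach, not necessarily the one used in the downloadable script; parse_constraint and its suffix parameter are hypothetical names, and the pattern is written only against the constraint strings shown in the output table earlier.

```python
import re

# Matches strings such as "CompletenessConstraint(Completeness(birthdate,None))".
CONSTRAINT_PATTERN = re.compile(
    r"^(?P<constraint>\w+)\((?P<statistic>\w+)\((?P<args>.*)\)\)$"
)


def parse_constraint(text, suffix="_custom"):
    """Split a Pydeequ constraint string into constraint, statistic, and column."""
    match = CONSTRAINT_PATTERN.match(text.strip())
    if match is None:
        raise ValueError(f"Unrecognized constraint format: {text!r}")

    args = match.group("args")
    column = None  # constraints such as Size(None) do not target a column
    if "," in args:
        # The last argument is the optional filter (None in the output above).
        column = args.rsplit(",", 1)[0]
        if column.startswith("List(") and column.endswith(")"):
            column = column[len("List("):-1]

    return {
        "constraint": match.group("constraint"),
        "statisticName": match.group("statistic") + suffix,
        "column": column,
    }


print(parse_constraint("CompletenessConstraint(Completeness(birthdate,None))"))
```

Keeping the suffix as a parameter makes it straightforward to align the statistic names with the KPIs you want to track.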

After applying a transformation function, we have a Python object for each rule evaluation:

..., {
   'applicableFields': ["healthcare_coverage"],
   'types': ["Minimum_custom"],
   'status': 'FAIL',
   'description': 'MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!'
 },...
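A transformation of this kind might look like the following minimal sketch. It assumes each row has already been reduced to the parsed dictionary shown earlier, plus the constraint_status and constraint_message values from the DataFrame. to_evaluation is a hypothetical name, and the description format is inferred from the single example above.

```python
def to_evaluation(parsed, constraint_status, constraint_message=""):
    """Map one parsed Pydeequ result to an EvaluationResult-shaped dictionary."""
    statistic = parsed["statisticName"]
    # Drop the "_custom" suffix for the human-readable description.
    base_name = statistic.rsplit("_custom", 1)[0]
    parts = [parsed["constraint"], base_name, constraint_message.strip()]
    return {
        "applicableFields": [parsed["column"]] if parsed.get("column") else [],
        "types": [statistic],
        # Pydeequ reports Success or Failure; the form type expects PASS or FAIL.
        "status": "PASS" if constraint_status == "Success" else "FAIL",
        # Empty messages (successful rules) are left out of the description.
        "description": " - ".join(part for part in parts if part),
    }


evaluation = to_evaluation(
    {
        "constraint": "MinimumConstraint",
        "statisticName": "Minimum_custom",
        "column": "healthcare_coverage",
    },
    "Failure",
    "Value: 0.0 does not meet the constraint requirement!",
)
print(evaluation)
```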

We also use the constraint_status column to compute the overall score:

(number of successes / total number of evaluations) * 100

In our example, this results in a passing percentage of 85.71%.
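As a worked check of this arithmetic, the following sketch applies the formula to the seven constraint_status values from the output table (six Success, one Failure). passing_percentage is a hypothetical helper, and rounding to two decimals is an assumption made for display.

```python
def passing_percentage(statuses):
    """Return (number of successes / total number of evaluations) * 100."""
    if not statuses:
        raise ValueError("at least one evaluation is required")
    successes = sum(1 for status in statuses if status == "Success")
    return round(successes / len(statuses) * 100, 2)


statuses = ["Success"] * 6 + ["Failure"]
print(passing_percentage(statuses))  # 85.71
```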

We set this value in the passingPercentage input field along with the other information related to the evaluations in the input of the Boto3 method post_time_series_data_points:

import json
import boto3

# Instantiate the client library to communicate with Amazon DataZone Service
#
datazone = boto3.client(
    service_name='datazone', 
    region_name=<Region(String) example: us-east-1>
)

# Perform the API operation to push the Data Quality information to Amazon DataZone
#
datazone.post_time_series_data_points(
    domainIdentifier=<DataZone domain ID>,
    entityIdentifier=<DataZone asset ID>,
    entityType='ASSET',
    forms=[
        {
            "content": json.dumps({
                    "evaluationsCount":<Number of evaluations (number)>,
                    "evaluations": [<List of objects {
                        'description': <Description (String)>,
                        'applicableFields': [<List of columns involved (String)>],
                        'types': [<List of KPIs (String)>],
                        'status': <FAIL/PASS (string)>
                        }>
                     ],
                    "passingPercentage":<Score (number)>
                }),
            "formName": <Form name(String) example: PydeequRuleSet1>,
            "typeIdentifier": "amazon.datazone.DataQualityResultFormType",
            "timestamp": <Date (timestamp)>
        }
    ]
)

Boto3 invokes the Amazon DataZone APIs. In these examples, we used Boto3 and Python, but you can choose one of the AWS SDKs developed in the language you prefer.
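To make the template above concrete, the following is a minimal sketch that assembles the forms argument from already-computed values. build_forms and push_scores are hypothetical helpers; the form name reuses the example from the template, and the domain and asset IDs remain placeholders you must supply.

```python
import json
from datetime import datetime, timezone

FORM_TYPE = "amazon.datazone.DataQualityResultFormType"


def build_forms(evaluations, passing_percentage, form_name, timestamp=None):
    """Build the forms list expected by post_time_series_data_points."""
    content = {
        "evaluationsCount": len(evaluations),
        "evaluations": evaluations,
        "passingPercentage": passing_percentage,
    }
    return [
        {
            # The form content is sent as a JSON string, not as a nested object.
            "content": json.dumps(content),
            "formName": form_name,
            "typeIdentifier": FORM_TYPE,
            "timestamp": timestamp or datetime.now(timezone.utc),
        }
    ]


def push_scores(domain_id, asset_id, forms, region_name):
    """Send the data points; requires Boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the builder works without the SDK

    datazone = boto3.client("datazone", region_name=region_name)
    return datazone.post_time_series_data_points(
        domainIdentifier=domain_id,
        entityIdentifier=asset_id,
        entityType="ASSET",
        forms=forms,
    )


forms = build_forms(
    evaluations=[
        {
            "applicableFields": ["healthcare_coverage"],
            "types": ["Minimum_custom"],
            "status": "FAIL",
            "description": "MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!",
        }
    ],
    passing_percentage=85.71,
    form_name="PydeequRuleSet1",
    timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),  # fixed value for illustration only
)
print(forms[0]["content"])
```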

After setting the appropriate domain and asset ID and running the method, we can check on the Amazon DataZone console that the asset data quality is now visible on the asset page.

We can observe that the overall score matches the API input value. We can also see that we were able to add customized KPIs on the Overview tab by passing custom values in the types parameter.

With the new Amazon DataZone APIs, you can load data quality results from third-party systems into a specific data asset. With this capability, Amazon DataZone allows you to extend the types of indicators present in AWS Glue Data Quality (such as completeness, minimum, and uniqueness) with custom indicators.

Clean up

We recommend deleting any potentially unused resources to avoid incurring unexpected costs. For example, you can delete the Amazon DataZone domain and the EMR application you created during this process.

Conclusion

In this post, we highlighted the latest features of Amazon DataZone for data quality, empowering end-users with enhanced context and visibility into their data assets. Furthermore, we delved into the seamless integration between Amazon DataZone and AWS Glue Data Quality. You can also use the Amazon DataZone APIs to integrate with external data quality providers, enabling you to maintain a comprehensive and robust data strategy within your AWS environment.

To learn more about Amazon DataZone, refer to the Amazon DataZone User Guide.


About the Authors


Andrea Filippo La Scola is a Partner Solutions Architect at AWS supporting Public Sector partners and customers in Italy. He focuses on modern data architectures and helping customers accelerate their cloud journey with serverless technologies.

Emanuele is a Solutions Architect at AWS, based in Italy, after living and working for more than 5 years in Spain. He enjoys helping large companies with the adoption of cloud technologies, and his area of expertise is mainly focused on Data Analytics and Data Management. Outside of work, he enjoys traveling and collecting action figures.

Varsha Velagapudi is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about simplifying customers’ AI/ML and analytics journey to help them succeed in their day-to-day tasks. Outside of work, she enjoys nature and outdoor activities, reading, and traveling.

Use Apache Iceberg in your data lake with Amazon S3, AWS Glue, and Snowflake

Post Syndicated from Andries Engelbrecht original https://aws.amazon.com/blogs/big-data/use-apache-iceberg-in-your-data-lake-with-amazon-s3-aws-glue-and-snowflake/

This post is co-written with Andries Engelbrecht and Scott Teal from Snowflake.

Businesses are constantly evolving, and data leaders are challenged every day to meet new requirements. For many enterprises and large organizations, it is not feasible to have one processing engine or tool to deal with the various business requirements. They understand that a one-size-fits-all approach no longer works, and recognize the value in adopting scalable, flexible tools and open data formats to support interoperability in a modern data architecture to accelerate the delivery of new solutions.

Customers are using AWS and Snowflake to develop purpose-built data architectures that provide the performance required for modern analytics and artificial intelligence (AI) use cases. Implementing these solutions requires data sharing between purpose-built data stores. This is why Snowflake and AWS are delivering enhanced support for Apache Iceberg to enable and facilitate data interoperability between data services.

Apache Iceberg is an open-source table format that provides reliability, simplicity, and high performance for large datasets with transactional integrity between various processing engines. In this post, we discuss the following:

  • Advantages of Iceberg tables for data lakes
  • Two architectural patterns for sharing Iceberg tables between AWS and Snowflake:
    • Manage your Iceberg tables with AWS Glue Data Catalog
    • Manage your Iceberg tables with Snowflake
  • The process of converting existing data lake tables to Iceberg tables without copying the data

Now that you have a high-level understanding of the topics, let’s dive into each of them in detail.

Advantages of Apache Iceberg

Apache Iceberg is a distributed, community-driven, Apache 2.0-licensed, 100% open-source data table format that helps simplify data processing on large datasets stored in data lakes. Data engineers use Apache Iceberg because it’s fast, efficient, and reliable at any scale and keeps records of how datasets change over time. Apache Iceberg offers integrations with popular data processing frameworks such as Apache Spark, Apache Flink, Apache Hive, Presto, and more.

Iceberg tables maintain metadata to abstract large collections of files, providing data management features including time travel, rollback, data compaction, and full schema evolution, which reduces management overhead. Originally developed at Netflix before being open sourced to the Apache Software Foundation, Apache Iceberg was a blank-slate design to solve common data lake challenges like user experience, reliability, and performance. It is now supported by a robust community of developers focused on continually improving and adding new features to the project, serving real user needs and providing them with optionality.

Transactional data lakes built on AWS and Snowflake

Snowflake provides various integrations for Iceberg tables with multiple storage options, including Amazon S3, and multiple catalog options, including AWS Glue Data Catalog and Snowflake. AWS provides integrations for various AWS services with Iceberg tables as well, including AWS Glue Data Catalog for tracking table metadata. Combining Snowflake and AWS gives you multiple options to build out a transactional data lake for analytical and other use cases such as data sharing and collaboration. By adding a metadata layer to data lakes, you get a better user experience, simplified management, and improved performance and reliability on very large datasets.

Manage your Iceberg table with AWS Glue

You can use AWS Glue to ingest, catalog, transform, and manage the data on Amazon Simple Storage Service (Amazon S3). AWS Glue is a serverless data integration service that allows you to visually create, run, and monitor extract, transform, and load (ETL) pipelines to load data into your data lakes in Iceberg format. With AWS Glue, you can discover and connect to more than 70 diverse data sources and manage your data in a centralized data catalog. Snowflake integrates with AWS Glue Data Catalog to access the Iceberg table catalog and the files on Amazon S3 for analytical queries. This greatly improves performance and reduces compute cost in comparison to external tables on Snowflake, because the additional metadata improves pruning in query plans.

You can use this same integration to take advantage of the data sharing and collaboration capabilities in Snowflake. This can be very powerful if you have data in Amazon S3 and need to enable Snowflake data sharing with other business units, partners, suppliers, or customers.

The following architecture diagram provides a high-level overview of this pattern.

The workflow includes the following steps:

  1. AWS Glue extracts data from applications, databases, and streaming sources. AWS Glue then transforms it and loads it into the data lake in Amazon S3 in Iceberg table format, while inserting and updating the metadata about the Iceberg table in AWS Glue Data Catalog.
  2. The AWS Glue crawler generates and updates Iceberg table metadata and stores it in AWS Glue Data Catalog for existing Iceberg tables on an S3 data lake.
  3. Snowflake integrates with AWS Glue Data Catalog to retrieve the snapshot location.
  4. In the event of a query, Snowflake uses the snapshot location from AWS Glue Data Catalog to read Iceberg table data in Amazon S3.
  5. Snowflake can query across Iceberg and Snowflake table formats. You can share data for collaboration with one or more accounts in the same Snowflake region. You can also use data in Snowflake for visualization using Amazon QuickSight, or use it for machine learning (ML) and artificial intelligence (AI) purposes with Amazon SageMaker.
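As a rough illustration of steps 3 and 4, the sketch below pulls the current Iceberg metadata file location out of an AWS Glue `GetTable` response. It assumes the table was registered through the Iceberg AWS Glue catalog integration, which by convention stores `table_type` and `metadata_location` in the table parameters. The table name, bucket, and file name are hypothetical, and this is not how Snowflake itself is implemented.

```python
def iceberg_metadata_location(get_table_response: dict) -> str:
    """Return the current Iceberg metadata file location from a Glue GetTable response."""
    table = get_table_response["Table"]
    params = table.get("Parameters", {})
    # Assumption: Iceberg tables carry table_type=ICEBERG in their parameters.
    if params.get("table_type", "").upper() != "ICEBERG":
        raise ValueError(f"{table.get('Name')} is not registered as an Iceberg table")
    return params["metadata_location"]


# Hypothetical response, shaped like the output of
# boto3.client("glue").get_table(DatabaseName="...", Name="...")
sample_response = {
    "Table": {
        "Name": "orders",
        "Parameters": {
            "table_type": "ICEBERG",
            "metadata_location": "s3://example-bucket/orders/metadata/00002-abc.metadata.json",
        },
    }
}

print(iceberg_metadata_location(sample_response))
```

An engine that reads the table starts from this metadata file and follows it to the snapshot, manifest, and data files in Amazon S3.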

Manage your Iceberg table with Snowflake

A second pattern also provides interoperability across AWS and Snowflake, but implements data engineering pipelines for ingestion and transformation to Snowflake. In this pattern, data is loaded to Iceberg tables by Snowflake through integrations with AWS services like AWS Glue or through other sources like Snowpipe. Snowflake then writes data directly to Amazon S3 in Iceberg format for downstream access by Snowflake and various AWS services, and Snowflake manages the Iceberg catalog that tracks snapshot locations across tables for AWS services to access.

Like the previous pattern, you can use Snowflake-managed Iceberg tables with Snowflake data sharing, but you can also use S3 to share datasets in cases where one party does not have access to Snowflake.

The following architecture diagram provides an overview of this pattern with Snowflake-managed Iceberg tables.

This workflow consists of the following steps:

  1. In addition to loading data via the COPY command, Snowpipe, and the native Snowflake connector for AWS Glue, you can integrate data via Snowflake Data Sharing.
  2. Snowflake writes Iceberg tables to Amazon S3 and updates metadata automatically with every transaction.
  3. Iceberg tables in Amazon S3 are queried by Snowflake for analytical and ML workloads using services like QuickSight and SageMaker.
  4. Apache Spark services on AWS can access snapshot locations from Snowflake via a Snowflake Iceberg Catalog SDK and directly scan the Iceberg table files in Amazon S3.

Comparing solutions

These two patterns highlight options available to data personas today to maximize their data interoperability between Snowflake and AWS using Apache Iceberg. But which pattern is ideal for your use case? If you’re already using AWS Glue Data Catalog and only require Snowflake for read queries, then the first pattern can integrate Snowflake with AWS Glue and Amazon S3 to query Iceberg tables. If you’re not already using AWS Glue Data Catalog and require Snowflake to perform reads and writes, then the second pattern is likely a good solution that allows for storing and accessing data from AWS.

Considering that reads and writes will probably operate on a per-table basis rather than the entire data architecture, it is advisable to use a combination of both patterns.

Migrate existing data lakes to a transactional data lake using Apache Iceberg

You can convert existing Parquet, ORC, and Avro-based data lake tables on Amazon S3 to Iceberg format to reap the benefits of transactional integrity while improving performance and user experience. There are several Iceberg table migration options (SNAPSHOT, MIGRATE, and ADD_FILES) for migrating existing data lake tables in-place to Iceberg format, which is preferable to rewriting all of the underlying data files—a costly and time-consuming effort with large datasets. In this section, we focus on ADD_FILES, because it’s useful for custom migrations.

For the ADD_FILES option, you can use AWS Glue to generate Iceberg metadata and statistics for an existing data lake table and create new Iceberg tables in AWS Glue Data Catalog for future use, without rewriting the underlying data. For instructions on generating Iceberg metadata and statistics using AWS Glue, refer to Migrate an existing data lake to a transactional data lake using Apache Iceberg or Convert existing Amazon S3 data lake tables to Snowflake Unmanaged Iceberg tables using AWS Glue.

This option requires that you pause data pipelines while converting the files to Iceberg tables, which is a straightforward process in AWS Glue because the destination just needs to be changed to an Iceberg table.
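For orientation, the ADD_FILES option corresponds to the Apache Iceberg `add_files` Spark procedure. The following is a minimal sketch that only builds the procedure call as a string. The catalog name `glue_catalog`, the table `db.orders`, and the S3 path are hypothetical, and the helper `build_add_files_call` is an illustration rather than part of any AWS or Iceberg library.

```python
def build_add_files_call(catalog: str, target_table: str, source_path: str,
                         file_format: str = "parquet") -> str:
    """Build a Spark SQL statement that calls Iceberg's add_files procedure.

    The existing data files under source_path are registered in the target
    Iceberg table's metadata; the files themselves are not rewritten.
    """
    return (
        f"CALL {catalog}.system.add_files("
        f"table => '{target_table}', "
        f"source_table => '`{file_format}`.`{source_path}`')"
    )


statement = build_add_files_call(
    catalog="glue_catalog",               # hypothetical Spark catalog name
    target_table="db.orders",             # hypothetical Iceberg table
    source_path="s3://example-bucket/orders/",  # hypothetical Parquet location
)
print(statement)
# In a Spark session configured for Iceberg, you would run: spark.sql(statement)
```

The target Iceberg table must already exist with a compatible schema before the procedure is called.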

Conclusion

In this post, you saw the two architecture patterns for implementing Apache Iceberg in a data lake for better interoperability across AWS and Snowflake. We also provided guidance on migrating existing data lake tables to Iceberg format.

Sign up for AWS Dev Day on April 10 to get hands-on not only with Apache Iceberg, but also with streaming data pipelines with Amazon Data Firehose and Snowpipe Streaming, and generative AI applications with Streamlit in Snowflake and Amazon Bedrock.


About the Authors

Andries Engelbrecht is a Principal Partner Solutions Architect at Snowflake and works with strategic partners. He is actively engaged with strategic partners like AWS supporting product and service integrations as well as the development of joint solutions with partners. Andries has over 20 years of experience in the field of data and analytics.

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architectures on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Brian Dolan joined Amazon as a Military Relations Manager in 2012 after his first career as a Naval Aviator. In 2014, Brian joined Amazon Web Services, where he helped Canadian customers from startups to enterprises explore the AWS Cloud. Most recently, Brian was a member of the Non-Relational Business Development team as a Go-To-Market Specialist for Amazon DynamoDB and Amazon Keyspaces before joining the Analytics Worldwide Specialist Organization in 2022 as a Go-To-Market Specialist for AWS Glue.

Nidhi Gupta is a Sr. Partner Solution Architect at AWS. She spends her days working with customers and partners, solving architectural challenges. She is passionate about data integration and orchestration, serverless and big data processing, and machine learning. Nidhi has extensive experience leading the architecture design and production release and deployments for data workloads.

Scott Teal is a Product Marketing Lead at Snowflake and focuses on data lakes, storage, and governance.

Enhance monitoring and debugging for AWS Glue jobs using new job observability metrics, Part 3: Visualization and trend analysis using Amazon QuickSight

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/enhance-monitoring-and-debugging-for-aws-glue-jobs-using-new-job-observability-metrics-part-3-visualization-and-trend-analysis-using-amazon-quicksight/

In Part 2 of this series, we discussed how to enable AWS Glue job observability metrics and integrate them with Grafana for real-time monitoring. Grafana provides powerful customizable dashboards to view pipeline health. However, to analyze trends over time, aggregate from different dimensions, and share insights across the organization, a purpose-built business intelligence (BI) tool like Amazon QuickSight may be more effective for your business. QuickSight makes it straightforward for business users to visualize data in interactive dashboards and reports.

In this post, we explore how to connect QuickSight to Amazon CloudWatch metrics and build graphs to uncover trends in AWS Glue job observability metrics. Analyzing historical patterns allows you to optimize performance, identify issues proactively, and improve planning. We walk through ingesting CloudWatch metrics into QuickSight using a CloudWatch metric stream and QuickSight SPICE. With this integration, you can use line charts, bar charts, and other graph types to uncover daily, weekly, and monthly patterns. QuickSight lets you perform aggregate calculations on metrics for deeper analysis. You can slice data by different dimensions like job name, see anomalies, and share reports securely across your organization. With these insights, teams have the visibility to make data integration pipelines more efficient.

Solution overview

The following architecture diagram illustrates the workflow to implement the solution.

The workflow includes the following steps:

  1. AWS Glue jobs emit observability metrics to CloudWatch metrics.
  2. CloudWatch streams metric data through a metric stream into Amazon Data Firehose.
  3. Data Firehose uses an AWS Lambda function to transform data and ingest the transformed records into an Amazon Simple Storage Service (Amazon S3) bucket.
  4. An AWS Glue crawler scans data on the S3 bucket and populates table metadata on the AWS Glue Data Catalog.
  5. QuickSight periodically runs Amazon Athena queries to load query results to SPICE and then visualize the latest metric data.
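To make step 3 more concrete, here is a minimal sketch of a Firehose transformation Lambda function. It assumes the metric stream uses the JSON output format (one JSON object per line) and that AWS Glue metrics arrive under the `Glue` namespace. The flattened field names (`dim_*`, `value_*`) are hypothetical, and the Lambda function in the sample template may work differently.

```python
import base64
import json


def lambda_handler(event, context):
    """Keep only AWS Glue metrics and flatten each one into a single JSON line."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        lines = []
        for line in payload.splitlines():
            if not line.strip():
                continue
            metric = json.loads(line)
            # Assumption: Glue job metrics are published in the "Glue" namespace.
            if metric.get("namespace") != "Glue":
                continue
            flat = {
                "account_id": metric.get("account_id"),
                "region": metric.get("region"),
                "metric_name": metric.get("metric_name"),
                "timestamp": metric.get("timestamp"),
                "unit": metric.get("unit"),
                # Flatten nested dimensions and statistics into top-level columns.
                **{f"dim_{k}": v for k, v in metric.get("dimensions", {}).items()},
                **{f"value_{k}": v for k, v in metric.get("value", {}).items()},
            }
            lines.append(json.dumps(flat))
        if lines:
            data = ("\n".join(lines) + "\n").encode("utf-8")
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(data).decode("utf-8"),
            })
        else:
            # Nothing of interest in this record, so tell Firehose to drop it.
            output.append({
                "recordId": record["recordId"],
                "result": "Dropped",
                "data": record["data"],
            })
    return {"records": output}
```

Flattening the nested fields keeps the table schema that the crawler infers simple, which in turn keeps the Athena queries behind the QuickSight dataset simple.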

All of the resources are defined in a sample AWS Cloud Development Kit (AWS CDK) template. You can deploy the end-to-end solution to visualize and analyze trends of the observability metrics.

Sample AWS CDK template

This post provides a sample AWS CDK template for a dashboard using AWS Glue observability metrics.

Typically, you have multiple accounts to manage and run resources for your data pipeline.

In this template, we assume the following accounts:

  • Monitoring account – This hosts the central S3 bucket, central Data Catalog, and QuickSight-related resources
  • Source account – This hosts individual data pipeline resources on AWS Glue and the resources to send metrics to the monitoring account

The template works even when the monitoring account and source account are the same.

This sample template consists of four stacks:

  • Amazon S3 stack – This provisions the S3 bucket
  • Data Catalog stack – This provisions the AWS Glue database, table, and crawler
  • QuickSight stack – This provisions the QuickSight data source, dataset, and analysis
  • Metrics sender stack – This provisions the CloudWatch metric stream, Firehose delivery stream, and Lambda function for transformation

Prerequisites

You should have the following prerequisites:

  • Python 3.9 or later
  • AWS accounts for the monitoring account and source account
  • An AWS named profile for the monitoring account and source account
  • The AWS CDK Toolkit 2.87.0 or later

Initialize the CDK project

To initialize the project, complete the following steps:

  1. Clone the AWS CDK template to your workspace:
    $ git clone git@github.com:aws-samples/aws-glue-cdk-baseline.git
    
    $ cd aws-glue-cdk-baseline

  2. Create a Python virtual environment specific to the project on the client machine:
    $ python3 -m venv .venv

We use a virtual environment in order to isolate the Python environment for this project and not install software globally.

  3. Activate the virtual environment according to your OS:
    • On macOS and Linux, use the following code:
      $ source .venv/bin/activate

    • On a Windows platform, use the following code:
      % .venv\Scripts\activate.bat

After this step, the subsequent steps run within the bounds of the virtual environment on the client machine and interact with the AWS account as needed.

  4. Install the required dependencies described in requirements.txt to the virtual environment:
    $ pip install -r requirements.txt

  5. Edit the configuration file default-config.yaml based on your environments (replace each account ID with your own):
    create_s3_stack: false
    create_metrics_sender_stack: false
    create_catalog_stack: false
    create_quicksight_stack: true
    
    s3_bucket_name: glue-observability-demo-dashboard
    
    firehose_log_group_name: /aws/kinesisfirehose/observability-demo-metric-stream
    firehose_lambda_buffer_size_mb: 2
    firehose_lambda_buffer_interval_seconds: 60
    firehose_s3_buffer_size_mb: 128
    firehose_s3_buffer_interval_seconds: 300
    
    glue_database_name: observability_demo_db
    glue_table_name: metric_data
    glue_crawler_name: observability_demo_crawler
    glue_crawler_cron_schedule: "cron(42 * * * ? *)"
    
    athena_workgroup_name: primary

Bootstrap your AWS environments

Run the following commands to bootstrap your AWS environments:

  1. In the monitoring account, provide your monitoring account number, AWS Region, and monitoring profile:
    $ cdk bootstrap aws://<MONITORING-ACCOUNT-NUMBER>/<REGION> --profile <MONITORING-PROFILE> \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess

  2. In the source account, provide your source account number, Region, and source profile:
    $ cdk bootstrap aws://<SOURCE-ACCOUNT-NUMBER>/<REGION> --profile <SOURCE-PROFILE> \
    --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess

When you use only one account for all environments, you can run the cdk bootstrap command just once.

Deploy your AWS resources

Run the following commands to deploy your AWS resources:

  1. Run the following command using the monitoring account to deploy resources defined in the AWS CDK template:
    $ cdk deploy '*' --profile <MONITORING-PROFILE>

  2. Run the following command using the source account to deploy resources defined in the AWS CDK template:
    $ cdk deploy MetricSenderStack --profile <SOURCE-PROFILE>

Configure QuickSight permissions

Initially, the new QuickSight resources, including the dataset and analysis created by the AWS CDK template, are not visible to you because no QuickSight permissions are configured yet.

To make the dataset and analysis visible to you, complete the following steps:

  1. On the QuickSight console, navigate to the user menu and choose Manage QuickSight.
  2. In the navigation pane, choose Manage assets.
  3. Under Browse assets, choose Analysis.
  4. Search for GlueObservabilityAnalysis, and select it.
  5. Choose SHARE.
  6. For User or Group, select your user, then choose SHARE (1).
  7. Wait for the share to be complete, then choose DONE.
  8. On the Manage assets page, choose Datasets.
  9. Search for observability_demo.metrics_data, and select it.
  10. Choose SHARE.
  11. For User or Group, select your user, then choose SHARE (1).
  12. Wait for the share to be complete, then choose DONE.

Explore the default QuickSight analysis

Now your QuickSight analysis and dataset are visible to you. You can return to the QuickSight console and choose GlueObservabilityAnalysis under Analysis. The following screenshot shows your dashboard.

The sample analysis has two tabs: Monitoring and Insights. By default, the Monitoring tab has the following charts:

  • [Reliability] Job Run Errors Breakdown
  • [Reliability] Job Run Errors (Total)
  • [Performance] Skewness Job
  • [Performance] Skewness Job per Job

  • [Resource Utilization] Worker Utilization
  • [Resource Utilization] Worker Utilization per Job
  • [Throughput] BytesRead, RecordsRead, FilesRead, PartitionRead (Avg)
  • [Throughput] BytesWritten, RecordsWritten, FilesWritten (Avg)

  • [Resource Utilization] Disk Available GB (Min)
  • [Resource Utilization] Max Disk Used % (Max)

  • [Driver OOM] OOM Error Count
  • [Driver OOM] Max Heap Memory Used % (Max)
  • [Executor OOM] OOM Error Count
  • [Executor OOM] Max Heap Memory Used % (Max)

By default, the Insights tab has the following insights:

  • Bottom Ranked Worker Utilization
  • Top Ranked Skewness Job

  • Forecast Worker Utilization
  • Top Mover readBytes

You can add any new graph charts or insights using the observability metrics based on your requirements.

Publish the QuickSight dashboard

When the analysis is ready, complete the following steps to publish the dashboard:

  1. Choose PUBLISH.
  2. Select Publish new dashboard as, and enter GlueObservabilityDashboard.
  3. Choose Publish dashboard.

Then you can view and share the dashboard.

Visualize and analyze with AWS Glue job observability metrics

Let’s use the dashboard to make AWS Glue usage more performant.

Looking at the Skewness Job per Job visualization, there was a spike on November 1, 2023. The skewness metric of the job multistage-demo was 9.53, which is significantly higher than that of the other jobs.

Let’s drill down into details. You can choose Controls, and change filter conditions based on date time, Region, AWS account ID, AWS Glue job name, job run ID, and the source and sink of the data stores. For now, let’s filter with the job name multistage-demo.

The filtered Worker Utilization per Job visualization shows 0.5, and its minimum value was 0.16. It seems that there is room for improvement in resource utilization. This observation suggests enabling auto scaling for this job to increase worker utilization.
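The same drill-down can be approximated in code when you want to scan many jobs at once. The following is a minimal sketch, assuming the metrics have already been loaded as a list of per-run records. The record field names, the second `multistage-demo` sample, the `other-job` sample, and both thresholds are hypothetical values chosen for illustration, not AWS recommendations.

```python
from statistics import mean


def flag_jobs(samples, skew_threshold=5.0, utilization_threshold=0.6):
    """Group samples by job and report high skewness or low average worker utilization."""
    by_job = {}
    for sample in samples:
        by_job.setdefault(sample["job_name"], []).append(sample)

    findings = {}
    for job, rows in by_job.items():
        max_skew = max(r["skewness"] for r in rows)
        avg_util = mean(r["worker_utilization"] for r in rows)
        reasons = []
        if max_skew > skew_threshold:
            reasons.append("high skewness")
        if avg_util < utilization_threshold:
            reasons.append("low worker utilization")
        if reasons:
            findings[job] = reasons
    return findings


# Hypothetical samples; only 9.53, 0.5, and 0.16 come from the dashboard walkthrough.
samples = [
    {"job_name": "multistage-demo", "skewness": 9.53, "worker_utilization": 0.5},
    {"job_name": "multistage-demo", "skewness": 1.2, "worker_utilization": 0.16},
    {"job_name": "other-job", "skewness": 0.8, "worker_utilization": 0.9},
]

print(flag_jobs(samples))
```

Jobs flagged for low worker utilization are natural candidates for auto scaling, and jobs flagged for high skewness are candidates for repartitioning or other job-level tuning.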

Clean up

Run the following commands to clean up your AWS resources:

  1. Run the following command using the monitoring account to clean up resources:
    $ cdk destroy '*' --profile <MONITORING-PROFILE>

  2. Run the following command using the source account to clean up resources:
    $ cdk destroy MetricSenderStack --profile <SOURCE-PROFILE>

Considerations

QuickSight integration is designed for analysis and better flexibility. You can aggregate metrics based on any fields. When dealing with many jobs at once, QuickSight insights help you identify problematic jobs.

QuickSight integration is achieved with more resources in your environments. The monitoring account needs an AWS Glue database, table, crawler, and S3 bucket, and the ability to run Athena queries to visualize metrics in QuickSight. Each source account needs to have one metric stream and one Firehose delivery stream. This can incur additional costs.

All the required resources are templatized in AWS CDK.

Conclusion

In this post, we explored how to visualize and analyze AWS Glue job observability metrics on QuickSight using CloudWatch metric streams and SPICE. By connecting the new observability metrics to interactive QuickSight dashboards, you can uncover daily, weekly, and monthly patterns to optimize AWS Glue job usage. The rich visualization capabilities of QuickSight allow you to analyze trends in metrics like worker utilization, error categories, throughput, and more. Aggregating metrics and slicing data by different dimensions such as job name can provide deeper insights.

The sample dashboard showed metrics over time, top errors, and comparative job analytics. These visualizations and reports can be securely shared with teams across the organization. With data-driven insights on the AWS Glue observability metrics, you can have deeper insights on performance bottlenecks, common errors, and more.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Chuhan Liu is a Software Development Engineer on the AWS Glue team. He is passionate about building scalable distributed systems for big data processing, analytics, and management. In his spare time, he enjoys playing tennis.

XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has a track record of more than 18 years innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple to use interfaces to efficiently manage and transform petabytes of data seamlessly across data lakes on Amazon S3, databases and data-warehouses on cloud.

Scale AWS Glue jobs by optimizing IP address consumption and expanding network capacity using a private NAT gateway

Post Syndicated from Sushanth Kothapally original https://aws.amazon.com/blogs/big-data/scale-aws-glue-jobs-by-optimizing-ip-address-consumption-and-expanding-network-capacity-using-a-private-nat-gateway/

As businesses expand, the demand for IP addresses within the corporate network often exceeds the supply. An organization’s network is often designed with some anticipation of future requirements, but as enterprises evolve, their information technology (IT) needs surpass the previously designed network. Companies may find themselves challenged to manage the limited pool of IP addresses.

When AWS Glue runs data engineering workloads in such a constrained network configuration, your team may face hurdles running many jobs simultaneously, because there may not be enough IP addresses to support the required connections to databases. To overcome this shortage, the team may obtain more IP addresses from your corporate network pool. These IP addresses can be unique (non-overlapping) or overlapping, meaning they are reused elsewhere in your corporate network.

When you use overlapping IP addresses, you need additional network management to establish connectivity. Networking solutions can include options like private Network Address Translation (NAT) gateways, AWS PrivateLink, or self-managed NAT appliances to translate IP addresses.

In this post, we will discuss two strategies to scale AWS Glue jobs:

  1. Optimizing the IP address consumption by right-sizing Data Processing Units (DPUs), using the Auto Scaling feature of AWS Glue, and fine-tuning of the jobs.
  2. Expanding the network capacity using additional non-routable Classless Inter-Domain Routing (CIDR) range with a private NAT gateway.

Before we dive deep into these solutions, let us understand how AWS Glue uses elastic network interfaces (ENIs) to establish connectivity. To enable access to data stores inside a VPC, you need to create an AWS Glue connection that is attached to your VPC. When an AWS Glue job runs in your VPC, the job creates an ENI inside the configured VPC for each data connection, and that ENI uses an IP address in the specified VPC. These ENIs are short-lived and remain active until the job is complete.

Now let us look at the first solution that explains optimizing the AWS Glue IP address consumption.

Strategies for efficient IP address consumption

In AWS Glue, the number of workers a job uses determines the count of IP addresses used from your VPC subnet. This is because each worker requires one IP address that maps to one ENI. When you don’t have enough CIDR range allocated to the AWS Glue subnet, you may observe IP address exhaustion errors. The following are some best practices to optimize AWS Glue IP address consumption:

  • Right-sizing the job’s DPUs – AWS Glue is a distributed processing engine. It works efficiently when it can run tasks in parallel. If a job has more than the required DPUs, it doesn’t always run quicker. So, finding the right number of DPUs will make sure you use IP addresses optimally. By building observability in the system and analyzing the job performance, you can get insights into ENI consumption trends and then configure the appropriate capacity on the job for the right size. For more details, refer to Monitoring for DPU capacity planning. The Spark UI is a helpful tool to monitor the worker usage of AWS Glue jobs. For more details, refer to Monitoring jobs using the Apache Spark web UI.
  • AWS Glue Auto Scaling – It’s often difficult to predict a job’s capacity requirements upfront. Enabling the Auto Scaling feature of AWS Glue will offload some of this responsibility to AWS. At runtime, based on the workload requirements, the job automatically scales worker nodes up to the defined maximum configuration. If there is no additional need, AWS Glue will not overprovision workers, thereby saving resources and reducing cost. The Auto Scaling feature is available in AWS Glue 3.0 and later. For more information, refer to Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark.
  • Job-level optimization – Identify job-level optimizations by using AWS Glue job metrics, and apply best practices from Best practices for performance tuning AWS Glue for Apache Spark jobs.
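Because each worker consumes one IP address, a quick capacity estimate can show how many jobs a subnet can host at once. The following is a minimal sketch using Python's standard `ipaddress` module. It relies on the fact that AWS reserves five addresses in every subnet (the first four and the last one). The helper names and the example of 10 workers per job are hypothetical.

```python
import ipaddress

# AWS reserves the first four addresses and the last address of every subnet.
AWS_RESERVED_PER_SUBNET = 5


def usable_ips(cidr: str) -> int:
    """Number of IP addresses in the subnet that can be assigned to ENIs."""
    return max(ipaddress.ip_network(cidr).num_addresses - AWS_RESERVED_PER_SUBNET, 0)


def max_concurrent_jobs(cidr: str, workers_per_job: int, ips_in_use: int = 0) -> int:
    """How many jobs of a given size fit, assuming one IP address per worker."""
    free = usable_ips(cidr) - ips_in_use
    return max(free // workers_per_job, 0)


print(usable_ips("172.33.0.0/24"))                                # 251
print(max_concurrent_jobs("172.33.0.0/24", workers_per_job=10))   # 25
```

Right-sizing a job from 20 workers to 10 doubles the number of such jobs the same subnet can run concurrently, which is why the optimizations above are worth trying before expanding the network.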

Next let us look at the second solution that elaborates network capacity expansion.

Solutions for network size (IP address) expansion

In this section, we will discuss two possible solutions to expand network size in more detail.

Expand VPC CIDR ranges with routable addresses

One solution is to add more private IPv4 CIDR ranges from RFC 1918 to your VPC. Theoretically, each AWS account can be assigned some or all of these IP address CIDRs. Your IP Address Management (IPAM) team often manages the allocation of IP addresses that each business unit can use from RFC 1918 to avoid overlapping IP addresses across multiple AWS accounts or business units. If the routable IP address quota currently allocated by the IPAM team is not sufficient, you can request more.

If your IPAM team issues you an additional non-overlapping CIDR range, then you can either add it as a secondary CIDR to your existing VPC or create a new VPC with it. If you are planning to create a new VPC, then you can inter-connect the VPCs via VPC peering or AWS Transit Gateway.

If this additional capacity is sufficient to run all your jobs within the defined timeframe, then it is a simple and cost-effective solution. Otherwise, you can consider adopting overlapping IP addresses with a private NAT gateway, as described in the following section. With that solution, you must use Transit Gateway to connect the VPCs, because VPC peering is not possible when the two VPCs have overlapping CIDR ranges.

Configure non-routable CIDR with a private NAT gateway

As described in the AWS whitepaper Building a Scalable and Secure Multi-VPC AWS Network Infrastructure, you can expand your network capacity by creating a non-routable IP address subnet and using a private NAT gateway that is located in a routable IP address space (non-overlapping) to route traffic. A private NAT gateway translates and routes traffic between non-routable IP addresses and routable IP addresses. The following diagram demonstrates the solution with reference to AWS Glue.

High level architecture

As you can see in the preceding diagram, VPC A (ETL) has two CIDR ranges attached. The smaller CIDR range 172.33.0.0/24 is routable because it is not reused anywhere, whereas the larger CIDR range 100.64.0.0/16 is non-routable because it is reused in the database VPC.

In VPC B (Database), we have hosted two databases in routable subnets 172.30.0.0/26 and 172.30.0.64/26. These two subnets are in two separate Availability Zones for high availability. We also have two additional unused subnets, 100.64.0.0/24 and 100.64.1.0/24, to simulate a non-routable setup.

You can choose the size of the non-routable CIDR range based on your capacity requirements. Since you can reuse IP addresses, you can create a very large subnet as needed. For example, a CIDR mask of /16 would give you approximately 65,000 IPv4 addresses. You can work with your network engineering team and size the subnets.
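The sizes and relationships of the CIDR ranges used in this architecture can be checked with Python's standard `ipaddress` module. This is a small sketch for illustration; the variable names are hypothetical.

```python
import ipaddress

non_routable = ipaddress.ip_network("100.64.0.0/16")  # reused (overlapping) range
routable_a = ipaddress.ip_network("172.33.0.0/24")    # routable range in VPC A
database_b = ipaddress.ip_network("172.30.0.0/24")    # routable range in VPC B

# A /16 holds 2**16 addresses, which is where "approximately 65,000" comes from.
print(non_routable.num_addresses)   # 65536
print(routable_a.num_addresses)     # 256

# The routable ranges must not overlap, or Transit Gateway routing would be ambiguous.
print(routable_a.overlaps(database_b))   # False

# The non-routable subnets used by AWS Glue are carved out of the /16.
print(ipaddress.ip_network("100.64.0.0/24").subnet_of(non_routable))   # True
```

The raw address counts do not subtract the five addresses that AWS reserves in each subnet, so the usable capacity is slightly lower.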

In short, you can configure AWS Glue jobs to use both routable and non-routable subnets in your VPC to maximize the available IP address pool.

Now let us understand how Glue ENIs that are in a non-routable subnet communicate with data sources in another VPC.

Call flow

The data flow for the use case demonstrated here is as follows (referring to the numbered steps in the preceding figure):

  1. When an AWS Glue job needs to access a data source, it first uses the AWS Glue connection on the job and creates the ENIs in the non-routable subnet 100.64.0.0/24 in VPC A. AWS Glue then uses the database connection configuration and attempts to connect to the database in VPC B (172.30.0.0/24).
  2. As per the route table VPCA-Non-Routable-RouteTable the destination 172.30.0.0/24 is configured for a private NAT gateway. The request is sent to the NAT gateway, which then translates the source IP address from a non-routable IP address to a routable IP address. Traffic is then sent to the transit gateway attachment in VPC A because it’s associated with the VPCA-Routable-RouteTable route table in VPC A.
  3. Transit Gateway uses the 172.30.0.0/24 route and sends the traffic to the VPC B transit gateway attachment.
  4. The transit gateway ENI in VPC B uses VPC B’s local route to connect to the database endpoint and query the data.
  5. When the query is complete, the response is sent back to VPC A. The response traffic is routed to the transit gateway attachment in VPC B, then Transit Gateway uses the 172.33.0.0/24 route and sends traffic to the VPC A transit gateway attachment.
  6. The transit gateway ENI in VPC A uses the local route to forward the traffic to the private NAT gateway, which translates the destination IP address to that of ENIs in non-routable subnet.
  7. Finally, the AWS Glue job receives the data and continues processing.
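The forwarding decisions in steps 1 and 2 follow the usual longest-prefix-match rule for route tables. The sketch below simulates that rule with Python's standard `ipaddress` module. The route table contents are a simplified assumption based on the description above (including the local routes), the target labels are hypothetical, and the database address 172.30.0.10 is made up for illustration.

```python
import ipaddress


def next_hop(route_table: dict, destination: str) -> str:
    """Return the target of the most specific route that matches the destination."""
    dest = ipaddress.ip_address(destination)
    candidates = [
        (ipaddress.ip_network(cidr), target)
        for cidr, target in route_table.items()
        if dest in ipaddress.ip_network(cidr)
    ]
    if not candidates:
        raise LookupError(f"no route to {destination}")
    # Longest prefix wins, as in a VPC route table.
    return max(candidates, key=lambda c: c[0].prefixlen)[1]


# Assumed contents of VPCA-Non-Routable-RouteTable (AWS Glue ENI subnets)
non_routable_rt = {
    "172.33.0.0/24": "local",
    "100.64.0.0/16": "local",
    "172.30.0.0/24": "private-nat-gateway",
}

# Assumed contents of VPCA-Routable-RouteTable (private NAT gateway subnet)
routable_rt = {
    "172.33.0.0/24": "local",
    "100.64.0.0/16": "local",
    "172.30.0.0/24": "transit-gateway",
}

database_ip = "172.30.0.10"  # hypothetical database address in VPC B
print(next_hop(non_routable_rt, database_ip))  # private-nat-gateway
print(next_hop(routable_rt, database_ip))      # transit-gateway
```

The two lookups mirror the two hops in VPC A: the AWS Glue ENI hands traffic to the private NAT gateway, and the NAT gateway's subnet hands the translated traffic to the transit gateway.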

The private NAT gateway solution is an option if you need extra IP addresses when you can’t obtain them from a routable network in your organization. Sometimes with each additional service there is an additional cost incurred, and this trade-off is necessary to meet your goals. Refer to the NAT Gateway pricing section on the Amazon VPC pricing page for more information.

Prerequisites

To complete the walk-through of the private NAT gateway solution, you need the following:

Deploy the solution

To implement the solution, complete the following steps:

  1. Sign in to the AWS Management Console.
  2. Deploy the solution by clicking Launch stack. The stack defaults to us-east-1, but you can select your desired Region.
  3. Click next and then specify the stack details. You can keep the prepopulated default values for the input parameters or change them as needed.
  4. For DatabaseUserPassword, enter an alphanumeric password of your choice and make sure to note it down for later use.
  5. For S3BucketName, enter a unique Amazon Simple Storage Service (Amazon S3) bucket name. This bucket stores the AWS Glue job script that will be copied from an AWS public code repository.
  6. Click next.
  7. Leave the default values and click next again.
  8. Review the details, acknowledge the creation of IAM resources, and click submit to start the deployment.

You can monitor the events to see resources being created on the AWS CloudFormation console. It may take around 20 minutes for the stack resources to be created.

After the stack creation is complete, go to the Outputs tab on the AWS CloudFormation console and note the following values for later use:

  • DBSource
  • DBTarget
  • SourceCrawler
  • TargetCrawler
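If you prefer to collect these values programmatically, the CloudFormation `DescribeStacks` response can be reduced to a simple dictionary. The following is a minimal sketch. The output values shown are hypothetical placeholders, and in practice the response would come from `boto3.client("cloudformation").describe_stacks(StackName="<your stack name>")`.

```python
def outputs_to_dict(describe_stacks_response: dict) -> dict:
    """Map each stack output key to its value for the first stack in the response."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Hypothetical response fragment with placeholder values
sample_response = {
    "Stacks": [
        {
            "Outputs": [
                {"OutputKey": "DBSource", "OutputValue": "source-db.example.internal"},
                {"OutputKey": "DBTarget", "OutputValue": "target-db.example.internal"},
                {"OutputKey": "SourceCrawler", "OutputValue": "example-source-crawler"},
                {"OutputKey": "TargetCrawler", "OutputValue": "example-target-crawler"},
            ]
        }
    ]
}

outputs = outputs_to_dict(sample_response)
print(outputs["DBSource"], outputs["DBTarget"])
```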

Connect to an AWS Cloud9 instance

Next, we need to prepare the source and target Amazon RDS for MySQL tables using an AWS Cloud9 instance. Complete the following steps:

  1. On the AWS Cloud9 console page, locate the aws-glue-cloud9 environment.
  2. In the Cloud9 IDE column, click on Open to launch your AWS Cloud9 instance in a new web browser.

Prepare the source MySQL table

Complete the following steps to prepare your source table:

  1. From the AWS Cloud9 terminal, install the MySQL client using the following command:
    sudo yum update -y && sudo yum install -y mysql
  2. Connect to the source database using the following command. Replace the source hostname with the DBSource value you captured earlier. When prompted, enter the database password that you specified during the stack creation.
    mysql -h <Source Hostname> -P 3306 -u admin -p
  3. Run the following scripts to create the source emp table, and load the test data:
    -- connect to source database
    USE srcdb;
    -- Drop emp table if it exists
    DROP TABLE IF EXISTS emp;
    -- Create the emp table
    CREATE TABLE emp (empid INT AUTO_INCREMENT,
                      ename VARCHAR(100) NOT NULL,
                      edept VARCHAR(100) NOT NULL,
                      PRIMARY KEY (empid));
    -- Create a stored procedure to load sample records into emp table
    DELIMITER $$
    CREATE PROCEDURE sp_load_emp_source_data()
    BEGIN
    DECLARE empid INT;
    DECLARE ename VARCHAR(100);
    DECLARE edept VARCHAR(50);
    DECLARE cnt INT DEFAULT 1; -- Initialize counter to 1 to auto-increment the PK
    DECLARE rec_count INT DEFAULT 1000; -- Initialize sample records counter
    TRUNCATE TABLE emp; -- Truncate the emp table
    WHILE cnt <= rec_count DO -- Loop and load the required number of sample records
    SET ename = CONCAT('Employee_', FLOOR(RAND() * 100) + 1); -- Generate random employee name
    SET edept = CONCAT('Dept_', FLOOR(RAND() * 100) + 1); -- Generate random employee department
    -- Insert record with auto-incrementing empid
    INSERT INTO emp (ename, edept) VALUES (ename, edept);
    -- Increment counter for next record
    SET cnt = cnt + 1;
    END WHILE;
    COMMIT;
    END$$
    DELIMITER ;
    -- Call the above stored procedure to load sample records into emp table
    CALL sp_load_emp_source_data();
  4. Check the record count of the source emp table using the following SQL query (you need this count in a later step for verification): select count(*) from emp;
  5. Run the following command to exit from the MySQL client utility and return to the AWS Cloud9 instance’s terminal: quit;

Prepare the target MySQL table

Complete the following steps to prepare the target table:

  1. Connect to the target database using the following command. Replace the target hostname with the DBTarget value you captured earlier. When prompted, enter the database password that you specified during the stack creation. mysql -h <Target Hostname> -P 3306 -u admin -p
  2. Run the following scripts to create the target emp table. This table will be loaded by the AWS Glue job in the subsequent step.
    -- connect to the target database
    USE targetdb;
    -- Drop emp table if it exists 
    DROP TABLE IF EXISTS emp;
    -- Create the emp table
    CREATE TABLE emp (empid INT AUTO_INCREMENT,
                      ename VARCHAR(100) NOT NULL,
                      edept VARCHAR(100) NOT NULL,
                      PRIMARY KEY (empid)
    );

Verify the networking setup (Optional)

The following steps help you understand the NAT gateway, route table, and transit gateway configurations of the private NAT gateway solution. These components were created during the CloudFormation stack creation.

  1. On the Amazon VPC console page, navigate to the Virtual private cloud section and locate NAT gateways.
  2. Search for the NAT gateway named Glue-OverlappingCIDR-NATGW and explore it further. As you can see in the following screenshot, the NAT gateway was created in VPC A (ETL) on the routable subnet.
    NAT Gateway setup
  3. In the navigation pane, choose Route tables under the Virtual private cloud section.
  4. Search for VPCA-Non-Routable-RouteTable and explore it further. You can see that the route table is configured to translate traffic from the overlapping CIDR range using the NAT gateway.
    Route table setup
  5. In the navigation pane, navigate to the Transit gateways section and click on Transit gateway attachments. Enter VPC- in the search box and locate the two newly created transit gateway attachments.
  6. You can explore these attachments further to learn their configurations.

Run the AWS Glue crawlers

Complete the following steps to run the AWS Glue crawlers that are required to catalog the source and target emp tables. This is a prerequisite step for running the AWS Glue job.

  1. On the AWS Glue console page, under the Data Catalog section in the navigation pane, click on Crawlers.
  2. Locate the source and target crawlers that you noted earlier.
  3. Select these crawlers and click Run to create the respective AWS Glue Data Catalog tables.
  4. Monitor the AWS Glue crawlers until they complete successfully. It may take around 3–4 minutes for both crawlers to complete. When they’re done, the last run status of each crawler changes to Succeeded, and you can see the two AWS Glue Data Catalog tables created from this run.
    Crawler run successful
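
The console steps above can also be scripted. The following is a minimal sketch that starts the crawlers and polls until each one is idle again. It assumes a boto3 Glue client is passed in; the function names are ours, and error handling (for example, a crawler that is already running) is omitted.

```python
import time
from typing import Any, Callable, Dict, Iterable, Optional


def crawler_result(crawler: Dict[str, Any]) -> Optional[str]:
    """Return the last crawl status once the crawler is idle (READY), else None."""
    if crawler.get("State") != "READY":
        return None
    return crawler.get("LastCrawl", {}).get("Status", "UNKNOWN")


def run_crawlers_and_wait(glue_client: Any,
                          crawler_names: Iterable[str],
                          poll_seconds: float = 15.0,
                          sleep: Callable[[float], None] = time.sleep) -> Dict[str, str]:
    """Start each crawler, then poll until all of them report a final status.

    glue_client is expected to be boto3.client("glue").
    """
    names = list(crawler_names)
    for name in names:
        glue_client.start_crawler(Name=name)

    statuses: Dict[str, str] = {}
    pending = set(names)
    while pending:
        for name in sorted(pending):  # sorted() copies, so discarding below is safe
            result = crawler_result(glue_client.get_crawler(Name=name)["Crawler"])
            if result is not None:
                statuses[name] = result
                pending.discard(name)
        if pending:
            sleep(poll_seconds)
    return statuses
```

You would pass the SourceCrawler and TargetCrawler values noted earlier as crawler_names. A returned status of SUCCEEDED for both corresponds to the Succeeded state shown on the console.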

Run the AWS Glue ETL job

After you set up the tables and complete the prerequisite steps, you are ready to run the AWS Glue job that was created by the CloudFormation template. This job connects to the source RDS for MySQL database, extracts the data from the source emp table, and loads it into the target RDS for MySQL table using the private NAT gateway solution. To run the AWS Glue job, complete the following steps:

  1. On the AWS Glue console, click on ETL jobs in the navigation pane.
  2. Click on the job glue-private-nat-job.
  3. Click Run to start it.

The following is the PySpark script for this ETL job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node = glueContext.create_dynamic_frame.from_catalog(
    database="glue_cat_db_source",
    table_name="srcdb_emp",
    transformation_ctx="AWSGlueDataCatalog_node",
)

# Script generated for node Change Schema
ChangeSchema_node = ApplyMapping.apply(
    frame=AWSGlueDataCatalog_node,
    mappings=[
        ("empid", "int", "empid", "int"),
        ("ename", "string", "ename", "string"),
        ("edept", "string", "edept", "string"),
    ],
    transformation_ctx="ChangeSchema_node",
)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node = glueContext.write_dynamic_frame.from_catalog(
    frame=ChangeSchema_node,
    database="glue_cat_db_target",
    table_name="targetdb_emp",
    transformation_ctx="AWSGlueDataCatalog_node",
)

job.commit()

Based on the job’s DPU configuration, AWS Glue creates a set of ENIs in the non-routable subnet that is configured on the AWS Glue connection. You can monitor these ENIs on the Network Interfaces page of the Amazon Elastic Compute Cloud (Amazon EC2) console.

The following screenshot shows the 10 ENIs that were created for the job run to match the number of workers requested in the job parameters. As expected, the ENIs were created in the non-routable subnet of VPC A, enabling scalability of IP addresses. After the job is complete, these ENIs are automatically released by AWS Glue.
Execution ENIs

While the AWS Glue job is running, you can monitor its status. Upon successful completion, the job’s status changes to Succeeded.
Job successful completion
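
If you want to start and monitor the job from a script instead of the console, the following sketch shows one way to do it. It assumes a boto3 Glue client is passed in; the function names and the polling interval are our own choices.

```python
import time
from typing import Any, Callable

# Job run states after which AWS Glue no longer changes the run's status.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def is_terminal(state: str) -> bool:
    """Return True when a job run state is final."""
    return state in TERMINAL_STATES


def run_job_and_wait(glue_client: Any,
                     job_name: str,
                     poll_seconds: float = 30.0,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Start a Glue job run and poll until it reaches a final state.

    glue_client is expected to be boto3.client("glue").
    """
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = job_run["JobRunState"]
        if is_terminal(state):
            return state
        sleep(poll_seconds)
```

For this post, job_name would be glue-private-nat-job, and a return value of SUCCEEDED matches the Succeeded status on the console.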

Verify the results

After the AWS Glue job is complete, connect to the target MySQL database and verify that the target record count matches the source count. You can use the following SQL query in the AWS Cloud9 terminal.

USE targetdb;
SELECT count(*) from emp;

Finally, exit from the MySQL client utility using the following command and return to the AWS Cloud9 terminal: quit;

You can now confirm that AWS Glue has successfully completed a job to load data to a target database using the IP addresses from a non-routable subnet. This concludes end to end testing of the private NAT gateway solution.

Clean up

To avoid incurring future charges, delete the resources created by the CloudFormation stack by completing the following steps:

  1. On the AWS CloudFormation console, click Stacks in the navigation pane.
  2. Select the stack AWSGluePrivateNATStack.
  3. Click on Delete to delete the stack. When prompted, confirm the stack deletion.

Conclusion

In this post, we demonstrated how you can scale AWS Glue jobs by optimizing IP address consumption and expanding your network capacity by using a private NAT gateway solution. This two-fold approach helps you to get unblocked in an environment that has IP address capacity constraints. The options discussed in the AWS Glue IP address optimization section are complementary to the IP address expansion solutions, and you can iteratively build to mature your data platform.

Learn more about AWS Glue job optimization techniques from Monitor and optimize cost on AWS Glue for Apache Spark and Best practices to scale Apache Spark jobs and partition data with AWS Glue.


About the authors

Sushanth Kothapally is a Solutions Architect at Amazon Web Services supporting Automotive and Manufacturing customers. He is passionate about designing technology solutions to meet business goals and has keen interest in serverless and event-driven architectures.

Senthil Kamala Rathinam is a Solutions Architect at Amazon Web Services specializing in Data and Analytics. He is passionate about helping customers to design and build modern data platforms. In his free time, Senthil loves to spend time with his family and play badminton.

Gain insights from historical location data using Amazon Location Service and AWS analytics services

Post Syndicated from Alan Peaty original https://aws.amazon.com/blogs/big-data/gain-insights-from-historical-location-data-using-amazon-location-service-and-aws-analytics-services/

Many organizations around the world rely on the use of physical assets, such as vehicles, to deliver a service to their end-customers. By tracking these assets in real time and storing the results, asset owners can derive valuable insights on how their assets are being used to continuously deliver business improvements and plan for future changes. For example, a delivery company operating a fleet of vehicles may need to ascertain the impact from local policy changes outside of their control, such as the announced expansion of an Ultra-Low Emission Zone (ULEZ). By combining historical vehicle location data with information from other sources, the company can devise empirical approaches for better decision-making. For example, the company’s procurement team can use this information to make decisions about which vehicles to prioritize for replacement before policy changes go into effect.

Developers can use the support in Amazon Location Service for publishing device position updates to Amazon EventBridge to build a near-real-time data pipeline that stores locations of tracked assets in Amazon Simple Storage Service (Amazon S3). Additionally, you can use AWS Lambda to enrich incoming location data with data from other sources, such as an Amazon DynamoDB table containing vehicle maintenance details. Then a data analyst can use the geospatial querying capabilities of Amazon Athena to gain insights, such as the number of days their vehicles have operated in the proposed boundaries of an expanded ULEZ. Because vehicles that do not meet ULEZ emissions standards are subjected to a daily charge to operate within the zone, you can use the location data, along with maintenance data such as age of the vehicle, current mileage, and current emissions standards to estimate the amount the company would have to spend on daily fees.

This post shows how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use this data to drive meaningful insights using AWS Glue and Athena.

Overview of solution

This is a fully serverless solution for location-based asset management. The solution consists of the following interfaces:

  • IoT or mobile application – A mobile application or an Internet of Things (IoT) device allows the tracking of a company vehicle while it is in use and transmits its current location securely to the data ingestion layer in AWS. The ingestion approach is not in scope of this post. Instead, a Lambda function in our solution simulates sample vehicle journeys and directly updates Amazon Location tracker objects with randomized locations.
  • Data analytics – Business analysts gather operational insights from multiple data sources, including the location data collected from the vehicles. Data analysts are looking for answers to questions such as, “How long did a given vehicle historically spend inside a proposed zone, and how much would the fees have cost had the policy been in place over the past 12 months?”

The following diagram illustrates the solution architecture.
Architecture diagram

The workflow consists of the following key steps:

  1. The tracking functionality of Amazon Location is used to track the vehicle. Using EventBridge integration, filtered positional updates are published to an EventBridge event bus. This solution uses distance-based filtering to reduce costs and jitter. Distance-based filtering ignores location updates in which devices have moved less than 30 meters (98.4 feet).
  2. Amazon Location device position events arrive on the EventBridge default bus with source: ["aws.geo"] and detail-type: ["Location Device Position Event"]. One rule is created to forward these events to two downstream targets: a Lambda function, and a Firehose delivery stream.
  3. Two different patterns, based on each target, are described in this post to demonstrate different approaches to committing the data to an S3 bucket:
    1. Lambda function – The first approach uses a Lambda function to demonstrate how you can use code in the data pipeline to directly transform the incoming location data. You can modify the Lambda function to fetch additional vehicle information from a separate data store (for example, a DynamoDB table or a Customer Relationship Management system) to enrich the data, before storing the results in an S3 bucket. In this model, the Lambda function is invoked for each incoming event.
    2. Firehose delivery stream – The second approach uses a Firehose delivery stream to buffer and batch the incoming positional updates, before storing them in an S3 bucket without modification. This method uses GZIP compression to optimize storage consumption and query performance. You can also use the data transformation feature of Data Firehose to invoke a Lambda function to perform data transformation in batches.
  4. AWS Glue crawls both S3 bucket paths, populates the AWS Glue database tables based on the inferred schemas, and makes the data available to other analytics applications through the AWS Glue Data Catalog.
  5. Athena is used to run geospatial queries on the location data stored in the S3 buckets. The Data Catalog provides metadata that allows analytics applications using Athena to find, read, and process the location data stored in Amazon S3.
  6. This solution includes a Lambda function that continuously updates the Amazon Location tracker with simulated location data from fictitious journeys. The Lambda function is triggered at regular intervals using a scheduled EventBridge rule.
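
To build intuition for the distance-based filtering mentioned in step 1, the following sketch checks whether two consecutive positions are at least 30 meters apart using the haversine formula. This is only an approximation for illustration; the post does not describe how Amazon Location computes the distance internally, and the function names are ours.

```python
import math
from typing import Sequence

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters
MIN_DISTANCE_M = 30         # filtering threshold quoted in step 1


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in meters between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def passes_distance_filter(previous: Sequence[float],
                           current: Sequence[float],
                           threshold_m: float = MIN_DISTANCE_M) -> bool:
    """Both positions are [longitude, latitude], the order used by Amazon Location."""
    return haversine_m(previous[0], previous[1], current[0], current[1]) >= threshold_m
```

For example, moving 0.001 degrees of latitude (roughly 111 meters) passes the filter, whereas moving 0.0001 degrees (roughly 11 meters) does not.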

You can test this solution yourself using the AWS Samples GitHub repository. The repository contains the AWS Serverless Application Model (AWS SAM) template and Lambda code required to try out this solution. Refer to the instructions in the README file for steps on how to provision and decommission this solution.

Visual layouts in some screenshots in this post may look different than those on your AWS Management Console.

Data generation

In this section, we discuss the steps to manually or automatically generate journey data.

Manually generate journey data

You can manually update device positions using the AWS Command Line Interface (AWS CLI) command aws location batch-update-device-position. Replace the tracker-name, device-id, Position, and SampleTime values with your own, and make sure that successive updates are more than 30 meters apart so that an event is placed on the default EventBridge event bus:

aws location batch-update-device-position --tracker-name <tracker-name> --updates "[{\"DeviceId\": \"<device-id>\", \"Position\": [<longitude>, <latitude>], \"SampleTime\": \"<YYYY-MM-DDThh:mm:ssZ>\"}]"
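
If you prefer Python over the AWS CLI, the same update can be sent with the boto3 batch_update_device_position call. The following is a minimal sketch; the helper names and the range validation are our own additions, and the client is passed in rather than created here.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Sequence


def build_position_update(device_id: str,
                          longitude: float,
                          latitude: float,
                          sample_time: datetime) -> Dict[str, Any]:
    """Build one entry for the Updates parameter.

    Position is [longitude, latitude], in that order.
    """
    if not -180.0 <= longitude <= 180.0:
        raise ValueError("longitude must be between -180 and 180")
    if not -90.0 <= latitude <= 90.0:
        raise ValueError("latitude must be between -90 and 90")
    return {
        "DeviceId": device_id,
        "Position": [longitude, latitude],
        "SampleTime": sample_time,
    }


def send_updates(location_client: Any,
                 tracker_name: str,
                 updates: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
    """Send the updates; location_client is expected to be boto3.client("location")."""
    return location_client.batch_update_device_position(
        TrackerName=tracker_name, Updates=list(updates)
    )


# Example update with illustrative coordinates in central London.
example_update = build_position_update(
    "vehicle1", -0.1276, 51.5072, datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
)
```

Swapping longitude and latitude is a common mistake, which is why the sketch validates both ranges before building the payload.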

Automatically generate journey data using the simulator

The provided AWS CloudFormation template deploys an EventBridge scheduled rule and an accompanying Lambda function that simulates tracker updates from vehicles. This rule is enabled by default, and runs at a frequency specified by the SimulationIntervalMinutes CloudFormation parameter. The data generation Lambda function updates the Amazon Location tracker with a randomized position offset from the vehicles’ base locations.

Vehicle names and base locations are stored in the vehicles.json file. A vehicle’s starting position is reset each day, and base locations have been chosen to give them the ability to drift in and out of the ULEZ on a given day to provide a realistic journey simulation.
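
To illustrate the idea of a randomized position offset, the following sketch perturbs a base location by a bounded random amount in each direction. It is a hypothetical simplification: the simulator in the repository may use a different offset model, and the function name and the max_offset_degrees parameter are our assumptions.

```python
import random
from typing import List, Optional


def simulate_position(base_longitude: float,
                      base_latitude: float,
                      max_offset_degrees: float = 0.05,
                      rng: Optional[random.Random] = None) -> List[float]:
    """Return [longitude, latitude] offset randomly from a vehicle's base location.

    max_offset_degrees bounds the drift in each direction (hypothetical default).
    """
    generator = rng if rng is not None else random.Random()
    return [
        base_longitude + generator.uniform(-max_offset_degrees, max_offset_degrees),
        base_latitude + generator.uniform(-max_offset_degrees, max_offset_degrees),
    ]


# A seeded generator makes the simulated journey reproducible.
seeded = random.Random(42)
simulated = simulate_position(-0.1276, 51.5072, rng=seeded)
```

A base location near the zone boundary combined with a large enough offset lets a simulated vehicle drift in and out of the zone, which is the effect described above.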

You can disable the rule temporarily by navigating to the scheduled rule details on the EventBridge console. Alternatively, change the parameter State: ENABLED to State: DISABLED for the scheduled rule resource GenerateDevicePositionsScheduleRule in the template.yml file. Rebuild and re-deploy the AWS SAM template for this change to take effect.

Location data pipeline approaches

The configurations outlined in this section are deployed automatically by the provided AWS SAM template. The information in this section is provided to describe the pertinent parts of the solution.

Amazon Location device position events

Amazon Location sends device position update events to EventBridge in the following format:

{
    "version":"0",
    "id":"<event-id>",
    "detail-type":"Location Device Position Event",
    "source":"aws.geo",
    "account":"<account-number>",
    "time":"<YYYY-MM-DDThh:mm:ssZ>",
    "region":"<region>",
    "resources":[
        "arn:aws:geo:<region>:<account-number>:tracker/<tracker-name>"
    ],
    "detail":{
        "EventType":"UPDATE",
        "TrackerName":"<tracker-name>",
        "DeviceId":"<device-id>",
        "SampleTime":"<YYYY-MM-DDThh:mm:ssZ>",
        "ReceivedTime":"<YYYY-MM-DDThh:mm:ss.sssZ>",
        "Position":[
            <longitude>,
            <latitude>
        ]
    }
}

You can optionally specify an input transformation to modify the format and contents of the device position event data before it reaches the target.

Data enrichment using Lambda

Data enrichment in this pattern is facilitated through the invocation of a Lambda function. In this example, we call this function ProcessDevicePosition, and use a Python runtime. A custom transformation is applied in the EventBridge target definition to receive the event data in the following format:

{
    "EventType":<EventType>,
    "TrackerName":<TrackerName>,
    "DeviceId":<DeviceId>,
    "SampleTime":<SampleTime>,
    "ReceivedTime":<ReceivedTime>,
    "Position":[<Longitude>,<Latitude>]
}

You could apply additional transformations, such as the refactoring of Latitude and Longitude data into separate key-value pairs if this is required by the downstream business logic processing the events.
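
As a sketch of the refactoring mentioned above, the following function replaces the Position array with separate Longitude and Latitude keys. The function name is ours, and this step is not part of the deployed solution.

```python
from typing import Any, Dict


def split_position(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the event with Position replaced by Longitude and Latitude."""
    longitude, latitude = event["Position"]
    flattened = {key: value for key, value in event.items() if key != "Position"}
    flattened["Longitude"] = longitude
    flattened["Latitude"] = latitude
    return flattened


# Sample event in the transformed format shown above (values are illustrative).
sample_event = {
    "EventType": "UPDATE",
    "TrackerName": "example-tracker",
    "DeviceId": "vehicle1",
    "SampleTime": "2024-01-01T09:00:00Z",
    "ReceivedTime": "2024-01-01T09:00:01.000Z",
    "Position": [-0.1276, 51.5072],
}
flattened_event = split_position(sample_event)
```

The original event is left untouched, so the same payload can still be written to Amazon S3 in its array form if another consumer expects it.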

The following code demonstrates the Python application logic that is run by the ProcessDevicePosition Lambda function. Error handling has been skipped in this code snippet for brevity. The full code is available in the GitHub repo.

import json
import os
import uuid
import boto3

# Import environment variables from Lambda function.
bucket_name = os.environ["S3_BUCKET_NAME"]
bucket_prefix = os.environ["S3_BUCKET_LAMBDA_PREFIX"]

s3 = boto3.client("s3")

def lambda_handler(event, context):
    key = "%s/%s/%s-%s.json" % (bucket_prefix,
                                event["DeviceId"],
                                event["SampleTime"],
                                str(uuid.uuid4()))
    body = json.dumps(event, separators=(",", ":"))
    body_encoded = body.encode("utf-8")
    s3.put_object(Bucket=bucket_name, Key=key, Body=body_encoded)
    return {
        "statusCode": 200,
        "body": "success"
    }

The preceding code creates an S3 object for each device position event received by EventBridge. The code uses the DeviceId as a prefix to write the objects to the bucket.

You can add additional logic to the preceding Lambda function code to enrich the event data using other sources. The example in the GitHub repo demonstrates enriching the event with data from a DynamoDB vehicle maintenance table.

In addition to the prerequisite AWS Identity and Access Management (IAM) permissions provided by the AWSLambdaBasicExecutionRole managed policy, the ProcessDevicePosition function requires permissions to perform the S3 PutObject action and any other actions required by the data enrichment logic. IAM permissions required by the solution are documented in the template.yml file.

{
    "Version":"2012-10-17",
    "Statement":[
        {
            "Action":[
                "s3:ListBucket"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>"
            ],
            "Effect":"Allow"
        },
        {
            "Action":[
                "s3:PutObject"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>/<S3_BUCKET_LAMBDA_PREFIX>/*"
            ],
            "Effect":"Allow"
        }
    ]
}

Data pipeline using Amazon Data Firehose

Complete the following steps to create your Firehose delivery stream:

  1. On the Amazon Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Direct PUT.
  4. For Destination, choose Amazon S3.
  5. For Firehose stream name, enter a name (for this post, ProcessDevicePositionFirehose).
    Create Firehose stream
  6. Configure the destination settings with details about the S3 bucket in which the location data is stored, along with the partitioning strategy:
    1. Use <S3_BUCKET_NAME> and <S3_BUCKET_FIREHOSE_PREFIX> to determine the bucket and object prefixes.
    2. Use DeviceId as an additional prefix to write the objects to the bucket.
  7. Enable Dynamic partitioning and New line delimiter to make sure partitioning is automatic based on DeviceId, and that new line delimiters are added between records in objects that are delivered to Amazon S3.

These are required by AWS Glue to later crawl the data, and for Athena to recognize individual records.
Destination settings for Firehose stream
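
To see why both settings matter, the following sketch imitates the resulting layout locally: events are grouped under a prefix per DeviceId, and each object contains one JSON record per line. This is a simplified model only. Real Firehose object keys contain additional components, and the function name is ours.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def build_partitioned_objects(events: Iterable[Dict[str, Any]],
                              prefix: str) -> Dict[str, str]:
    """Group raw device position events by DeviceId and join them with newlines.

    Returns {"<prefix>/<DeviceId>/": "<newline-delimited JSON>"}.
    """
    lines_by_prefix: Dict[str, List[str]] = defaultdict(list)
    for event in events:
        device_id = event["detail"]["DeviceId"]  # raw events keep fields under "detail"
        lines_by_prefix[f"{prefix}/{device_id}/"].append(
            json.dumps(event, separators=(",", ":"))
        )
    # The trailing newline mirrors a delimiter being added after every record.
    return {key: "\n".join(lines) + "\n" for key, lines in lines_by_prefix.items()}


sample_events = [
    {"detail": {"DeviceId": "vehicle1", "Position": [-0.12, 51.50]}},
    {"detail": {"DeviceId": "vehicle2", "Position": [-0.10, 51.52]}},
    {"detail": {"DeviceId": "vehicle1", "Position": [-0.13, 51.51]}},
]
partitioned = build_partitioned_objects(sample_events, "firehose")
```

Without the delimiter, the two vehicle1 records would be concatenated into a single line that is not valid JSON, and a query engine could not tell where one record ends and the next begins.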

Create an EventBridge rule and attach targets

The EventBridge rule ProcessDevicePosition defines two targets: the ProcessDevicePosition Lambda function, and the ProcessDevicePositionFirehose delivery stream. Complete the following steps to create the rule and attach targets:

  1. On the EventBridge console, create a new rule.
  2. For Name, enter a name (for this post, ProcessDevicePosition).
  3. For Event bus, choose default.
  4. For Rule type, select Rule with an event pattern.
    EventBridge rule detail
  5. For Event source, select AWS events or EventBridge partner events.
    EventBridge event source
  6. For Method, select Use pattern form.
  7. In the Event pattern section, specify AWS services as the source, Amazon Location Service as the specific service, and Location Device Position Event as the event type.
    EventBridge creation method
  8. For Target 1, attach the ProcessDevicePosition Lambda function as a target.
    EventBridge target 1
  9. We use Input transformer to customize the event that is committed to the S3 bucket.
    EventBridge target 1 transformer
  10. Configure Input paths map and Input template to organize the payload into the desired format.
    1. The following code is the input paths map:
      {
          "EventType": "$.detail.EventType",
          "TrackerName": "$.detail.TrackerName",
          "DeviceId": "$.detail.DeviceId",
          "SampleTime": "$.detail.SampleTime",
          "ReceivedTime": "$.detail.ReceivedTime",
          "Longitude": "$.detail.Position[0]",
          "Latitude": "$.detail.Position[1]"
      }

    2. The following code is the input template:
      {
          "EventType":<EventType>,
          "TrackerName":<TrackerName>,
          "DeviceId":<DeviceId>,
          "SampleTime":<SampleTime>,
          "ReceivedTime":<ReceivedTime>,
          "Position":[<Longitude>, <Latitude>]
      }

  11. For Target 2, choose the ProcessDevicePositionFirehose delivery stream as a target.
    EventBridge target 2

This target requires an IAM role that allows one or multiple records to be written to the Firehose delivery stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:<region>:<account-id>:deliverystream/<delivery-stream-name>"
            ],
            "Effect": "Allow"
        }
    ]
}
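
To make the behavior of the rule concrete, the following sketch imitates locally what the event pattern and the input transformer for Target 1 do to a device position event. It is a simplified model for illustration: real EventBridge pattern matching supports nested fields and operators that this sketch does not cover, and the function names are ours.

```python
from typing import Any, Dict, List

# Pattern equivalent to the source and detail-type selected in the rule.
EVENT_PATTERN: Dict[str, List[str]] = {
    "source": ["aws.geo"],
    "detail-type": ["Location Device Position Event"],
}


def matches_pattern(event: Dict[str, Any],
                    pattern: Dict[str, List[str]] = EVENT_PATTERN) -> bool:
    """True when every top-level pattern key matches one of its allowed values."""
    return all(event.get(key) in allowed for key, allowed in pattern.items())


def transform_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Imitate the input paths map and input template for the Lambda target."""
    detail = event["detail"]
    return {
        "EventType": detail["EventType"],
        "TrackerName": detail["TrackerName"],
        "DeviceId": detail["DeviceId"],
        "SampleTime": detail["SampleTime"],
        "ReceivedTime": detail["ReceivedTime"],
        "Position": [detail["Position"][0], detail["Position"][1]],
    }


# Illustrative event in the format that Amazon Location sends to EventBridge.
raw_event = {
    "version": "0",
    "detail-type": "Location Device Position Event",
    "source": "aws.geo",
    "detail": {
        "EventType": "UPDATE",
        "TrackerName": "example-tracker",
        "DeviceId": "vehicle1",
        "SampleTime": "2024-01-01T09:00:00Z",
        "ReceivedTime": "2024-01-01T09:00:01.000Z",
        "Position": [-0.1276, 51.5072],
    },
}
lambda_payload = transform_event(raw_event) if matches_pattern(raw_event) else None
```

The Firehose target has no input transformer configured in this walkthrough, so it receives the full event, including the envelope fields such as version and source.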

Crawl and catalog the data using AWS Glue

After sufficient data has been generated, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the crawlers that have been created, location-analytics-glue-crawler-lambda and location-analytics-glue-crawler-firehose.
  3. Choose Run.

The crawlers will automatically classify the data into JSON format, group the records into tables and partitions, and commit associated metadata to the AWS Glue Data Catalog.
Crawlers

  4. When the Last run statuses of both crawlers show as Succeeded, confirm that two tables (lambda and firehose) have been created on the Tables page.

The solution partitions the incoming location data based on the deviceid field. Therefore, as long as there are no new devices or schema changes, the crawlers don’t need to run again. However, if new devices are added, or a different field is used for partitioning, the crawlers need to run again.
Tables

You’re now ready to query the tables using Athena.

Query the data using Athena

Athena is a serverless, interactive analytics service built to analyze unstructured, semi-structured, and structured data where it is hosted. If this is your first time using the Athena console, follow the instructions to set up a query result location in Amazon S3. To query the data with Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. For Data source, choose AwsDataCatalog.
  3. For Database, choose location-analytics-glue-database.
  4. On the options menu (three vertical dots), choose Preview Table to query the content of both tables.
    Preview table

The query displays 10 sample positional records currently stored in the table. The following screenshot is an example from previewing the firehose table. The firehose table stores raw, unmodified data from the Amazon Location tracker.
Query results
You can now experiment with geospatial queries. The GeoJSON file for the 2021 London ULEZ expansion is part of the repository, and has already been converted into a query compatible with both Athena tables.

  5. Copy and paste the content from the 1-firehose-athena-ulez-2021-create-view.sql file found in the examples/firehose folder into the query editor.

This query uses the ST_Within geospatial function to determine if a recorded position is inside or outside the ULEZ defined by the polygon. A new view called ulezvehicleanalysis_firehose is created with a new column, insidezone, which captures whether the recorded position lies within the zone.
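
To illustrate the kind of test that produces the insidezone value, the following sketch implements a planar ray-casting point-in-polygon check. It is not how Athena implements ST_Within, and it does not define behavior for points exactly on the boundary; it only shows the underlying idea.

```python
from typing import Sequence, Tuple


def point_in_polygon(longitude: float,
                     latitude: float,
                     ring: Sequence[Tuple[float, float]]) -> bool:
    """Ray casting: count how many polygon edges a ray going east from the point crosses.

    ring is a sequence of (longitude, latitude) vertices; repeating the first
    vertex at the end is allowed but not required.
    """
    inside = False
    j = len(ring) - 1
    for i in range(len(ring)):
        xi, yi = ring[i]
        xj, yj = ring[j]
        # Only edges that straddle the point's latitude can be crossed.
        if (yi > latitude) != (yj > latitude):
            x_cross = (xj - xi) * (latitude - yi) / (yj - yi) + xi
            if longitude < x_cross:
                inside = not inside  # an odd number of crossings means inside
        j = i
    return inside


# A unit square stands in for the zone polygon in this example.
square = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
inside_example = point_in_polygon(0.5, 0.5, square)
outside_example = point_in_polygon(1.5, 0.5, square)
```

For a zone the size of a city, treating longitude and latitude as planar coordinates is a reasonable approximation for this illustration.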

A simple Python utility is provided, which converts the polygon features found in the downloaded GeoJSON file into ST_Polygon strings based on the well-known text format that can be used directly in an Athena query.
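
The conversion itself is straightforward. The following sketch turns a GeoJSON Polygon geometry into a well-known text string wrapped in an ST_Polygon call. It is our own illustration rather than the utility shipped in the repository, and it handles only geometries of type Polygon.

```python
from typing import Any, Dict, Sequence


def ring_to_wkt(ring: Sequence[Sequence[float]]) -> str:
    """Format one linear ring as "(lon lat, lon lat, ...)", ignoring any altitude."""
    return "(" + ", ".join(f"{position[0]} {position[1]}" for position in ring) + ")"


def geojson_polygon_to_wkt(geometry: Dict[str, Any]) -> str:
    """Convert a GeoJSON Polygon geometry into a WKT POLYGON string."""
    if geometry.get("type") != "Polygon":
        raise ValueError("only GeoJSON Polygon geometries are supported in this sketch")
    rings = ", ".join(ring_to_wkt(ring) for ring in geometry["coordinates"])
    return f"POLYGON ({rings})"


def to_athena_expression(geometry: Dict[str, Any]) -> str:
    """Wrap the WKT in ST_Polygon so that it can be pasted into an Athena query."""
    return f"ST_Polygon('{geojson_polygon_to_wkt(geometry)}')"


# A small triangle stands in for the zone boundary in this example.
triangle = {
    "type": "Polygon",
    "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 0]]],
}
triangle_wkt = geojson_polygon_to_wkt(triangle)
triangle_sql = to_athena_expression(triangle)
```

GeoJSON and WKT both list longitude before latitude, so no coordinate swap is needed during the conversion.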

  6. Choose Preview View on the ulezvehicleanalysis_firehose view to explore its content.
    Preview view

You can now run queries against this view to gain overarching insights.

  7. Copy and paste the content from the 2-firehose-athena-ulez-2021-query-days-in-zone.sql file found in the examples/firehose folder into the query editor.

This query calculates the total number of days on which each vehicle entered the ULEZ, and what the expected total charges would be. The query has been parameterized using the ? placeholder character. Parameterized queries allow you to rerun the same query with different parameter values.

  8. Enter the daily fee amount for Parameter 1, then run the query.
    Query editor

The results display each vehicle, the total number of days spent in the proposed ULEZ, and the total charges based on the daily fee you entered.
Query results
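
The aggregation behind these results can be sketched in a few lines of Python. The following example counts the distinct days on which each vehicle recorded at least one position inside the zone and multiplies that count by the daily fee. The record field names (deviceid, sampletime, insidezone) are assumptions based on the columns mentioned in this post, and the SQL in the repository may compute the result differently.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Set


def days_in_zone(records: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Count distinct calendar days with at least one in-zone position per vehicle."""
    days_by_device: Dict[str, Set] = {}
    for record in records:
        if record["insidezone"]:
            timestamp = datetime.fromisoformat(record["sampletime"].replace("Z", "+00:00"))
            days_by_device.setdefault(record["deviceid"], set()).add(timestamp.date())
    return {device: len(days) for device, days in days_by_device.items()}


def estimate_charges(records: Iterable[Dict[str, Any]],
                     daily_fee: float) -> Dict[str, float]:
    """Multiply each vehicle's in-zone day count by the daily fee."""
    return {device: count * daily_fee for device, count in days_in_zone(records).items()}


# Illustrative records: vehicle1 is inside the zone on two different days.
sample_records = [
    {"deviceid": "vehicle1", "sampletime": "2024-01-01T08:00:00Z", "insidezone": True},
    {"deviceid": "vehicle1", "sampletime": "2024-01-01T17:30:00Z", "insidezone": True},
    {"deviceid": "vehicle1", "sampletime": "2024-01-02T09:00:00Z", "insidezone": True},
    {"deviceid": "vehicle2", "sampletime": "2024-01-01T09:00:00Z", "insidezone": False},
]
charges = estimate_charges(sample_records, daily_fee=12.5)
```

Counting distinct days rather than individual position updates matters because the fee is charged once per day, however many times a vehicle reports a position inside the zone.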
You can repeat this exercise using the lambda table. Data in the lambda table is augmented with additional vehicle details present in the vehicle maintenance DynamoDB table at the time it is processed by the Lambda function. The solution supports the following fields:

  • MeetsEmissionStandards (Boolean)
  • Mileage (Number)
  • PurchaseDate (String, in YYYY-MM-DD format)

You can also enrich the new data as it arrives.

  1. On the DynamoDB console, find the vehicle maintenance table under Tables. The table name is provided as output VehicleMaintenanceDynamoTable in the deployed CloudFormation stack.
  2. Choose Explore table items to view the content of the table.
  3. Choose Create item to create a new record for a vehicle.
    Create item
  4. Enter DeviceId (such as vehicle1 as a String), PurchaseDate (such as 2005-10-01 as a String), Mileage (such as 10000 as a Number), and MeetsEmissionStandards (with a value such as False as Boolean).
  5. Choose Create item to create the record.
    Create item
  6. Duplicate the newly created record with additional entries for other vehicles (such as for vehicle2 or vehicle3), modifying the values of the attributes slightly each time.
  7. Rerun the location-analytics-glue-crawler-lambda AWS Glue crawler after new data has been generated to confirm that the update to the schema with new fields is registered.
  8. Copy and paste the content from the 1-lambda-athena-ulez-2021-create-view.sql file found in the examples/lambda folder into the query editor.
  9. Preview the ulezvehicleanalysis_lambda view to confirm that the new columns have been created.

If errors such as Column 'mileage' cannot be resolved are displayed, the data enrichment is not taking place, or the AWS Glue crawler has not yet detected updates to the schema.

If the Preview table option is only returning results from before you created records in the DynamoDB table, return the query results in descending order using sampletime (for example, order by sampletime desc limit 100;).
Query results
Now we focus on the vehicles that don’t currently meet emissions standards, and order the vehicles in descending order based on the mileage per year (calculated using the latest mileage / age of vehicle in years).

  10. Copy and paste the content from the 2-lambda-athena-ulez-2021-query-days-in-zone.sql file found in the examples/lambda folder into the query editor.
    Query results

In this example, we can see that out of our fleet of vehicles, five have been reported as not meeting emission standards. We can also see the vehicles that have accumulated high mileage per year, and the number of days spent in the proposed ULEZ. The fleet operator may now decide to prioritize these vehicles for replacement. Because location data is enriched with the most up-to-date vehicle maintenance data at the time it is ingested, you can further evolve these queries to run over a defined time window. For example, you could factor in mileage changes within the past year.
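
The ranking described above (latest mileage divided by the age of the vehicle in years, in descending order) can be sketched as follows. The function names and the use of 365.25 days per year are our assumptions; the SQL query in the repository may calculate the age differently.

```python
from datetime import date
from typing import Any, Dict, Iterable, List


def mileage_per_year(mileage: float, purchase_date: str, today: date) -> float:
    """Latest mileage divided by vehicle age in years (purchase_date is YYYY-MM-DD)."""
    age_years = (today - date.fromisoformat(purchase_date)).days / 365.25
    if age_years <= 0:
        raise ValueError("purchase_date must be before today")
    return mileage / age_years


def rank_for_replacement(vehicles: Iterable[Dict[str, Any]],
                         today: date) -> List[Dict[str, Any]]:
    """Keep vehicles that do not meet emission standards, highest mileage per year first."""
    candidates = [
        dict(vehicle,
             MileagePerYear=mileage_per_year(vehicle["Mileage"], vehicle["PurchaseDate"], today))
        for vehicle in vehicles
        if not vehicle["MeetsEmissionStandards"]
    ]
    return sorted(candidates, key=lambda item: item["MileagePerYear"], reverse=True)


# Illustrative fleet data using the three enrichment fields from this post.
fleet = [
    {"DeviceId": "vehicle1", "Mileage": 10000, "PurchaseDate": "2005-10-01",
     "MeetsEmissionStandards": False},
    {"DeviceId": "vehicle2", "Mileage": 90000, "PurchaseDate": "2010-10-01",
     "MeetsEmissionStandards": False},
    {"DeviceId": "vehicle3", "Mileage": 50000, "PurchaseDate": "2020-01-01",
     "MeetsEmissionStandards": True},
]
ranking = rank_for_replacement(fleet, today=date(2015, 10, 1))
```

Passing today explicitly keeps the calculation reproducible and makes it easy to evaluate the ranking as of an earlier date.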

Due to the dynamic nature of the data enrichment, any new data being committed to Amazon S3, along with the query results, will be altered as and when records are updated in the DynamoDB vehicle maintenance table.

Clean up

Refer to the instructions in the README file to clean up the resources provisioned for this solution.

Conclusion

This post demonstrated how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use the collected device position data to drive analytical insights using AWS Glue and Athena. By tracking these assets in real time and storing the results, companies can derive valuable insights on how effectively their fleets are being utilized and better react to changes in the future. You can now explore extending this sample code with your own device tracking data and analytics requirements.


About the Authors

Alan Peaty is a Senior Partner Solutions Architect at AWS. Alan helps Global Systems Integrators (GSIs) and Global Independent Software Vendors (GISVs) solve complex customer challenges using AWS services. Prior to joining AWS, Alan worked as an architect at systems integrators to translate business requirements into technical solutions. Outside of work, Alan is an IoT enthusiast and a keen runner who loves to hit the muddy trails of the English countryside.

Parag Srivastava is a Solutions Architect at AWS, helping enterprise customers with successful cloud adoption and migration. During his professional career, he has been extensively involved in complex digital transformation projects. He is also passionate about building innovative solutions around geospatial aspects of addresses.

Measure performance of AWS Glue Data Quality for ETL pipelines

Post Syndicated from Ruben Afonso original https://aws.amazon.com/blogs/big-data/measure-performance-of-aws-glue-data-quality-for-etl-pipelines/

In recent years, data lakes have become a mainstream architecture, and data quality validation is a critical factor to improve the reusability and consistency of the data. AWS Glue Data Quality reduces the effort required to validate data from days to hours, and provides computing recommendations, statistics, and insights about the resources required to run data validation.

AWS Glue Data Quality is built on DeeQu, an open source tool developed and used at Amazon to calculate data quality metrics and verify data quality constraints and changes in the data distribution so you can focus on describing how data should look instead of implementing algorithms.

In this post, we provide benchmark results of running increasingly complex data quality rulesets over a predefined test dataset. As part of the results, we show how AWS Glue Data Quality provides information about the runtime of extract, transform, and load (ETL) jobs, the resources measured in terms of data processing units (DPUs), and how you can track the cost of running AWS Glue Data Quality for ETL pipelines by defining custom cost reporting in AWS Cost Explorer.

This post is Part 6 of a six-part series of posts to explain how AWS Glue Data Quality works.

Check out the other posts in the series:

Solution overview

We start by defining our test dataset in order to explore how AWS Glue Data Quality automatically scales depending on input datasets.

Dataset details

The test dataset contains 104 columns and 1 million rows stored in Parquet format. You can download the dataset or recreate it locally using the Python script provided in the repository. If you opt to run the generator script, you need to install the Pandas and Mimesis packages in your Python environment:

pip install pandas mimesis

The dataset schema is a combination of numerical, categorical, and string variables in order to have enough attributes to use a combination of built-in AWS Glue Data Quality rule types. The schema replicates some of the most common attributes found in financial market data such as instrument ticker, traded volumes, and pricing forecasts.

Data quality rulesets

We categorize some of the built-in AWS Glue Data Quality rule types to define the benchmark structure. The categories consider whether the rules perform column checks that don’t require row-level inspection (simple rules), row-by-row analysis (medium rules), or data type checks, eventually comparing row values against other data sources (complex rules). The following table summarizes these rules.

Simple Rules            | Medium Rules        | Complex Rules
ColumnCount             | DistinctValuesCount | ColumnValues
ColumnDataType          | IsComplete          | Completeness
ColumnExist             | Sum                 | ReferentialIntegrity
ColumnNamesMatchPattern | StandardDeviation   | ColumnCorrelation
RowCount                | Mean                | RowCountMatch
ColumnLength            | .                   | .

We define eight different AWS Glue ETL jobs where we run the data quality rulesets. Each job has a different number of data quality rules associated to it. Each job also has an associated user-defined cost allocation tag that we use to create a data quality cost report in AWS Cost Explorer later on.

We provide the plain text definition for each ruleset in the following table.

Job name    | Simple Rules | Medium Rules | Complex Rules | Number of Rules | Tag         | Definition
ruleset-0   | 0            | 0            | 0             | 0               | dqjob:rs0   | .
ruleset-1   | 0            | 0            | 1             | 1               | dqjob:rs1   | Link
ruleset-5   | 3            | 1            | 1             | 5               | dqjob:rs5   | Link
ruleset-10  | 6            | 2            | 2             | 10              | dqjob:rs10  | Link
ruleset-50  | 30           | 10           | 10            | 50              | dqjob:rs50  | Link
ruleset-100 | 50           | 30           | 20            | 100             | dqjob:rs100 | Link
ruleset-200 | 100          | 60           | 40            | 200             | dqjob:rs200 | Link
ruleset-400 | 200          | 120          | 80            | 400             | dqjob:rs400 | Link
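To make the ruleset structure more concrete, the following sketch assembles a five-rule definition in Data Quality Definition Language (DQDL) with the same mix as ruleset-5 (three simple rules, one medium rule, one complex rule). It is not the ruleset from the repository: the column names ticker and volume and the thresholds are assumptions chosen for illustration.

```python
def build_ruleset(rules):
    """Render a list of rule strings as a DQDL ruleset definition."""
    body = ",\n".join(f"    {rule}" for rule in rules)
    return f"Rules = [\n{body}\n]"


# Column names ("ticker", "volume") and thresholds are hypothetical.
simple_rules = [
    "ColumnCount = 104",
    "RowCount = 1000000",
    'ColumnLength "ticker" <= 10',
]
medium_rules = ['IsComplete "ticker"']
complex_rules = ['ColumnValues "volume" >= 0']

ruleset_5 = build_ruleset(simple_rules + medium_rules + complex_rules)
print(ruleset_5)
```

The resulting text is what you would paste into the ruleset editor of the data quality transform.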

Create the AWS Glue ETL jobs containing the data quality rulesets

We upload the test dataset to Amazon Simple Storage Service (Amazon S3), along with two additional CSV files (isocodes.csv and exchanges.csv). After these files have been added to the AWS Glue Data Catalog, we use them to evaluate referential integrity rules in AWS Glue Data Quality. Complete the following steps:

  1. On the Amazon S3 console, create a new S3 bucket in your account and upload the test dataset.
  2. Create a folder in the S3 bucket called isocodes and upload the isocodes.csv file.
  3. Create another folder in the S3 bucket called exchange and upload the exchanges.csv file.
  4. On the AWS Glue console, run two AWS Glue crawlers, one for each folder to register the CSV content in AWS Glue Data Catalog (data_quality_catalog). For instructions, refer to Adding an AWS Glue Crawler.

The AWS Glue crawlers generate two tables (exchanges and isocodes) as part of the AWS Glue Data Catalog.

AWS Glue Data Catalog

Now we will create the AWS Identity and Access Management (IAM) role that will be assumed by the ETL jobs at runtime:

  1. On the IAM console, create a new IAM role called AWSGlueDataQualityPerformanceRole
  2. For Trusted entity type, select AWS service.
  3. For Service or use case, choose Glue.
  4. Choose Next.

AWS IAM trust entity selection

  1. For Permission policies, enter AWSGlueServiceRole
  2. Choose Next.
    AWS IAM add permissions policies
  3. Create and attach a new inline policy (AWSGlueDataQualityBucketPolicy) with the following content. Replace the placeholder with the S3 bucket name you created earlier:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": [
            "arn:aws:s3:::<your_Amazon_S3_bucket_name>/*"
          ]
        }
      ]
    }
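If you prefer to script this step, the same policy document can be generated in Python. The sketch below only builds the document shown above; attach_inline_policy assumes you pass in a Boto3 IAM client and is not called here, and the bucket name is a placeholder.

```python
import json


def build_bucket_read_policy(bucket_name):
    """Return the inline policy document granting s3:GetObject on a bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
            }
        ],
    }


def attach_inline_policy(iam_client, role_name, policy_name, bucket_name):
    """Attach the policy to a role; iam_client is a Boto3 IAM client."""
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(build_bucket_read_policy(bucket_name)),
    )


# "my-dq-benchmark-bucket" is a placeholder bucket name.
policy = build_bucket_read_policy("my-dq-benchmark-bucket")
print(json.dumps(policy, indent=2))
```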

Next, we create one of the AWS Glue ETL jobs, ruleset-5.

  1. On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.
  2. In the Create job section, choose Visual ETL.
    Overview of available jobs in AWS Glue Studio
  3. In the Visual Editor, add a Data Source – S3 Bucket source node:
    1. For S3 URL, enter the S3 folder containing the test dataset.
    2. For Data format, choose Parquet.

    Overview of Amazon S3 data source in AWS Glue Studio

  4. Create a new action node, Transform: Evaluate Data Quality:
  5. For Node parents, choose the node you created.
  6. Add the ruleset-5 definition under Ruleset editor.
    Data quality rules for ruleset-5
  7. Scroll to the end and under Performance Configuration, enable Cache Data.

Enable Cache data option

  1. Under Job details, for IAM Role, choose AWSGlueDataQualityPerformanceRole.
    Select previously created AWS IAM role
  2. In the Tags section, define dqjob tag as rs5.

This tag will be different for each of the data quality ETL jobs; we use them in AWS Cost Explorer to review the ETL jobs cost.

Define dqjob tag for ruleset-5 job

  1. Choose Save.
  2. Repeat these steps with the rest of the rulesets to define all the ETL jobs.
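Because the job names and tags follow a regular pattern, they can be derived from the rule counts. The helper names below are hypothetical; the sketch only shows the naming convention used in this post (job ruleset-N, tag key dqjob, tag value rsN).

```python
RULE_COUNTS = [0, 1, 5, 10, 50, 100, 200, 400]


def job_name(rule_count):
    """ETL job name for a ruleset of the given size."""
    return f"ruleset-{rule_count}"


def job_tags(rule_count):
    # One user-defined cost allocation tag per job: key dqjob, value rs<N>.
    return {"dqjob": f"rs{rule_count}"}


jobs = {job_name(n): job_tags(n) for n in RULE_COUNTS}
for name, tags in jobs.items():
    print(name, tags)
```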

Overview of jobs defined in AWS Glue Studio

Run the AWS Glue ETL jobs

Complete the following steps to run the ETL jobs:

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select the ETL job and choose Run job.
  3. Repeat for all the ETL jobs.
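The console steps above could also be scripted. The following sketch assumes the jobs are named ruleset-0 through ruleset-400 and uses the Boto3 Glue start_job_run call; a small fake client stands in for boto3.client("glue") so the example runs without AWS credentials.

```python
def run_jobs(glue_client, job_names):
    """Start one run per job and return a mapping of job name to run ID."""
    run_ids = {}
    for name in job_names:
        response = glue_client.start_job_run(JobName=name)
        run_ids[name] = response["JobRunId"]
    return run_ids


class FakeGlueClient:
    """Stand-in for boto3.client("glue") so the sketch runs offline."""

    def start_job_run(self, JobName):
        return {"JobRunId": f"jr_{JobName}"}


job_names = [f"ruleset-{n}" for n in (0, 1, 5, 10, 50, 100, 200, 400)]
run_ids = run_jobs(FakeGlueClient(), job_names)
print(run_ids)
```

To run against your account, pass boto3.client("glue") instead of the fake client.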

Select one AWS Glue job and choose Run Job on the top right

When the ETL jobs are complete, the Job run monitoring page will display the job details. As shown in the following screenshot, a DPU hours column is provided for each ETL job.

Overview of AWS Glue jobs monitoring

Review performance

The following table summarizes the duration, DPU hours, and estimated costs from running the eight different data quality rulesets over the same test dataset. Note that all rulesets have been run with the entire test dataset described earlier (104 columns, 1 million rows).

ETL Job Name | Number of Rules | Tag         | Duration (sec) | # of DPU hours | # of DPUs | Cost ($)
ruleset-400  | 400             | dqjob:rs400 | 445.7          | 1.24           | 10        | $0.54
ruleset-200  | 200             | dqjob:rs200 | 235.7          | 0.65           | 10        | $0.29
ruleset-100  | 100             | dqjob:rs100 | 186.5          | 0.52           | 10        | $0.23
ruleset-50   | 50              | dqjob:rs50  | 155.2          | 0.43           | 10        | $0.19
ruleset-10   | 10              | dqjob:rs10  | 152.2          | 0.42           | 10        | $0.18
ruleset-5    | 5               | dqjob:rs5   | 150.3          | 0.42           | 10        | $0.18
ruleset-1    | 1               | dqjob:rs1   | 150.1          | 0.42           | 10        | $0.18
ruleset-0    | 0               | dqjob:rs0   | 53.2           | 0.15           | 10        | $0.06

The cost of evaluating an empty ruleset is close to zero, but it has been included because it can be used as a quick test to validate the IAM roles associated with the AWS Glue Data Quality jobs and the read permissions on the test dataset in Amazon S3. The cost of the data quality jobs stays nearly constant up to 50 rules ($0.18 to $0.19) and only starts to increase noticeably from 100 rules onward.

We can observe that the cost of running data quality for the largest ruleset in the benchmark (400 rules) is still only slightly above $0.50.
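The DPU hours in the results table appear to be consistent with the job duration multiplied by the number of DPUs. The following sketch checks that relationship against the reported figures; the formula is inferred from the table rather than taken from the AWS Glue documentation, so treat it as an assumption.

```python
DPUS = 10

# (job name, duration in seconds, reported DPU hours), copied from the table.
RESULTS = [
    ("ruleset-400", 445.7, 1.24),
    ("ruleset-200", 235.7, 0.65),
    ("ruleset-100", 186.5, 0.52),
    ("ruleset-50", 155.2, 0.43),
    ("ruleset-10", 152.2, 0.42),
    ("ruleset-5", 150.3, 0.42),
    ("ruleset-1", 150.1, 0.42),
    ("ruleset-0", 53.2, 0.15),
]


def dpu_hours(duration_sec, dpus=DPUS):
    """DPU hours consumed by a run of the given duration."""
    return duration_sec * dpus / 3600


def matches_reported(computed, reported, tolerance=0.005):
    # Reported values are rounded to two decimals, so allow for rounding.
    return abs(computed - reported) <= tolerance


baseline = dpu_hours(53.2)  # empty ruleset
for name, duration, reported in RESULTS:
    computed = dpu_hours(duration)
    extra = computed - baseline
    print(f"{name}: {computed:.3f} DPU hours ({extra:.3f} above the empty ruleset)")

consistent = all(matches_reported(dpu_hours(d), r) for _, d, r in RESULTS)
```

Subtracting the empty-ruleset baseline gives a rough view of how much of each run is attributable to rule evaluation rather than job startup and data loading.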

Data quality cost analysis in AWS Cost Explorer

In order to see the data quality ETL job tags in AWS Cost Explorer, you need to activate the user-defined cost allocation tags first.

After you create and apply user-defined tags to your resources, it can take up to 24 hours for the tag keys to appear on your cost allocation tags page for activation. It can then take up to 24 hours for the tag keys to activate.

  1. On the AWS Cost Explorer console, choose Cost Explorer Saved Reports in the navigation pane.
  2. Choose Create new report.
    Create new AWS Cost Explorer report
  3. Select Cost and usage as the report type.
  4. Choose Create Report.
    Confirm creation of a new AWS Cost Explorer report
  5. For Date Range, enter a date range.
  6. For Granularity, choose Daily.
  7. For Dimension, choose Tag, then choose the dqjob tag.
    Report parameter selection in AWS Cost Explorer
  8. Under Applied filters, choose the dqjob tag and the eight tags used in the data quality rulesets (rs0, rs1, rs5, rs10, rs50, rs100, rs200, and rs400).
    Select the eight tags used to tag the data quality AWS Glue jobs
  9. Choose Apply.

The Cost and usage graph in AWS Cost Explorer will refresh. The X-axis shows the data quality ruleset tags as categories, and the graph shows the total monthly cost of the latest data quality ETL job runs, aggregated by ETL job.
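A similar report can be requested programmatically through the Cost Explorer GetCostAndUsage API. The sketch below only builds the request parameters, grouped and filtered by the dqjob tag; the date range is a placeholder and the choice of the UnblendedCost metric is an assumption.

```python
def build_cost_query(start, end, tag_key, tag_values):
    """Build keyword arguments for the Cost Explorer GetCostAndUsage API."""
    return {
        "TimePeriod": {"Start": start, "End": end},
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost"],
        "GroupBy": [{"Type": "TAG", "Key": tag_key}],
        "Filter": {"Tags": {"Key": tag_key, "Values": tag_values}},
    }


tag_values = [f"rs{n}" for n in (0, 1, 5, 10, 50, 100, 200, 400)]
# The dates are placeholders; use the range that covers your job runs.
query = build_cost_query("2024-01-01", "2024-01-31", "dqjob", tag_values)
print(query)

# With credentials configured, the query could be sent as:
#   import boto3
#   response = boto3.client("ce").get_cost_and_usage(**query)
```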

The AWS Cost Explorer report shows the costs associated to executing the data quality AWS Glue Studio jobs

Clean up

To clean up the infrastructure and avoid additional charges, complete the following steps:

  1. Empty the S3 bucket initially created to store the test dataset.
  2. Delete the ETL jobs you created in AWS Glue.
  3. Delete the AWSGlueDataQualityPerformanceRole IAM role.
  4. Delete the custom report created in AWS Cost Explorer.

Conclusion

AWS Glue Data Quality provides an efficient way to incorporate data quality validation as part of ETL pipelines and scales automatically to accommodate increasing volumes of data. The built-in data quality rule types offer a wide range of options to customize the data quality checks and focus on how your data should look instead of implementing undifferentiated logic.

In this benchmark analysis, we showed how common-size AWS Glue Data Quality rulesets have little or no overhead, whereas in complex cases, the cost increases linearly. We also reviewed how you can tag AWS Glue Data Quality jobs to make cost information available in AWS Cost Explorer for quick reporting.

AWS Glue Data Quality is generally available in all AWS Regions where AWS Glue is available. Learn more about AWS Glue Data Quality and AWS Glue Data Catalog in Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog.


About the Authors


Ruben Afonso Francos
Ruben Afonso is a Global Financial Services Solutions Architect with AWS. He enjoys working on analytics and AI/ML challenges, with a passion for automation and optimization. When not at work, he enjoys finding hidden spots off the beaten path around Barcelona.


Kalyan Kumar Neelampudi (KK) is a Specialist Partner Solutions Architect (Data Analytics & Generative AI) at AWS. He acts as a technical advisor and collaborates with various AWS partners to design, implement, and build practices around data analytics and AI/ML workloads. Outside of work, he’s a badminton enthusiast and culinary adventurer, exploring local cuisines and traveling with his partner to discover new tastes and experiences.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

Post Syndicated from Satyanarayana Adimula original https://aws.amazon.com/blogs/big-data/use-aws-glue-etl-to-perform-merge-partition-evolution-and-schema-evolution-on-apache-iceberg/

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its innovative metadata table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate, and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets most efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack:
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for the logical ID LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  1. On the Configuration tab, choose Environment variables in the left pane.
  2. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
import boto3

# In the Lambda function, this client uses the credentials of the assumed
# data lake admin role; a default client is shown here for brevity.
glue_client = boto3.client('glue')

# Create an Iceberg (format version 2) table in the Data Catalog
response_create_table = glue_client.create_table(
    DatabaseName='icebergdb1',
    OpenTableFormatInput={
        'IcebergInput': {
            'MetadataOperation': 'CREATE',
            'Version': '2'
        }
    },
    TableInput={
        'Name': 'ecomorders',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'ordernum', 'Type': 'int'},
                {'Name': 'sku', 'Type': 'string'},
                {'Name': 'quantity', 'Type': 'int'},
                {'Name': 'category', 'Type': 'string'},
                {'Name': 'status', 'Type': 'string'},
                {'Name': 'shipping_id', 'Type': 'string'}
            ],
            'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
        },
        'TableType': 'EXTERNAL_TABLE'
    }
)
  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags for the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  1. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.
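As an illustration of LF-TBAC, the following sketch builds the parameters for a Lake Formation grant_permissions call that grants DESCRIBE and SELECT on tables carrying a given LF-Tag. The account ID, tag key, and tag value are placeholders, not the values used by the CloudFormation stack, and the helper name is hypothetical.

```python
def build_lf_tag_grant(principal_arn, tag_key, tag_values, permissions,
                       resource_type="TABLE"):
    """Build keyword arguments for a tag-based Lake Formation grant."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "LFTagPolicy": {
                "ResourceType": resource_type,
                "Expression": [{"TagKey": tag_key, "TagValues": tag_values}],
            }
        },
        "Permissions": permissions,
    }


# Placeholder account ID, tag key, and tag value for illustration only.
grant = build_lf_tag_grant(
    principal_arn="arn:aws:iam::111122223333:user/iceberguser1",
    tag_key="example-table-tag-key",
    tag_values=["example-table-tag-value"],
    permissions=["DESCRIBE", "SELECT"],
)
print(grant)

# With a Boto3 Lake Formation client, the grant could be applied as:
#   lakeformation_client.grant_permissions(**grant)
```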

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  1. Choose Tables in the navigation pane.
  2. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. Data files are created in place and are added to the table only through an explicit commit. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires only a small set of file system operations, and these requirements are compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are listed in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version – Version of the Iceberg table
  • location – Amazon S3 location of the table
  • schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-< snapshot-id >
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file has all the historical changes to the table’s data and schema. Reviewing the contents of the metadata file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.
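For a quick look at a downloaded metadata file, a few lines of Python are enough to find the current snapshot and its manifest list. The sample below is a trimmed, made-up metadata document that uses the field names listed earlier; real files contain many more fields, and the bucket and file names are placeholders.

```python
import json

# A trimmed, hypothetical metadata file; real files contain many more fields.
SAMPLE_METADATA = json.loads("""
{
  "format-version": 2,
  "location": "s3://example-bucket/iceberg/",
  "current-snapshot-id": 222,
  "snapshots": [
    {"sequence-number": 1, "snapshot-id": 111, "timestamp-ms": 1700000000000,
     "summary": {"operation": "append"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-111-1-a.avro"},
    {"sequence-number": 2, "snapshot-id": 222, "timestamp-ms": 1700000600000,
     "summary": {"operation": "overwrite"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-222-1-b.avro"}
  ]
}
""")


def current_snapshot(metadata):
    """Return the snapshot entry referenced by current-snapshot-id."""
    current_id = metadata["current-snapshot-id"]
    for snapshot in metadata["snapshots"]:
        if snapshot["snapshot-id"] == current_id:
            return snapshot
    return None


snap = current_snapshot(SAMPLE_METADATA)
print(snap["sequence-number"], snap["manifest-list"])
```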

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

import sys
import boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'warehouse_path'])
    
# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

For read/write access to the underlying data, in addition to Lake Formation permissions, the IAM role that runs the AWS Glue ETL jobs was granted the lakeformation:GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.

The CloudFormation stack provisioned the four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
from pyspark.sql.types import IntegerType

# Read the Dropzone files and cast the numeric columns to match the table schema
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
df = df.withColumn("ordernum", df["ordernum"].cast(IntegerType())) \
    .withColumn("quantity", df["quantity"].cast(IntegerType()))
df.createOrReplaceTempView("input_table")
  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)
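To make the semantics of this MERGE statement easier to follow, here is a plain-Python sketch of the same upsert logic over dictionaries keyed by ordernum. It is only an illustration with made-up rows, not how Iceberg or Spark execute the merge.

```python
def merge_orders(table, incoming):
    """Apply MERGE-like semantics keyed on ordernum and return a new table."""
    merged = {num: dict(row) for num, row in table.items()}
    for row in incoming:
        existing = merged.get(row["ordernum"])
        if existing is not None:
            # WHEN MATCHED: update only status and shipping_id.
            existing["status"] = row["status"]
            existing["shipping_id"] = row["shipping_id"]
        else:
            # WHEN NOT MATCHED: insert the whole row.
            merged[row["ordernum"]] = dict(row)
    return merged


# Made-up rows for illustration.
table = {
    1001: {"ordernum": 1001, "sku": "A1", "quantity": 1,
           "category": "toys", "status": "ordered", "shipping_id": ""},
}
incoming = [
    {"ordernum": 1001, "sku": "A1", "quantity": 99,
     "category": "toys", "status": "shipped", "shipping_id": "UPS123"},
    {"ordernum": 2001, "sku": "B2", "quantity": 3,
     "category": "books", "status": "ordered", "shipping_id": ""},
]
result = merge_orders(table, incoming)
print(result)
```

Note that for a matched order, columns other than status and shipping_id (such as quantity) keep their existing values.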

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.
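The three runs differ only in the --dropzone_path job parameter. The sketch below builds the Arguments mapping for each run; the helper name is hypothetical, and the prefix is copied from the steps above (your bucket suffix will differ).

```python
def merge_run_arguments(dropzone_prefix, batch):
    """Build the Arguments mapping for one run of the merge job."""
    return {"--dropzone_path": f"{dropzone_prefix}/{batch}"}


# Prefix copied from the steps above; your bucket suffix will differ.
PREFIX = "icebergdemo1-s3bucketdropzone-kunftrcblhsk/data"
runs = [merge_run_arguments(PREFIX, b) for b in ("merge1", "merge2", "merge3")]
for args in runs:
    print(args)

# With a Boto3 Glue client, each run could be started (one at a time) as:
#   glue_client.start_job_run(JobName="icebergdemo1-GlueETL1-merge", Arguments=args)
```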

Go to the data folder of the table to see the data files that Iceberg wrote when you merged the data into the table using the AWS Glue ETL job icebergdemo1-GlueETL1-merge.

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup).
  4. Verify Athena engine version 3.

The Athena engine version 3 supports Iceberg file formats, including Parquet, ORC, and Avro.

  1. Go to the Athena query editor.
  2. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  3. For Database, choose icebergdb1. You will see the table ecomorders.
  4. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  5. Run the following query to see the table’s current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how the table is partitioned. In this example, there are no partitioned fields because you didn’t define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure, for example, because common query patterns in downstream analytics change over time. A change of partition structure for traditional tables is a significant operation that requires an entire data copy.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema just like SQL. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;
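
The reason these changes need no data rewrite can be sketched with a toy Python model. Iceberg tracks columns by unique field IDs rather than by name; the dictionaries and sample values below are invented for illustration and are not Iceberg's real metadata layout.

```python
# Toy model: stored rows are keyed by field ID, and the schema maps IDs to names.
schema = {1: "ordernum", 2: "shipping_id"}   # field ID -> column name
data_file = [{1: 1001, 2: "UPS123456"}]      # rows as written; never modified below


def read(rows, current_schema):
    """Project stored rows through the current schema; absent fields read as None."""
    return [
        {name: row.get(field_id) for field_id, name in current_schema.items()}
        for row in rows
    ]


# ADD COLUMNS: allocate a new field ID; older files simply do not contain it.
schema[3] = "shipping_carrier"
# RENAME COLUMN: only the name attached to field ID 2 changes.
schema[2] = "tracking_number"

rows = read(data_file, schema)
```

Reading the untouched file through the updated schema yields the renamed column with its old values and the new column as null. The int to bigint change is similar in spirit: it is a widening type promotion recorded in metadata.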

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;
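
As a rough Python equivalent of the two substring calls, the following sketch shows how the 1-based SQL positions map to 0-based Python slices. The function name and the sample value are hypothetical.

```python
def split_tracking(value):
    """Mirror substring(value, 1, 3) and substring(value, 4, 50) from the UPDATE."""
    if not value:
        # The WHERE clause skips empty strings (and NULLs never satisfy !=).
        return None
    carrier = value[0:3]    # SQL positions 1 through 3
    tracking = value[3:53]  # SQL position 4 onward, at most 50 characters
    return carrier, tracking


example = split_tracking("UPS1Z999AA10123456784")
```

Note that in the SQL statement both SET expressions read the value of tracking_number from before the update, which is why the carrier prefix is still available when shipping_carrier is assigned.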

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over two decades of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.

Top Architecture Blog Posts of 2023

Post Syndicated from Andrea Courtright original https://aws.amazon.com/blogs/architecture/top-architecture-blog-posts-of-2023/

2023 was a rollercoaster year in tech, and we at the AWS Architecture Blog feel so fortunate to have shared in the excitement. As we move into 2024 and all of the new technologies we could see, we want to take a moment to highlight the brightest stars from 2023.

As always, thanks to our readers and to the many talented and hardworking Solutions Architects and other contributors to our blog.

I give you our 2023 cream of the crop!

#10: Build a serverless retail solution for endless aisle on AWS

In this post, Sandeep and Shashank help retailers and their customers alike in this guided approach to finding inventory that doesn’t live on shelves.

Figure 1. Building endless aisle architecture for order processing

Check it out!

#9: Optimizing data with automated intelligent document processing solutions

Who else dreads wading through large amounts of data in multiple formats? Just me? I didn’t think so. Using Amazon AI/ML and content-reading services, Deependra, Anirudha, Bhajandeep, and Senaka have created a solution that is scalable and cost-effective to help you extract the data you need and store it in a format that works for you.

Figure 2: AI-based intelligent document processing engine

Check it out!

#8: Disaster Recovery Solutions with AWS managed services, Part 3: Multi-Site Active/Passive

Disaster recovery posts are always popular, and this post by Brent and Dhruv is no exception. Their creative approach in part 3 of this series is most helpful for customers who have business-critical workloads with higher availability requirements.

Figure 3. Warm standby with managed services

Check it out!

#7: Simulating Kubernetes-workload AZ failures with AWS Fault Injection Simulator

Continuing with the theme of “when bad things happen,” we have Siva, Elamaran, and Re’s post about preparing for workload failures. If resiliency is a concern (and it really should be), the secret is test, test, TEST.

Figure 4. Architecture flow for Microservices to simulate a realistic failure scenario

Check it out!

#6: Let’s Architect! Designing event-driven architectures

Luca, Laura, Vittorio, and Zamira weren’t content with their four top-10 spots last year – they’re back with some things you definitely need to know about event-driven architectures.

Figure 5. Let’s Architect artwork

Check it out!

#5: Use a reusable ETL framework in your AWS lake house architecture

As your lake house increases in size and complexity, you could find yourself facing maintenance challenges, and Ashutosh and Prantik have a solution: frameworks! The reusable ETL template with AWS Glue templates might just save you a headache or three.

Figure 6. Reusable ETL framework architecture

Check it out!

#4: Invoking asynchronous external APIs with AWS Step Functions

It’s possible that AWS’ menagerie of services doesn’t have everything you need to run your organization. (Possible, but not likely; we have a lot of amazing services.) If you are using third-party APIs, then Jorge, Hossam, and Shirisha’s architecture can help you maintain a secure, reliable, and cost-effective relationship among all involved.

Figure 7. Invoking Asynchronous External APIs architecture

Check it out!

#3: Announcing updates to the AWS Well-Architected Framework

The Well-Architected Framework continues to help AWS customers evaluate their architectures against its six pillars. They are constantly striving for improvement, and Haleh’s diligence in keeping us up to date has not gone unnoticed. Thank you, Haleh!

Figure 8. Well-Architected logo

Check it out!

#2: Let’s Architect! Designing architectures for multi-tenancy

The practically award-winning Let’s Architect! series strikes again! This time, Luca, Laura, Vittorio, and Zamira were joined by Federica to discuss multi-tenancy and why that concept is so crucial for SaaS providers.

Figure 9. Let’s Architect

Check it out!

And finally…

#1: Understand resiliency patterns and trade-offs to architect efficiently in the cloud

Haresh, Lewis, and Bonnie revamped this 2022 post into a masterpiece that completely stole our readers’ hearts and is among the top posts we’ve ever made!

Figure 10. Resilience patterns and trade-offs

Check it out!

Bonus! Three older special mentions

These three posts were published before 2023, but we think they deserve another round of applause because you, our readers, keep coming back to them.

Thanks again to everyone for their contributions during a wild year. We hope you’re looking forward to the rest of 2024 as much as we are!

Enhance container software supply chain visibility through SBOM export with Amazon Inspector and QuickSight

Post Syndicated from Jason Ng original https://aws.amazon.com/blogs/security/enhance-container-software-supply-chain-visibility-through-sbom-export-with-amazon-inspector-and-quicksight/

In this post, I’ll show how you can export software bills of materials (SBOMs) for your containers by using an AWS native service, Amazon Inspector, and visualize the SBOMs through Amazon QuickSight, providing a single-pane-of-glass view of your organization’s software supply chain.

The concept of a bill of materials (BOM) originated in the manufacturing industry in the early 1960s. It was used to keep track of the quantities of each material used to manufacture a completed product. If parts were found to be defective, engineers could then use the BOM to identify products that contained those parts. An SBOM extends this concept to software development, allowing engineers to keep track of vulnerable software packages and quickly remediate the vulnerabilities.

Today, most software includes open source components. A Synopsys study, Walking the Line: GitOps and Shift Left Security, shows that 8 in 10 organizations reported using open source software in their applications. Consider a scenario in which you specify an open source base image in your Dockerfile but don’t know what packages it contains. Although this practice can significantly improve developer productivity and efficiency, the decreased visibility makes it more difficult for your organization to manage risk effectively.

It’s important to track the software components and their versions that you use in your applications, because a single affected component used across multiple organizations could result in a major security impact. According to a Gartner report titled Gartner Report for SBOMs: Key Takeaways You Should know, by 2025, 60 percent of organizations building or procuring critical infrastructure software will mandate and standardize SBOMs in their software engineering practice, up from less than 20 percent in 2022. This will help provide much-needed visibility into software supply chain security.

Integrating SBOM workflows into the software development life cycle is just the first step—visualizing SBOMs and being able to search through them quickly is the next step. This post describes how to process the generated SBOMs and visualize them with Amazon QuickSight. AWS also recently added SBOM export capability in Amazon Inspector, which offers the ability to export SBOMs for Amazon Inspector monitored resources, including container images.

Why is vulnerability scanning not enough?

Scanning for and monitoring vulnerable components that pose cybersecurity risks is known as vulnerability scanning, and it is fundamental to maintaining a strong security posture. Scanners usually rely on a database of known vulnerabilities, the most common being the Common Vulnerabilities and Exposures (CVE) database.

Identifying vulnerable components with a scanner can prevent an engineer from deploying affected applications into production. You can embed scanning into your continuous integration and continuous delivery (CI/CD) pipelines so that images with known vulnerabilities don’t get pushed into your image repository. However, what if a new vulnerability is discovered but has not been added to the CVE records yet? A good example of this is the Apache Log4j vulnerability, which was first disclosed on Nov 24, 2021 and only added as a CVE on Dec 1, 2021. This means that for 7 days, scanners that relied on the CVE system weren’t able to identify affected components within their organizations. This issue is known as a zero-day vulnerability. Being able to quickly identify vulnerable software components in your applications in such situations would allow you to assess the risk and come up with a mitigation plan without waiting for a vendor or supplier to provide a patch.
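
The kind of lookup this enables can be sketched in a few lines of Python. This is only an illustration: the function name and the inventory rows are invented, and the field names (imagearn, package, packageversion) follow the columns queried later in this post.

```python
def find_images_with_package(records, package, versions=None):
    """Return sorted image ARNs whose SBOM lists the package, optionally at given versions."""
    hits = {
        r["imagearn"]
        for r in records
        if r["package"] == package
        and (versions is None or r["packageversion"] in versions)
    }
    return sorted(hits)


# Invented inventory rows for illustration only.
inventory = [
    {"imagearn": "arn:example:image/app-a", "package": "log4j-core", "packageversion": "2.14.1"},
    {"imagearn": "arn:example:image/app-b", "package": "log4j-core", "packageversion": "2.17.1"},
    {"imagearn": "arn:example:image/app-c", "package": "zlib", "packageversion": "1.2.11"},
]

all_users = find_images_with_package(inventory, "log4j-core")
affected = find_images_with_package(inventory, "log4j-core", versions={"2.14.1"})
```

Because the search runs against your own inventory, it works as soon as a package name is known, without waiting for a CVE record to be published.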

In addition, it’s also good hygiene for your organization to track usage of software packages, which provides visibility into your software supply chain. This can improve collaboration between developers, operations, and security teams, because they’ll have a common view of every software component and can collaborate effectively to address security threats.

In this post, I present a solution that uses the new Amazon Inspector feature to export SBOMs from container images, process them, and visualize the data in QuickSight. This gives you the ability to search through your software inventory on a dashboard and to use natural language queries through QuickSight Q, in order to look for vulnerabilities.

Solution overview

Figure 1 shows the architecture of the solution. It is fully serverless, meaning there is no underlying infrastructure you need to manage. This post uses a newly released feature within Amazon Inspector that provides the ability to export a consolidated SBOM for Amazon Inspector monitored resources across your organization in commonly used formats, including CycloneDX and SPDX.

Figure 1: Solution architecture diagram

The workflow in Figure 1 is as follows:

  1. The image is pushed into Amazon Elastic Container Registry (Amazon ECR), which sends an Amazon EventBridge event.
  2. This invokes an AWS Lambda function, which starts the SBOM generation job for the specific image.
  3. When the job completes, Amazon Inspector deposits the SBOM file in an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Another Lambda function is invoked whenever a new JSON file is deposited. The function performs the data transformation steps and uploads the new file into a new S3 bucket.
  5. Amazon Athena is then used to perform preliminary data exploration.
  6. A dashboard on Amazon QuickSight displays SBOM data.
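
Steps 1 and 2 of the workflow could be sketched as follows. This is not the function shipped with the CloudFormation template. It only builds the request that a Lambda function might pass to the Amazon Inspector SBOM export API; the parameter names reflect my understanding of the boto3 inspector2 create_sbom_export call and should be checked against the current documentation. The bucket name and KMS key ARN are placeholders.

```python
def build_sbom_export_request(event, bucket, kms_key_arn):
    """Translate an ECR image-push event into keyword arguments for an SBOM export."""
    detail = event["detail"]
    return {
        "reportFormat": "SPDX_2_3",
        # Restrict the export to the repository and tag that were just pushed.
        "resourceFilterCriteria": {
            "ecrRepositoryName": [
                {"comparison": "EQUALS", "value": detail["repository-name"]}
            ],
            "ecrImageTags": [
                {"comparison": "EQUALS", "value": detail["image-tag"]}
            ],
        },
        "s3Destination": {"bucketName": bucket, "kmsKeyArn": kms_key_arn},
    }


# Trimmed-down example of the event that EventBridge delivers on an image push.
sample_event = {
    "detail": {
        "action-type": "PUSH",
        "result": "SUCCESS",
        "repository-name": "sbom-inspector",
        "image-tag": "nginxperl",
    }
}

request = build_sbom_export_request(
    sample_event,
    bucket="example-raw-sbom-bucket",                          # placeholder
    kms_key_arn="arn:aws:kms:us-east-1:111122223333:key/example",  # placeholder
)
# Inside a Lambda handler, this could then be sent with something like:
#   boto3.client("inspector2").create_sbom_export(**request)   # assumed usage
```

Keeping the request construction in a pure function makes it possible to unit test the filter logic without calling AWS.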

Implement the solution

This section describes how to deploy the solution architecture.

In this post, you’ll perform the following tasks:

  • Create S3 buckets and AWS KMS keys to store the SBOMs
  • Create an Amazon Elastic Container Registry (Amazon ECR) repository
  • Deploy two AWS Lambda functions to initiate the SBOM generation and transformation
  • Set up Amazon EventBridge rules to invoke Lambda functions upon image push into Amazon ECR
  • Run AWS Glue crawlers to crawl the transformed SBOM S3 bucket
  • Run Amazon Athena queries to review SBOM data
  • Create QuickSight dashboards to identify libraries and packages
  • Use QuickSight Q to identify libraries and packages by using natural language queries

Deploy the CloudFormation stack

The AWS CloudFormation template we’ve provided provisions the S3 buckets that are required for the storage of raw SBOMs and transformed SBOMs, the Lambda functions necessary to initiate and process the SBOMs, and EventBridge rules to run the Lambda functions based on certain events. An empty repository is provisioned as part of the stack, but you can also use your own repository.

To deploy the CloudFormation stack

  1. Download the CloudFormation template.
  2. Browse to the CloudFormation service in your AWS account and choose Create Stack.
  3. Upload the CloudFormation template you downloaded earlier.
  4. For the next step, Specify stack details, enter a stack name.
  5. You can keep the default value of sbom-inspector for EnvironmentName.
  6. Specify the Amazon Resource Name (ARN) of the user or role to be the admin for the KMS key.
  7. Deploy the stack.

Set up Amazon Inspector

If this is the first time you’re using Amazon Inspector, you need to activate the service. In the Getting started with Amazon Inspector topic in the Amazon Inspector User Guide, follow Step 1 to activate the service. This will take some time to complete.

Figure 2: Activate Amazon Inspector

SBOM invocation and processing Lambda functions

This solution uses two Lambda functions written in Python to perform the invocation task and the transformation task.

  • Invocation task: This function runs whenever a new image is pushed into Amazon ECR. It takes the repository name and image tag and passes them to the create_sbom_export function, requesting the SPDX format. Limiting each export to the image that was just pushed prevents duplicated SBOMs, which helps to keep the S3 data size small.
  • Transformation task: This function runs whenever a new file with the suffix .json is added to the raw S3 bucket. It creates two files, as follows:
    1. It extracts information such as image ARN, account number, package, package version, operating system, and SHA from the SBOM and exports this data to the transformed S3 bucket under a folder named sbom/.
    2. Because each package can have more than one CVE, this function also extracts the CVEs for each package and stores them in the same bucket in a directory named cve/. Both files are exported in Apache Parquet, a format that is optimized for queries by Amazon Athena.
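
The two outputs of the transformation task can be illustrated with a simplified sketch. The input shape here is an assumption: "packages", "name", and "versionInfo" are standard SPDX package fields, but the per-package "cves" list is a hypothetical stand-in for however the exported SBOM actually records vulnerabilities. The sketch also stops at plain rows and leaves out the Parquet export.

```python
def transform(sbom, image_arn, sha):
    """Flatten a (simplified) SBOM into one row per package and one row per package CVE."""
    sbom_rows, cve_rows = [], []
    for pkg in sbom.get("packages", []):
        key = {
            "sha": sha,
            "package": pkg["name"],
            "packageversion": pkg.get("versionInfo", ""),
        }
        sbom_rows.append({**key, "imagearn": image_arn})
        # "cves" is a hypothetical field used only for this illustration.
        for cve in pkg.get("cves", []):
            cve_rows.append({**key, "cve": cve})
    return sbom_rows, cve_rows


# Invented sample data with placeholder CVE identifiers.
sample = {
    "packages": [
        {"name": "openssl", "versionInfo": "1.1.1k", "cves": ["CVE-0000-0001", "CVE-0000-0002"]},
        {"name": "zlib", "versionInfo": "1.2.11"},
    ]
}

sbom_rows, cve_rows = transform(sample, "arn:example:image/app-a", "sha256:aaa")
```

Both row sets share the sha, package, and packageversion keys, so the two tables can be joined on those three columns later.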

Populate the AWS Glue Data Catalog

To populate the AWS Glue Data Catalog, you need to generate the SBOM files by using the Lambda functions that were created earlier.

To populate the AWS Glue Data Catalog

  1. You can use an existing image, or you can continue on to create a sample image.
  2. Open an AWS CloudShell terminal.
  3. Run the following commands:
    # Pull the nginx image from a public repo
    docker pull public.ecr.aws/nginx/nginx:1.19.10-alpine-perl
    
    docker tag public.ecr.aws/nginx/nginx:1.19.10-alpine-perl <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sbom-inspector:nginxperl
    
    # Authenticate to ECR, fill in your account id
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com
    
    # Push the image into ECR
    docker push <ACCOUNT-ID>.dkr.ecr.us-east-1.amazonaws.com/sbom-inspector:nginxperl

  4. An image is pushed into the Amazon ECR repository in your account. This invokes the Lambda functions that perform the SBOM export by using Amazon Inspector and convert the SBOM file to Parquet.
  5. Verify that the Parquet files are in the transformed S3 bucket:
    1. Browse to the S3 console and choose the bucket named sbom-inspector-<ACCOUNT-ID>-transformed. You can also track the invocation of each Lambda function in the Amazon CloudWatch log console.
    2. After the transformation step is complete, you will see two folders (cve/ and sbom/) in the transformed S3 bucket. Choose the sbom folder. You will see the transformed Parquet file in it. If there are CVEs present, a similar file will appear in the cve folder.

    The next step is to run an AWS Glue crawler to determine the format, schema, and associated properties of the raw data. You will need to crawl both folders in the transformed S3 bucket and store the schema in separate tables in the AWS Glue Data Catalog.

  6. On the AWS Glue Service console, on the left navigation menu, choose Crawlers.
  7. On the Crawlers page, choose Create crawler. This starts a series of pages that prompt you for the crawler details.
  8. In the Crawler name field, enter sbom-crawler, and then choose Next.
  9. Under Data sources, select Add a data source.
  10. Now you need to point the crawler to your data. On the Add data source page, choose the Amazon S3 data store. The solution in this post doesn’t use a connection, so leave the Connection field blank if it’s visible.
  11. For the option Location of S3 data, choose In this account. Then, for S3 path, enter the path where the crawler can find the sbom and cve data, which is s3://sbom-inspector-<ACCOUNT-ID>-transformed/sbom/ and s3://sbom-inspector-<ACCOUNT-ID>-transformed/cve/. Leave the rest as default and select Add an S3 data source.
     
    Figure 3: Data source for AWS Glue crawler

  12. The crawler needs permissions to access the data store and create objects in the Data Catalog. To configure these permissions, choose Create an IAM role. The AWS Identity and Access Management (IAM) role name starts with AWSGlueServiceRole-, and in the field, you enter the last part of the role name. Enter sbomcrawler, and then choose Next.
  13. Crawlers create tables in your Data Catalog. Tables are contained in a database in the Data Catalog. To create a database, choose Add database. In the pop-up window, enter sbom-db for the database name, and then choose Create.
  14. Verify the choices you made in the Add crawler wizard. If you see any mistakes, you can choose Back to return to previous pages and make changes. After you’ve reviewed the information, choose Finish to create the crawler.
    Figure 4: Creation of the AWS Glue crawler

  15. Select the newly created crawler and choose Run.
  16. After the crawler runs successfully, verify that the table is created and the data schema is populated.
     
    Figure 5: Table populated from the AWS Glue crawler
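
If you prefer to script the console steps above, the crawler settings can be expressed as a request for the AWS Glue CreateCrawler API. The following is a sketch under assumptions: it only builds the arguments, the helper name is invented, and it presumes that the IAM role and the sbom-db database already exist. Depending on how the role was created, you may need to pass its full ARN instead of its name.

```python
def build_crawler_request(account_id, role_suffix="sbomcrawler"):
    """Assemble CreateCrawler arguments matching the console walkthrough."""
    bucket = f"sbom-inspector-{account_id}-transformed"
    return {
        "Name": "sbom-crawler",
        "Role": f"AWSGlueServiceRole-{role_suffix}",
        "DatabaseName": "sbom-db",
        # One S3 target per folder so that each becomes its own table.
        "Targets": {
            "S3Targets": [
                {"Path": f"s3://{bucket}/sbom/"},
                {"Path": f"s3://{bucket}/cve/"},
            ]
        },
    }


crawler_request = build_crawler_request("111122223333")  # placeholder account ID
# Assumed usage with boto3:
#   glue = boto3.client("glue")
#   glue.create_crawler(**crawler_request)
#   glue.start_crawler(Name=crawler_request["Name"])
```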

Set up Amazon Athena

Amazon Athena performs the initial data exploration and validation. Athena is a serverless interactive analytics service built on open source frameworks that supports open-table and file formats. Athena provides a simplified, flexible way to analyze data in sources like Amazon S3 by using standard SQL queries. If you are SQL proficient, you can query the data source directly; however, not everyone is familiar with SQL. In this section, you run a sample query and initialize the service so that it can be used in QuickSight later on.

To start using Amazon Athena

  1. In the AWS Management Console, navigate to the Athena console.
  2. For Database, select sbom-db (or select the database you created earlier in the crawler).
  3. Navigate to the Settings tab located at the top right corner of the console. For Query result location, select the Athena S3 bucket created from the CloudFormation template, sbom-inspector-<ACCOUNT-ID>-athena.
  4. Keep the defaults for the rest of the settings. You can now return to the Query Editor and start writing and running your queries on the sbom-db database.

You can use the following sample query.

select package, packageversion, cve, sha, imagearn from sbom
left join cve
using (sha, package, packageversion)
where cve is not null;

Your Athena console should look similar to the screenshot in Figure 6.

Figure 6: Sample query with Amazon Athena

This query joins the two tables and selects only the packages with CVEs identified. Alternatively, you can choose to query for specific packages or identify the most common package used in your organization.
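
To see how the join behaves without an AWS account, the same query can be run against an in-memory SQLite database. This is a sketch for illustration: SQLite is swapped in for Athena here, and the rows and the placeholder CVE identifier are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sbom (sha TEXT, package TEXT, packageversion TEXT, imagearn TEXT);
    CREATE TABLE cve  (sha TEXT, package TEXT, packageversion TEXT, cve TEXT);
    INSERT INTO sbom VALUES
        ('sha256:aaa', 'openssl', '1.1.1k', 'arn:example:image/app-a'),
        ('sha256:aaa', 'zlib',    '1.2.11', 'arn:example:image/app-a');
    INSERT INTO cve VALUES
        ('sha256:aaa', 'openssl', '1.1.1k', 'CVE-0000-0001');
""")

# Same shape as the sample query: join on the three shared key columns.
rows = conn.execute("""
    SELECT package, packageversion, cve, sha, imagearn FROM sbom
    LEFT JOIN cve USING (sha, package, packageversion)
    WHERE cve IS NOT NULL
""").fetchall()
conn.close()
```

Only the openssl row is returned. Because the WHERE clause discards rows without a CVE, the left join behaves like an inner join in this query; dropping the filter would list every package, with a null cve for unaffected ones.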

Sample output:

# package packageversion cve sha imagearn
<PACKAGE_NAME> <PACKAGE_VERSION> <CVE> <IMAGE_SHA> <ECR_IMAGE_ARN>

Visualize data with Amazon QuickSight

Amazon QuickSight is a serverless business intelligence service that is designed for the cloud. In this post, it serves as a dashboard that allows business users who are unfamiliar with SQL to identify zero-day vulnerabilities. This can also reduce the operational effort and time of having to look through several JSON documents to identify a single package across your image repositories. You can then share the dashboard across teams without having to share the underlying data.

QuickSight SPICE (Super-fast, Parallel, In-memory Calculation Engine) is an in-memory engine that QuickSight uses to perform advanced calculations. In a large organization where you could have millions of SBOM records stored in S3, importing your data into SPICE helps to reduce the time to process and serve the data. You can also use the feature to perform a scheduled refresh to obtain the latest data from S3.

QuickSight also has a feature called QuickSight Q. With QuickSight Q, you can use natural language to interact with your data. If this is the first time you are initializing QuickSight, subscribe to QuickSight and select Enterprise + Q. It will take roughly 20–30 minutes to initialize for the first time. Otherwise, if you are already using QuickSight, you will need to enable QuickSight Q by subscribing to it in the QuickSight console.

Finally, in QuickSight you can select different data sources, such as Amazon S3 and Athena, to create custom visualizations. In this post, we will use the two Athena tables as the data source to create a dashboard to keep track of the packages used in your organization and the resulting CVEs that come with them.

Prerequisites for setting up the QuickSight dashboard

This process will be used to create the QuickSight dashboard from a template already pre-provisioned through the command line interface (CLI). It also grants the necessary permissions for QuickSight to access the data source. You will need the following:

  • AWS Command Line Interface (AWS CLI) programmatic access with read and write permissions to QuickSight.
  • A QuickSight + Q subscription (only if you want to use the Q feature).
  • QuickSight permissions to Amazon S3 and Athena (enable these through the QuickSight security and permissions interface).
  • Set the default AWS Region where you want to deploy the QuickSight dashboard. This post assumes that you’re using the us-east-1 Region.

Create datasets

In QuickSight, create two datasets, one for the sbom table and another for the cve table.

  1. In the QuickSight console, select the Dataset tab.
  2. Choose Create dataset, and then select the Athena data source.
  3. Name the data source sbom and choose Create data source.
  4. Select the sbom table.
  5. Choose Visualize to complete the dataset creation. (Delete the analyses automatically created for you because you will create your own analyses afterwards.)
  6. Navigate back to the main QuickSight page and repeat steps 1–4 for the cve dataset.

Merge datasets

Next, merge the two datasets to create the combined dataset that you will use for the dashboard.

  1. On the Datasets tab, edit the sbom dataset and add the cve dataset.
  2. Set three join clauses, as follows:
    1. Sha : Sha
    2. Package : Package
    3. Packageversion : Packageversion
  3. Perform a left merge, which will append the cve ID to the package and package version in the sbom dataset.
     
    Figure 7: Combining the sbom and cve datasets

Next, you will create a dashboard based on the combined sbom dataset.

Prepare configuration files

In your terminal, export the following variables. Substitute <QuickSight username> in the QS_USER_ARN variable with your own username, which can be found in the Amazon QuickSight console.

export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
export TEMPLATE_ID="sbom_dashboard"
export QS_USER_ARN=$(aws quicksight describe-user --aws-account-id $ACCOUNT_ID --namespace default --user-name <QuickSight username> | jq .User.Arn)
export QS_DATA_ARN=$(aws quicksight search-data-sets --aws-account-id $ACCOUNT_ID --filters Name="DATASET_NAME",Operator="StringLike",Value="sbom" | jq .DataSetSummaries[0].Arn)

Validate that the variables are set properly. This is required for you to move on to the next step; otherwise you will run into errors.

[ -n "$ACCOUNT_ID" ] && echo "ACCOUNT_ID is $ACCOUNT_ID" || echo "ACCOUNT_ID is not set"
[ -n "$TEMPLATE_ID" ] && echo "TEMPLATE_ID is $TEMPLATE_ID" || echo "TEMPLATE_ID is not set"
[ -n "$QS_USER_ARN" ] && echo "QUICKSIGHT USER ARN is $QS_USER_ARN" || echo "QUICKSIGHT USER ARN is not set"
[ -n "$QS_DATA_ARN" ] && echo "QUICKSIGHT DATA ARN is $QS_DATA_ARN" || echo "QUICKSIGHT DATA ARN is not set"

Next, use the following commands to create the dashboard from a predefined template and create the IAM permissions needed for the user to view the QuickSight dashboard.

cat << EOF > ./dashboard.json
{
    "SourceTemplate": {
      "DataSetReferences": [
        {
          "DataSetPlaceholder": "sbom",
          "DataSetArn": $QS_DATA_ARN
        }
      ],
      "Arn": "arn:aws:quicksight:us-east-1:293424211206:template/sbom_qs_template"
    }
}
EOF

cat << EOF > ./dashboardpermissions.json
[
    {
      "Principal": $QS_USER_ARN,
      "Actions": [
        "quicksight:DescribeDashboard",
        "quicksight:ListDashboardVersions",
        "quicksight:UpdateDashboardPermissions",
        "quicksight:QueryDashboard",
        "quicksight:UpdateDashboard",
        "quicksight:DeleteDashboard",
        "quicksight:DescribeDashboardPermissions",
        "quicksight:UpdateDashboardPublishedVersion"
      ]
    }
]
EOF
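
The shell templates above leave the ARN variables unquoted because jq prints each ARN already wrapped in double quotes. If you would rather not depend on that detail, the same two documents can be produced with a JSON serializer. The following Python sketch shows one way to do it; the function names and the dataset and user ARNs are hypothetical, and the template ARN is the one used in this post.

```python
import json

TEMPLATE_ARN = "arn:aws:quicksight:us-east-1:293424211206:template/sbom_qs_template"

DASHBOARD_ACTIONS = [
    "quicksight:DescribeDashboard",
    "quicksight:ListDashboardVersions",
    "quicksight:UpdateDashboardPermissions",
    "quicksight:QueryDashboard",
    "quicksight:UpdateDashboard",
    "quicksight:DeleteDashboard",
    "quicksight:DescribeDashboardPermissions",
    "quicksight:UpdateDashboardPublishedVersion",
]


def dashboard_source(dataset_arn, template_arn=TEMPLATE_ARN):
    """Build the --source-entity document that binds the dataset to the template."""
    return {
        "SourceTemplate": {
            "DataSetReferences": [
                {"DataSetPlaceholder": "sbom", "DataSetArn": dataset_arn}
            ],
            "Arn": template_arn,
        }
    }


def dashboard_permissions(user_arn):
    """Build the --grant-permissions document for a single QuickSight user."""
    return [{"Principal": user_arn, "Actions": list(DASHBOARD_ACTIONS)}]


# Placeholder ARNs; substitute the values from your own account.
source_json = json.dumps(
    dashboard_source("arn:aws:quicksight:us-east-1:111122223333:dataset/example"),
    indent=2,
)
permissions_json = json.dumps(
    dashboard_permissions("arn:aws:quicksight:us-east-1:111122223333:user/default/example-user"),
    indent=2,
)
# To use these with the CLI commands, write them to dashboard.json and
# dashboardpermissions.json, for example with pathlib.Path(...).write_text(...).
```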

Run the following commands to create the dashboard in your QuickSight console.

aws quicksight create-dashboard --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID --name sbom-dashboard --source-entity file://dashboard.json

Note: Run the following describe-dashboard command, and confirm that the response contains a status code of 200. A status code of 200 means that the dashboard exists.

aws quicksight describe-dashboard --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID

Use the following update-dashboard-permissions AWS CLI command to grant the appropriate permissions to QuickSight users.

aws quicksight update-dashboard-permissions --aws-account-id $ACCOUNT_ID --dashboard-id $ACCOUNT_ID --grant-permissions file://dashboardpermissions.json

You should now be able to see the dashboard in your QuickSight console, similar to the one in Figure 8. It’s an interactive dashboard that shows you the number of vulnerable packages you have in your repositories and the specific CVEs that come with them. You can navigate to the specific image by selecting the CVE (middle right bar chart) or list images with a specific vulnerable package (bottom right bar chart).

Note: You won’t see the exact same graph as in Figure 8. It will change according to the image you pushed in.

Figure 8: QuickSight dashboard containing SBOM information

Alternatively, you can use QuickSight Q to extract the same information from your dataset through natural language. You will need to create a topic and add the dataset you added earlier. For detailed information on how to create a topic, see the Amazon QuickSight User Guide. After QuickSight Q has completed indexing the dataset, you can start to ask questions about your data.

Figure 9: Natural language query with QuickSight Q

Conclusion

This post discussed how you can use Amazon Inspector to export SBOMs to improve software supply chain transparency. Container SBOM export should be part of your supply chain mitigation strategy and monitored in an automated manner at scale.

Although generating SBOMs is good practice, they provide little value unless they are analyzed further. This solution enables you to explore your SBOM data through a dashboard and natural language queries, providing better visibility into your security posture. The solution is also entirely serverless, meaning there are no agents or sidecars to set up.

To learn more about exporting SBOMs with Amazon Inspector, see the Amazon Inspector User Guide.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Jason Ng

Jason is a Cloud Sales Center Solutions Architect at AWS. He works with enterprise and independent software vendor (ISV) greenfield customers in ASEAN countries and is part of the Containers Technical Field Community (TFC). He enjoys helping customers modernize their applications, drive growth, and reduce total cost of ownership.

Empowering data-driven excellence: How the Bluestone Data Platform embraced data mesh for success

Post Syndicated from Toney Thomas original https://aws.amazon.com/blogs/big-data/empowering-data-driven-excellence-how-the-bluestone-data-platform-embraced-data-mesh-for-success/

This post is co-written with Toney Thomas and Ben Vengerovsky from Bluestone.

In the ever-evolving world of finance and lending, the need for real-time, reliable, and centralized data has become paramount. Bluestone, a leading financial institution, embarked on a transformative journey to modernize its data infrastructure and transition to a data-driven organization. In this post, we explore how Bluestone uses AWS services, notably the cloud data warehousing service Amazon Redshift, to implement a cutting-edge data mesh architecture, revolutionizing the way they manage, access, and utilize their data assets.

The challenge: Legacy to modernization

Bluestone was operating with a legacy SQL-based lending platform, as illustrated in the following diagram. To stay competitive and responsive to changing market dynamics, they decided to modernize their infrastructure. This modernization involved transitioning to software as a service (SaaS) based loan origination and core lending platforms. Because these new systems produced vast amounts of data, the challenge of ensuring a single source of truth for all data consumers emerged.

Birth of the Bluestone Data Platform

To address the need for centralized, scalable, and governable data, Bluestone introduced the Bluestone Data Platform. This platform became the hub for all data-related activities across the organization. AWS played a pivotal role in bringing this vision to life.

The following are the key components of the Bluestone Data Platform:

  • Data mesh architecture – Bluestone adopted a data mesh architecture, a paradigm that distributes data ownership across different business units. Each data producer within the organization has its own data lake in Apache Hudi format, ensuring data sovereignty and autonomy.
  • Four-layered data lake and data warehouse architecture – The architecture comprises four layers, including the analytical layer, which houses purpose-built facts and dimension datasets that are hosted in Amazon Redshift. These datasets are pivotal for reporting and analytics use cases, powered by services like Amazon Redshift and tools like Power BI.
  • Machine learning analytics – Various business units, such as Servicing, Lending, Sales & Marketing, Finance, and Credit Risk, use machine learning analytics, which run on top of the dimensional model within the data lake and data warehouse. This enables data-driven decision-making across the organization.
  • Governance and self-service – The Bluestone Data Platform provides a governed, curated, and self-service avenue for all data use cases. AWS services like AWS Lake Formation in conjunction with Atlan help govern data access and policies.
  • Data quality framework – To ensure data reliability, they implemented a data quality framework. It continuously assesses data quality and syncs quality scores to the Atlan governance tool, instilling confidence in the data assets within the platform.

The following diagram illustrates the architecture of their updated data platform.

AWS and third-party services

AWS played a pivotal and multifaceted role in empowering Bluestone’s Data Platform to thrive. The following AWS and third-party services were instrumental in shaping Bluestone’s journey toward becoming a data-driven organization:

  • Amazon Redshift – Bluestone harnessed the power of Amazon Redshift and its features like data sharing to create a centralized repository of data assets. This strategic move facilitated seamless data sharing and collaboration across diverse business units, paving the way for more informed and data-driven decision-making.
  • Lake Formation – Lake Formation emerged as a cornerstone in Bluestone’s data governance strategy. It played a critical role in enforcing data access controls and implementing data policies. With Lake Formation, Bluestone achieved protection of sensitive data and compliance with regulatory requirements.
  • Data quality monitoring – To maintain data reliability and accuracy, Bluestone deployed a robust data quality framework. AWS services were essential in this endeavor, because they complemented open source tools to establish an in-house data quality monitoring system. This system continuously assesses data quality, providing confidence in the reliability of the organization’s data assets.
  • Data governance tooling – Bluestone chose Atlan, available through AWS Marketplace, to implement comprehensive data governance tooling. This SaaS service played a pivotal role in onboarding multiple business teams and fostering a data-centric culture within Bluestone. It empowered teams to efficiently manage and govern data assets.
  • Orchestration using Amazon MWAA – Bluestone heavily relied on Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to manage workflow orchestrations efficiently. This orchestration framework seamlessly integrated with various data quality rules, which were evaluated using Great Expectations operators within the Airflow environment.
  • AWS DMS – Bluestone used AWS Database Migration Service (AWS DMS) to streamline the consolidation of legacy data into the data platform. This service facilitated the smooth transfer of data from legacy SQL Server warehouses to the data lake and data warehouse, providing data continuity and accessibility.
  • AWS Glue – Bluestone used the AWS Glue PySpark environment for implementing data extract, transform, and load (ETL) processes. It played a pivotal role in processing data originating from various source systems, providing data consistency and suitability for analytical use.
  • AWS Glue Data Catalog – Bluestone centralized their data management using the AWS Glue Data Catalog. This catalog served as the backbone for managing data assets within the Bluestone data estate, enhancing data discoverability and accessibility.
  • AWS CloudTrail – Bluestone implemented AWS CloudTrail to monitor and audit platform activities rigorously. This security-focused service provided essential visibility into platform actions, providing compliance and security in data operations.

AWS’s comprehensive suite of services has been integral in propelling the Bluestone Data Platform towards data-driven success. These services have not only enabled efficient data governance, quality assurance, and orchestration, but have also fostered a culture of data centricity within the organization, ultimately leading to better decision-making and competitive advantage. Bluestone’s journey showcases the power of AWS in transforming organizations into data-driven leaders in their respective industries.

Bluestone data architecture

Bluestone’s data architecture has undergone a dynamic transformation, transitioning from a lake house framework to a data mesh architecture. This evolution was driven by the organization’s need for data products with distributed ownership and the necessity for a centralized mechanism to govern and access these data products across various business units.

The following diagram illustrates the solution architecture and its use of AWS and third-party services.

Let’s delve deeper into how this architecture shift has unfolded and what it entails:

  • The need for change – The catalyst for this transformation was the growing demand for discrete data products tailored to the unique requirements of each business unit within Bluestone. Because these business units generated their own data assets in their respective domains, the challenge lay in efficiently managing, governing, and accessing these diverse data stores. Bluestone recognized the need for a more structured and scalable approach.
  • Data products with distributed ownership – In response to this demand, Bluestone adopted a data mesh architecture, which allowed for the creation of distinct data products aligned with each business unit’s needs. Each of these data products exists independently, generating and curating data assets specific to its domain. These data products serve as individual data hubs, ensuring data autonomy and specialization.
  • Centralized catalog integration – To streamline the discovery and accessibility of the data assets that are dispersed across these data products, Bluestone introduced a centralized catalog. This catalog acts as a unified repository where all data products register their respective data assets. It serves as a critical component for data discovery and management.
  • Data governance tool integration – Ensuring data governance and lineage tracking across the organization was another pivotal consideration. Bluestone implemented a robust data governance tool that connects to the centralized catalog. This integration makes sure that the overarching lineage of data assets is comprehensively mapped and captured. Data governance processes are thereby enforced consistently, guaranteeing data quality and compliance.
  • Amazon Redshift data sharing for control and access – To facilitate controlled and secure access to data assets residing within individual data product Redshift instances, Bluestone used Amazon Redshift data sharing. This capability allows data assets to be exposed and shared selectively, providing granular control over access while maintaining data security and integrity.

In essence, Bluestone’s journey from a lake house to a data mesh architecture represents a strategic shift in data management and governance. This transformation empowers different business units to operate autonomously within their data domains while ensuring centralized control, governance, and accessibility. The integration of a centralized catalog and data governance tooling, coupled with the flexibility of Amazon Redshift data sharing, creates a harmonious ecosystem where data-driven decision-making thrives, ultimately contributing to Bluestone’s success in the ever-evolving financial landscape.

Conclusion

Bluestone’s journey from a legacy SQL-based system to a modern data mesh architecture on AWS has improved the way the organization interacts with data and positioned them as a data-driven powerhouse in the financial industry. By embracing AWS services, Bluestone has successfully achieved a centralized, scalable, and governable data platform that empowers its teams to make informed decisions, drive innovation, and stay ahead in the competitive landscape. This transformation serves as compelling proof that Amazon Redshift and AWS Cloud data sharing capabilities are a great pathway for organizations looking to embark on their own data-driven journeys with AWS.


About the Authors

Toney Thomas is a Data Architect and Data Engineering Lead at Bluestone, renowned for his role in envisioning and coining the company’s pioneering data strategy. With a strategic focus on harnessing the power of advanced technology to tackle intricate business challenges, Toney leads a dynamic team of Data Engineers, Reporting Engineers, Quality Assurance specialists, and Business Analysts at Bluestone. His leadership extends to driving the implementation of robust data governance frameworks across diverse organizational units. Under his guidance, Bluestone has achieved remarkable success, including the deployment of innovative platforms such as a fully governed data mesh business data system with embedded data quality mechanisms, aligning seamlessly with the organization’s commitment to data democratization and excellence.

Ben Vengerovsky is a Data Platform Product Manager at Bluestone. He is passionate about using cloud technology to revolutionize the company’s data infrastructure. With a background in mortgage lending and a deep understanding of AWS services, Ben specializes in designing scalable and efficient data solutions that drive business growth and enhance customer experiences. He thrives on collaborating with cross-functional teams to translate business requirements into innovative technical solutions that empower data-driven decision-making.

Rada Stanic is a Chief Technologist at Amazon Web Services, where she helps ANZ customers across different segments solve their business problems using AWS Cloud technologies. Her special areas of interest are data analytics, machine learning/AI, and application modernization.

Combine AWS Glue and Amazon MWAA to build advanced VPC selection and failover strategies

Post Syndicated from Michael Greenshtein original https://aws.amazon.com/blogs/big-data/combine-aws-glue-and-amazon-mwaa-to-build-advanced-vpc-selection-and-failover-strategies/

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue customers often have to meet strict security requirements, which sometimes involve locking down the network connectivity allowed to the job, or running inside a specific VPC to access another service. To run inside the VPC, the job needs to be assigned to a single subnet, but the most suitable subnet can change over time (for instance, based on the usage and availability), so you may prefer to make that decision at runtime, based on your own strategy.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is an AWS service to run managed Airflow workflows, which allow writing custom logic to coordinate how tasks such as AWS Glue jobs run.

In this post, we show how to run an AWS Glue job as part of an Airflow workflow, with dynamic configurable selection of the VPC subnet assigned to the job at runtime.

Solution overview

To run inside a VPC, an AWS Glue job needs to be assigned at least one connection that includes network configuration. Any connection allows specifying a VPC, subnet, and security group, but for simplicity, this post uses connections of type NETWORK, which just define the network configuration and don’t involve external systems.

If the job has a fixed subnet assigned by a single connection, the job can’t run if there is a service outage in that subnet’s Availability Zone or if the subnet isn’t available for other reasons. Furthermore, each node (driver or worker) in an AWS Glue job requires an IP address assigned from the subnet. When running many large jobs concurrently, this could lead to an IP address shortage and the job running with fewer nodes than intended or not running at all.
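
To get a feel for how quickly a subnet can run out of addresses, the following minimal sketch estimates how many jobs of a given size fit into one subnet. It relies on the fact that AWS reserves five addresses in every VPC subnet; the /24 CIDR matches the size of the private subnets referenced by the job script later in this post, and the 100-node job size is a hypothetical figure chosen only for illustration.

```python
import ipaddress

# AWS reserves five addresses in every VPC subnet (the first four and the last one).
AWS_RESERVED_IPS_PER_SUBNET = 5


def usable_ips(cidr: str) -> int:
    """Upper bound on the addresses a subnet can hand out to job nodes."""
    total = ipaddress.ip_network(cidr).num_addresses
    return max(total - AWS_RESERVED_IPS_PER_SUBNET, 0)


def full_capacity_jobs(cidr: str, nodes_per_job: int) -> int:
    """How many jobs of the given size fit into an otherwise empty subnet."""
    return usable_ips(cidr) // nodes_per_job


if __name__ == "__main__":
    cidr = "10.192.20.0/24"  # same size as the private subnets used in this post
    print(usable_ips(cidr))               # 251
    print(full_capacity_jobs(cidr, 100))  # 2 (hypothetical 100-node jobs)
```

In practice the subnet is usually shared with other resources, so the real headroom is lower than this upper bound.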

AWS Glue extract, transform, and load (ETL) jobs allow multiple connections to be specified with multiple network configurations. However, the job will always try to use the connections’ network configuration in the order listed and pick the first one that passes the health checks and has at least two IP addresses to get the job started, which might not be the optimal option.

With this solution, you can enhance and customize that behavior by reordering the connections dynamically and defining the selection priority. If a retry is needed, the connections are reprioritized again based on the strategy, because the conditions might have changed since the last run.

As a result, it helps prevent the job from failing to run or running under capacity due to subnet IP address shortage or even an outage, while meeting the network security and connectivity requirements.
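
The difference between the default behavior and a reordered list can be illustrated with a small, self-contained model. This is only a sketch of the selection rule as described above (first connection in the list with at least two free IPs); it ignores the health checks, and the connection names and free IP counts are hypothetical.

```python
MIN_IPS_TO_START = 2  # per the description above: a job needs two free IPs to start


def first_usable(connections, free_ips):
    """Walk the list in order and return the first connection with enough
    free IPs to start a job, or None if no connection qualifies."""
    for name in connections:
        if free_ips.get(name, 0) >= MIN_IPS_TO_START:
            return name
    return None


def by_capacity(connections, free_ips):
    """Reorder the list so the subnet with the most free IPs is tried first."""
    return sorted(connections, key=lambda name: -free_ips.get(name, 0))


if __name__ == "__main__":
    # Hypothetical snapshot of free IPs per connection
    free_ips = {"conn-a": 3, "conn-b": 200}
    listed = ["conn-a", "conn-b"]
    print(first_usable(listed, free_ips))                         # conn-a
    print(first_usable(by_capacity(listed, free_ips), free_ips))  # conn-b
```

With the listed order, the job would start in the nearly full subnet and likely run under capacity; reordering before the run steers it to the subnet with more room.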

The following diagram illustrates the solution architecture.

Prerequisites

To follow the steps of the post, you need a user that can log in to the AWS Management Console and has permission to access Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), and AWS Glue. The AWS Region where you choose to deploy the solution needs the capacity to create a VPC and two elastic IP addresses. The default Regional quota for both types of resources is five, so you might need to request an increase via the console.

You also need an AWS Identity and Access Management (IAM) role suitable to run AWS Glue jobs if you don’t have one already. For instructions, refer to Create an IAM role for AWS Glue.

Deploy an Airflow environment and VPC

First, you’ll deploy a new Airflow environment, including the creation of a new VPC with two public subnets and two private ones. This is because Amazon MWAA requires Availability Zone failure tolerance, so it needs to run on two subnets on two different Availability Zones in the Region. The public subnets are used so the NAT Gateway can provide internet access for the private subnets.

Complete the following steps:

  1. Create an AWS CloudFormation template in your computer by copying the template from the following quick start guide into a local text file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and choose the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE.

The resource that will take most of the time is the Airflow environment. While it’s being created, you can continue with the following steps, until you are required to open the Airflow UI.

  1. On the stack’s Resources tab, note the IDs for the VPC and two private subnets (PrivateSubnet1 and PrivateSubnet2), to use in the next step.
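
If you prefer to look up the IDs programmatically instead of on the console, a sketch along the following lines could work. It uses the CloudFormation `describe_stack_resources` call through a client passed in by the caller; the helper name `physical_ids` and the stack name in the usage comment are placeholders, not part of the original post.

```python
def physical_ids(cfn_client, stack_name, logical_ids):
    """Map CloudFormation logical IDs to physical IDs for a single stack.

    cfn_client is expected to behave like boto3.client("cloudformation").
    """
    response = cfn_client.describe_stack_resources(StackName=stack_name)
    found = {
        res["LogicalResourceId"]: res["PhysicalResourceId"]
        for res in response["StackResources"]
        # Resources still being created might not have a physical ID yet
        if res["LogicalResourceId"] in logical_ids and res.get("PhysicalResourceId")
    }
    missing = set(logical_ids) - set(found)
    if missing:
        raise KeyError(f"Not found in stack {stack_name}: {sorted(missing)}")
    return found


# Usage (requires AWS credentials; the stack name is a placeholder):
#   import boto3
#   ids = physical_ids(boto3.client("cloudformation"), "my-airflow-stack",
#                      ["PrivateSubnet1", "PrivateSubnet2"])
```

The function raises if a requested resource is missing, which makes it obvious when the stack hasn't created the subnets yet.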

Create AWS Glue connections

The CloudFormation template deploys two private subnets. In this step, you create an AWS Glue connection to each one so AWS Glue jobs can run in them. Amazon MWAA recently added the capability to run the Airflow cluster on shared VPCs, which reduces cost and simplifies network management. For more information, refer to Introducing shared VPC support on Amazon MWAA.

Complete the following steps to create the connections:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Choose Network as the data source.
  4. Choose the VPC and private subnet (PrivateSubnet1) created by the CloudFormation stack.
  5. Use the default security group.
  6. Choose Next.
  7. For the connection name, enter MWAA-Glue-Blog-Subnet1.
  8. Review the details and complete the creation.
  9. Repeat these steps using PrivateSubnet2 and name the connection MWAA-Glue-Blog-Subnet2.
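
As an alternative to the console steps above, the same connections could be created with Boto3. The following is a minimal sketch, not the method used in this post: the helper `network_connection_input` is hypothetical and only builds the `ConnectionInput` structure accepted by the AWS Glue `create_connection` API, and the subnet and security group IDs in the usage comment are placeholders. The console derives the Availability Zone from the subnet; with the API you may need to pass it yourself, so the helper accepts it as an optional argument.

```python
def network_connection_input(name, subnet_id, security_group_ids, availability_zone=None):
    """Build the ConnectionInput for an AWS Glue connection of type NETWORK."""
    requirements = {
        "SubnetId": subnet_id,
        "SecurityGroupIdList": list(security_group_ids),
    }
    if availability_zone:
        requirements["AvailabilityZone"] = availability_zone
    return {
        "Name": name,
        "ConnectionType": "NETWORK",
        "ConnectionProperties": {},  # a NETWORK connection has no endpoint settings
        "PhysicalConnectionRequirements": requirements,
    }


# Usage (requires AWS credentials; the IDs below are placeholders):
#   import boto3
#   glue = boto3.client("glue")
#   glue.create_connection(ConnectionInput=network_connection_input(
#       "MWAA-Glue-Blog-Subnet1", "subnet-0123456789abcdef0", ["sg-0123456789abcdef0"]))
```

Keeping the request construction in a plain function makes it easy to create both connections in a loop and to inspect the payload before sending it.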

Create the AWS Glue job

Now you create the AWS Glue job that will be triggered later by the Airflow workflow. The job uses the connections created in the previous section, but instead of assigning them directly on the job, as you would normally do, in this scenario you leave the job connections list empty and let the workflow decide which one to use at runtime.

The job script in this case is not significant; it is just intended to demonstrate that the job ran in one of the subnets, depending on the connection.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose Script editor.
  2. Leave the default options (Spark engine and Start fresh) and choose Create script.
  3. Replace the placeholder script with the following Python code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Rename the job to AirflowBlogJob.
  5. On the Job details tab, for IAM Role, choose a role suitable for running AWS Glue jobs and enter 2 for the number of workers (to keep costs low).
  6. Save these changes so the job is created.
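
The subnet lookup in the job script can be checked locally before you run the job. The sketch below wraps the same logic in a function (the name `subnet_for_ip` is ours, not part of the post) so you can feed it sample addresses. The CIDR ranges are the ones hardcoded in the script above, which presumably match the template defaults; if you changed the subnet ranges when deploying the stack, adjust them in both places.

```python
import ipaddress

# CIDR ranges copied from the job script in this post
SUBNETS = {
    "PrivateSubnet1": "10.192.20.0/24",
    "PrivateSubnet2": "10.192.21.0/24",
}


def subnet_for_ip(ip, subnets=SUBNETS):
    """Return the name of the subnet that contains ip, or "unknown"."""
    address = ipaddress.ip_address(ip)
    for name, cidr in subnets.items():
        if address in ipaddress.ip_network(cidr):
            return name
    return "unknown"


if __name__ == "__main__":
    print(subnet_for_ip("10.192.21.37"))  # PrivateSubnet2
    print(subnet_for_ip("192.168.0.1"))   # unknown
```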

Grant AWS Glue permissions to the Airflow environment role

The role created for Airflow by the CloudFormation template provides the basic permissions to run workflows but not to interact with other services such as AWS Glue. In a production project, you would define your own templates with these additional permissions, but in this post, for simplicity, you add the additional permissions as an inline policy. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Locate the role created by the template; it will start with the name you assigned to the CloudFormation stack and then -MwaaExecutionRole-.
  3. On the role details page, on the Add permissions menu, choose Create inline policy.
  4. Switch from Visual to JSON mode and enter the following JSON in the text box. It assumes that the AWS Glue role you have follows the convention of starting with AWSGlueServiceRole. For enhanced security, you can replace the wildcard resource on the ec2:DescribeSubnets permission with the ARNs of the two private subnets from the CloudFormation stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Choose Next.
  6. Enter GlueRelatedPermissions as the policy name and complete the creation.

In this example, we use an ETL script job; for a visual job, because it generates the script automatically on save, the Airflow role would need permission to write to the configured script path on Amazon Simple Storage Service (Amazon S3).

Create the Airflow DAG

An Airflow workflow is based on a Directed Acyclic Graph (DAG), which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named glue_job_dag.py using a text editor.

In each of the following steps, we provide a code snippet to enter into the file and an explanation of what it does.

  1. The following snippet adds the required Python module imports. The modules are already installed on Airflow; if that weren’t the case, you would need to use a requirements.txt file to indicate to Airflow which modules to install. It also defines the Boto3 clients that the code will use later. By default, they use the same role and Region as Airflow, which is why you added the required permissions to that role earlier.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. The following snippet adds three functions to implement the connection order strategy, which defines how to reorder the connections given to establish their priority. This is just an example; you can build your custom code to implement your own logic, as per your needs. The code first checks the IPs available on each connection subnet and separates the ones that have enough IPs available to run the job at full capacity and those that could be used because they have at least two IPs available, which is the minimum a job needs to start. If the strategy is set to random, it will randomize the order within each of the connection groups previously described and add any other connections. If the strategy is capacity, it will order them from most IPs free to fewest.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. The following code creates the DAG itself with the run job task, which updates the job with the connection order defined by the strategy, runs it, and waits for the results. The job name, connections, and strategy come from Airflow variables, so they can be easily configured and updated. It has two retries with exponential backoff configured, so if the task fails, it will repeat the full task including the connection selection. Maybe now the best choice is another connection, or the subnet previously picked randomly is in an Availability Zone that is currently suffering an outage, and by picking a different one, it can recover.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                job_props.pop(prop_name, None) # Tolerate properties missing from the response
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
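
To experiment with the ordering strategies without touching AWS, you can reproduce the prioritization logic offline. The following sketch is a simplified stand-in for the functions above, not a drop-in replacement: it takes the free IP counts as a plain dictionary instead of calling AWS Glue and Amazon EC2, and the connection names and counts in the example are hypothetical.

```python
from random import Random


def prioritize(free_ips, num_workers, strategy, rng=None):
    """Order connection names by the given strategy.

    free_ips maps connection name -> free IP count, or None when the
    count could not be checked (those connections are kept at the end).
    """
    rng = rng or Random()
    checked = [(name, ips) for name, ips in free_ips.items() if ips is not None]
    good = [item for item in checked if item[1] >= num_workers]
    usable = [item for item in checked if 2 <= item[1] < num_workers]
    if strategy == "random":
        rng.shuffle(good)
        rng.shuffle(usable)
        ordered = good + usable  # good connections keep priority
    elif strategy == "capacity":
        ordered = sorted(good + usable, key=lambda item: -item[1])
    else:
        raise ValueError(f"Unknown strategy specified: {strategy}")
    names = [name for name, _ in ordered]
    # Unchecked or nearly full connections go last, in their original order
    return names + [name for name in free_ips if name not in names]


if __name__ == "__main__":
    snapshot = {"conn-a": 1, "conn-b": 40, "conn-c": 5, "conn-d": None}
    print(prioritize(snapshot, 10, "capacity"))
    # ['conn-b', 'conn-c', 'conn-a', 'conn-d']
```

Passing a seeded `Random` instance as `rng` makes the random strategy reproducible, which is convenient when you write tests for your own strategy.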
    

Create the Airflow workflow

Now you create a workflow that invokes the AWS Glue job you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack and then -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file glue_job_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes since you deployed the CloudFormation stack.

  1. Choose the environment link on the table to see the environment details.

It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  1. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to log you in.

If there’s any issue with the DAG file you created, Airflow will display an error at the top of the page indicating the lines affected. In that case, review the steps and upload the file again. After a few seconds, Airflow will parse the file and update or remove the error banner.

  1. On the Admin menu, choose Variables.
  2. Add three variables with the following keys and values:
    1. Key glue_job_dag.glue_connections with value MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Key glue_job_dag.glue_job_name with value AirflowBlogJob.
    3. Key glue_job_dag.strategy with value capacity.
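
Note that the DAG code in this post splits the connections variable with `.strip().split(',')`, which trims only the ends of the whole string. If you enter the value with a space after the comma, the second connection name would keep a leading space and the lookup would fail. The following sketch shows a more tolerant parser you could use instead; the function name `parse_connection_list` is hypothetical.

```python
def parse_connection_list(raw):
    """Split a comma-separated value, trimming each item and dropping empty ones."""
    return [item.strip() for item in raw.split(",") if item.strip()]


if __name__ == "__main__":
    raw = "MWAA-Glue-Blog-Subnet1, MWAA-Glue-Blog-Subnet2"
    print(raw.strip().split(","))      # the second item keeps its leading space
    print(parse_connection_list(raw))  # both items are trimmed
```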

Run the job with a dynamic subnet assignment

Now you’re ready to run the workflow and see the strategy dynamically reordering the connections.

  1. On the Airflow UI, choose DAGs, and on the row glue_job_dag, choose the play icon.
  2. On the Browse menu, choose Task instances.
  3. On the instances table, scroll right to display the Log Url and choose the icon on it to open the log.

The log will update as the task runs; you can locate the line starting with “Running Glue job with the connection order:” and the previous lines showing details of the connection IPs and the category assigned. If an error occurs, you’ll see the details in this log.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose the job AirflowBlogJob.
  2. On the Runs tab, choose the run instance, then the Output logs link, which will open a new tab.
  3. On the new tab, use the log stream link to open it.

It will display the IP that the driver was assigned and which subnet it belongs to, which should match the connection indicated by Airflow (if the log is not displayed, choose Resume so it gets updated as soon as it’s available).

  1. On the Airflow UI, edit the Airflow variable glue_job_dag.strategy to set it to random.
  2. Run the DAG multiple times and see how the ordering changes.
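To make the difference between the two strategies concrete, here is a simplified sketch of how a connection list could be reordered. It is not the logic in glue_job_dag.py: the order_connections function is hypothetical, the capacity strategy is assumed to mean "subnet with the most free IP addresses first", and the free-IP counts are invented for illustration.

```python
import random


def order_connections(connections, strategy, free_ips=None, rng=None):
    """Return the connections in the order the job should try them."""
    if strategy == "random":
        shuffled = list(connections)
        # Use the supplied generator if given, else the random module.
        (rng or random).shuffle(shuffled)
        return shuffled
    if strategy == "capacity":
        free_ips = free_ips or {}
        # Most free IP addresses first; ties keep their original order.
        return sorted(connections, key=lambda c: free_ips.get(c, 0), reverse=True)
    raise ValueError(f"Unsupported strategy: {strategy!r}")


connections = ["MWAA-Glue-Blog-Subnet1", "MWAA-Glue-Blog-Subnet2"]
# Hypothetical free-IP counts per connection's subnet.
free_ips = {"MWAA-Glue-Blog-Subnet1": 12, "MWAA-Glue-Blog-Subnet2": 200}

by_capacity = order_connections(connections, "capacity", free_ips=free_ips)
by_random = order_connections(connections, "random", rng=random.Random())
print("capacity:", by_capacity)
print("random:  ", by_random)
```

With only two connections, the random strategy produces each ordering about half the time, which is why several runs are needed to see it change.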

Clean up

If you no longer need the deployment, delete the resources to avoid any further charges:

  1. Delete the Python script you uploaded, so the S3 bucket can be automatically deleted in the next step.
  2. Delete the CloudFormation stack.
  3. Delete the AWS Glue job.
  4. Delete the script that the job saved in Amazon S3.
  5. Delete the connections you created as part of this post.

Conclusion

In this post, we showed how AWS Glue and Amazon MWAA can work together to build more advanced custom workflows, while minimizing the operational and management overhead. This solution gives you more control over how your AWS Glue job runs to meet special operational, network, or security requirements.

You can deploy your own Amazon MWAA environment in multiple ways, such as with the template used in this post, on the Amazon MWAA console, or using the AWS CLI. You can also implement your own strategies to orchestrate AWS Glue jobs, based on your network architecture and requirements (for instance, to run the job closer to the data when possible).


About the authors

Michael Greenshtein is an Analytics Specialist Solutions Architect for the Public Sector.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Enhance monitoring and debugging for AWS Glue jobs using new job observability metrics: Part 2

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-enhance-monitoring-and-debugging-for-aws-glue-jobs-using-new-job-observability-metrics/

Monitoring data pipelines in real time is critical for catching issues early and minimizing disruptions. AWS Glue has made this more straightforward with the launch of AWS Glue job observability metrics, which provide valuable insights into your data integration pipelines built on AWS Glue. However, you might need to track key performance indicators across multiple jobs. In this case, a dashboard that can visualize the same metrics with the ability to drill down into individual issues is an effective solution to monitor at scale.

This post walks through how to integrate AWS Glue job observability metrics with Grafana using Amazon Managed Grafana. We discuss the types of metrics and charts available to surface key insights, along with two use cases on monitoring error classes and throughput of your AWS Glue jobs.

Solution overview

Grafana is an open source visualization tool that allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. With Grafana, you can create, explore, and share visually rich, data-driven dashboards. The new AWS Glue job observability metrics can be integrated with Grafana for real-time monitoring. Metrics like worker utilization, skewness, I/O rate, and errors are captured and visualized in easy-to-read Grafana dashboards. The integration with Grafana provides a flexible way to build custom views of pipeline health tailored to your needs. Observability metrics open up monitoring capabilities that weren’t possible before for AWS Glue. Companies relying on AWS Glue for critical data integration pipelines can have greater confidence that their pipelines are running efficiently.

AWS Glue job observability metrics are emitted as Amazon CloudWatch metrics. You can provision and manage Amazon Managed Grafana, and configure the CloudWatch plugin for the given metrics. The following diagram illustrates the solution architecture.

Implement the solution

Complete the following steps to set up the solution:

  1. Set up an Amazon Managed Grafana workspace.
  2. Sign in to your workspace.
  3. Choose Administration.
  4. Choose Add new data source.
  5. Choose CloudWatch.
  6. For Default Region, select your preferred AWS Region.
  7. For Namespaces of Custom Metrics, enter Glue.
  8. Choose Save & test.

Now the CloudWatch data source has been registered.

  1. Copy the data source ID from the URL https://g-XXXXXXXXXX.grafana-workspace.<region>.amazonaws.com/datasources/edit/<data-source-ID>/.
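If you prefer to pull the ID out of the URL programmatically, a small helper like the following would do it. This is a sketch that assumes the URL has the shape shown in the step above; the data_source_id_from_url function name, the Region, and the ID value in the example are all made up.

```python
from urllib.parse import urlparse


def data_source_id_from_url(url):
    """Return the last path segment of a .../datasources/edit/<id>/ URL."""
    segments = [s for s in urlparse(url).path.split("/") if s]
    if len(segments) >= 3 and segments[-3:-1] == ["datasources", "edit"]:
        return segments[-1]
    raise ValueError(f"Not a data source edit URL: {url}")


# Hypothetical Region and data source ID, for illustration only.
example_url = (
    "https://g-XXXXXXXXXX.grafana-workspace.us-east-1.amazonaws.com"
    "/datasources/edit/abc123XYZ/"
)
data_source_id = data_source_id_from_url(example_url)
print(data_source_id)
```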

The next step is to prepare the JSON template file.

  1. Download the Grafana template.
  2. Replace <data-source-id> in the JSON file with your Grafana data source ID.
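The replacement can be done in any text editor, but it is also easy to script. The following sketch substitutes the placeholder and then checks that the result is still valid JSON. The fill_data_source_id helper, the tiny stand-in template, and the ID value are assumptions; the structure of the real Grafana template is not shown in this post.

```python
import json

PLACEHOLDER = "<data-source-id>"


def fill_data_source_id(template_text, data_source_id):
    """Replace every placeholder occurrence and validate the JSON."""
    if PLACEHOLDER not in template_text:
        raise ValueError("Placeholder not found in template")
    filled = template_text.replace(PLACEHOLDER, data_source_id)
    json.loads(filled)  # raises ValueError if the result is not valid JSON
    return filled


# Minimal stand-in for the downloaded template (hypothetical structure).
template_text = '{"panels": [{"datasource": {"uid": "<data-source-id>"}}]}'
filled = fill_data_source_id(template_text, "abc123XYZ")
print(filled)

# With a real file you would read the text from disk, for example with
# pathlib.Path("template.json").read_text(), and write the result back.
```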

Lastly, configure the dashboard.

  1. On the Grafana console, choose Dashboards.
  2. Choose Import on the New menu.
  3. Upload your JSON file, and choose Import.

The Grafana dashboard visualizes AWS Glue observability metrics, as shown in the following screenshots.

The sample dashboard has the following charts:

  • [Reliability] Job Run Errors Breakdown
  • [Throughput] Bytes Read & Write
  • [Throughput] Records Read & Write
  • [Resource Utilization] Worker Utilization
  • [Job Performance] Skewness
  • [Resource Utilization] Disk Used (%)
  • [Resource Utilization] Disk Available (GB)
  • [Executor OOM] OOM Error Count
  • [Executor OOM] Heap Memory Used (%)
  • [Driver OOM] OOM Error Count
  • [Driver OOM] Heap Memory Used (%)
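Because the metrics behind these charts live in CloudWatch under the Glue namespace, you can also query them outside Grafana. The following sketch builds one query entry in the shape accepted by the CloudWatch GetMetricData API. The build_metric_query helper is hypothetical, and so is the dimension set: CloudWatch matches dimensions exactly, so the dictionary must list every dimension the metric was published with, and the single JobName entry here is only a placeholder assumption to verify in the CloudWatch console.

```python
def build_metric_query(metric_name, dimensions, stat="Sum", period_seconds=300):
    """Build one MetricDataQueries entry for CloudWatch GetMetricData."""
    return {
        "Id": "q1",
        "MetricStat": {
            "Metric": {
                "Namespace": "Glue",
                "MetricName": metric_name,
                "Dimensions": [
                    {"Name": name, "Value": value}
                    for name, value in dimensions.items()
                ],
            },
            "Period": period_seconds,
            "Stat": stat,
        },
        "ReturnData": True,
    }


query = build_metric_query(
    "glue.error.DISK_NO_SPACE_ERROR",
    {"JobName": "iot_data_processing"},  # assumed, likely incomplete
)
print(query)

# With boto3 installed and credentials configured, the entry could be
# passed as:
#   boto3.client("cloudwatch").get_metric_data(
#       MetricDataQueries=[query], StartTime=start, EndTime=end)
```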

Analyze the causes of job failures

Let’s try analyzing the causes of job run failures of the job iot_data_processing.

First, look at the pie chart [Reliability] Job Run Errors Breakdown. This pie chart quickly identifies which errors are most common.

Then filter with the job name iot_data_processing to see the common errors for this job.

We can observe that the majority (75%) of failures were due to glue.error.DISK_NO_SPACE_ERROR.
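The pie chart is simply the share of each error class among the failed runs. As a minimal sketch of that arithmetic, the following snippet computes the breakdown from a list of error names. The list is invented to reproduce a 75% share, and hypothetical.OTHER_ERROR is a placeholder rather than a real AWS Glue error class.

```python
from collections import Counter

# Invented sample: one entry per failed job run.
failed_run_errors = [
    "glue.error.DISK_NO_SPACE_ERROR",
    "glue.error.DISK_NO_SPACE_ERROR",
    "glue.error.DISK_NO_SPACE_ERROR",
    "hypothetical.OTHER_ERROR",
]

counts = Counter(failed_run_errors)
total = sum(counts.values())
# Percentage share of each error class, most common first.
breakdown = {name: 100 * n / total for name, n in counts.most_common()}

for name, share in breakdown.items():
    print(f"{name}: {share:.0f}%")
```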

Next, look at the line chart [Resource Utilization] Disk Used (%) to understand the driver’s used disk space during the job runs. For this job, the green line shows the driver’s disk usage, and the yellow line shows the average of the executors’ disk usage.

We can observe that there were three times when 100% of disk was used in executors.

Next, look at the line chart [Throughput] Records Read & Write to see whether the data volume changed and whether it impacted disk usage.

The chart shows that around four billion records were read at the beginning of this range; however, around 63 billion records were read at the peak. This means that the incoming data volume increased significantly and caused a local disk space shortage in the worker nodes. For such cases, you can increase the number of workers, enable auto scaling, or choose larger worker types.
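To put the increase in perspective, the following back-of-the-envelope calculation uses the two record counts from the chart. The baseline of 10 workers is a hypothetical figure, and the linear scaling it implies is only a rough upper-bound intuition; real jobs rarely scale that cleanly.

```python
import math

records_at_start = 4_000_000_000   # around four billion, from the chart
records_at_peak = 63_000_000_000   # around 63 billion, from the chart

growth_factor = records_at_peak / records_at_start
print(f"Data volume grew about {growth_factor:.2f}x")

# Hypothetical baseline: assume 10 workers handled the starting volume.
baseline_workers = 10
records_per_worker_start = records_at_start / baseline_workers
records_per_worker_peak = records_at_peak / baseline_workers

# Workers needed to keep the per-worker load at its starting level,
# assuming load scales linearly with record count (an assumption).
workers_for_same_load = math.ceil(baseline_workers * growth_factor)
print(f"Per-worker records: {records_per_worker_start:,.0f} -> {records_per_worker_peak:,.0f}")
print(f"Workers for the same per-worker load: {workers_for_same_load}")
```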

After implementing those suggestions, we can see lower disk usage and a successful job run.

(Optional) Configure cross-account setup

We can optionally configure a cross-account setup. Cross-account metrics depend on CloudWatch cross-account observability. In this setup, we expect the following environment:

  • AWS accounts are not managed in AWS Organizations
  • You have two accounts: one account is used as the monitoring account where Grafana is located, another account is used as the source account where the AWS Glue-based data integration pipeline is located

To configure a cross-account setup for this environment, complete the following steps for each account.

Monitoring account

Complete the following steps to configure your monitoring account:

  1. Sign in to the AWS Management Console using the account you will use for monitoring.
  2. On the CloudWatch console, choose Settings in the navigation pane.
  3. Under Monitoring account configuration, choose Configure.
  4. For Select data, choose Metrics.
  5. For List source accounts, enter the AWS account ID of the source account that this monitoring account will view.
  6. For Define a label to identify your source account, choose Account name.
  7. Choose Configure.

Now the account is successfully configured as a monitoring account.

  1. Under Monitoring account configuration, choose Resources to link accounts.
  2. Choose Any account to get a URL for setting up individual accounts as source accounts.
  3. Choose Copy URL.

You will use the copied URL in the source account in the next steps.

Source account

Complete the following steps to configure your source account:

  1. Sign in to the console using your source account.
  2. Enter the URL that you copied from the monitoring account.

You can see the CloudWatch settings page, with some information filled in.

  1. For Select data, choose Metrics.
  2. Do not change the ARN in Enter monitoring account configuration ARN.
  3. The Define a label to identify your source account section is pre-filled with the label choice from the monitoring account. Optionally, choose Edit to change it.
  4. Choose Link.
  5. Enter Confirm in the box and choose Confirm.

Now your source account has been configured to link to the monitoring account. The metrics emitted in the source account will show on the Grafana dashboard in the monitoring account.
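The console steps above handle the linking for you. If you want to script the setup instead, the monitoring account's sink needs a policy that lets the source account create a link. The following sketch builds such a policy document for metrics only. The build_sink_policy helper and the account ID are placeholders, and the policy shape reflects our understanding of the CloudWatch Observability Access Manager sink policy format, so verify it against the documentation before using it.

```python
import json


def build_sink_policy(source_account_ids, resource_types=("AWS::CloudWatch::Metric",)):
    """Build a sink policy allowing the given accounts to link metrics."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": list(source_account_ids)},
                "Action": ["oam:CreateLink", "oam:UpdateLink"],
                "Resource": "*",
                "Condition": {
                    # Restrict what the source accounts may share.
                    "ForAllValues:StringEquals": {
                        "oam:ResourceTypes": list(resource_types)
                    }
                },
            }
        ],
    }


# Placeholder account ID for the source account.
policy = build_sink_policy(["111122223333"])
print(json.dumps(policy, indent=2))
```

Limiting the resource types to metrics mirrors the Metrics choice made for Select data in both accounts.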

To learn more, see CloudWatch cross-account observability.

Considerations

The following are some considerations when using this solution:

  • Grafana integration is designed for real-time monitoring. If you have a basic understanding of your jobs, it will be straightforward for you to monitor performance, errors, and more on the Grafana dashboard.
  • Amazon Managed Grafana depends on AWS IAM Identity Center. This means you need to manage single sign-on (SSO) users separately, not just AWS Identity and Access Management (IAM) users and roles. It also requires another sign-in step from the AWS console. The Amazon Managed Grafana pricing model depends on an active user license per workspace, so more users can incur more charges.
  • Graph lines are visualized per job. If you want to see the lines across all the jobs, you can choose ALL in the control.

Conclusion

AWS Glue job observability metrics offer a powerful new capability for monitoring data pipeline performance in real time. By streaming key metrics to CloudWatch and visualizing them in Grafana, you gain more fine-grained visibility that wasn’t possible before. This post showed how straightforward it is to enable observability metrics and integrate the data with Grafana using Amazon Managed Grafana. We explored the different metrics available and how to build customized Grafana dashboards to surface actionable insights.

Observability is now an essential part of robust data orchestration on AWS. With the ability to monitor data integration trends in real time, you can optimize costs, performance, and reliability.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Xiaoxi Liu is a Software Development Engineer on the AWS Glue team. Her passion is building scalable distributed systems for efficiently managing big data on the cloud, and her concentrations are distributed systems, big data, and cloud computing.

Akira Ajisaka is a Senior Software Development Engineer on the AWS Glue team. He likes open source software and distributed systems. In his spare time, he enjoys playing arcade games.

Shenoda Guirguis is a Senior Software Development Engineer on the AWS Glue team. His passion is in building scalable and distributed data infrastructure and processing systems. When he gets a chance, Shenoda enjoys reading and playing soccer.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18-year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data seamlessly across data lakes on Amazon S3, databases, and data warehouses in the cloud.