
How Volkswagen streamlined access to data across multiple data lakes using Amazon DataZone – Part 1

Post Syndicated from Bandana Das original https://aws.amazon.com/blogs/big-data/how-volkswagen-streamlined-access-to-data-across-multiple-data-lakes-using-amazon-datazone-part-1/

Over the years, organizations have invested in creating purpose-built, cloud-based data lakes that are siloed from one another. A major challenge is enabling cross-organization discovery and access to data across these multiple data lakes, each built on different technology stacks. A data mesh addresses these issues with four principles: domain-oriented decentralized data ownership and architecture, treating data as a product, providing self-serve data infrastructure as a platform, and implementing federated governance. Data mesh enables organizations to organize around data domains with a focus on delivering data as a product.

In 2019, Volkswagen AG (VW) and Amazon Web Services (AWS) formed a strategic partnership to co-develop the Digital Production Platform (DPP), aiming to enhance production and logistics efficiency by 30 percent while reducing production costs by the same margin. The DPP was developed to streamline access to data from shop-floor devices and manufacturing systems by handling integrations and providing standardized interfaces. However, as applications evolved on the platform, a significant challenge emerged: sharing data across applications when that data is stored in multiple isolated data lakes, in Amazon Simple Storage Service (Amazon S3) buckets in individual AWS accounts, without having to consolidate it into a central data lake. Another challenge is discovering the data available across these data lakes and facilitating a workflow to request data access across business domains within each plant. The current method is largely manual, relying on emails and general communication, which not only increases overhead but also leads to data governance that varies from one use case to another.

This blog post introduces Amazon DataZone and explores how VW used it to build their data mesh to enable streamlined data access across multiple data lakes. It focuses on a key aspect of the solution: enabling data providers to automatically publish data assets to Amazon DataZone, which serves as the central data mesh for enhanced data discoverability. Additionally, the post provides code to guide you through the implementation.

Introduction to Amazon DataZone

Amazon DataZone is a data management service that makes it faster and easier for customers to catalog, discover, share, and govern data stored across AWS, on premises, and third-party sources. Key features of Amazon DataZone include a business data catalog that allows users to search for published data, request access, and start working on data in days instead of weeks. Amazon DataZone projects enable collaboration with teams through data assets and the ability to manage and monitor data assets across projects. It also includes the Amazon DataZone portal, which offers a personalized analytics experience for data assets through a web-based application or API. Lastly, Amazon DataZone governed data sharing ensures that the right data is accessed by the right user for the right purpose with a governed workflow.

Architecture for Data Management with Amazon DataZone

Figure 1: Data mesh pattern implementation on AWS using Amazon DataZone

The architecture diagram (Figure 1) represents a high-level design based on the data mesh pattern. It separates source systems, data domain producers (data publishers), data domain consumers (data subscribers), and central governance to highlight key aspects. This cross-account data mesh architecture aims to create a scalable foundation for data platforms, supporting producers and consumers with consistent governance.

  1. A data domain producer resides in an AWS account and uses Amazon S3 buckets to store raw and transformed data. Producers ingest data into their S3 buckets through pipelines they manage, own, and operate. They are responsible for the full lifecycle of the data, from raw capture to a form suitable for external consumption.
  2. A data domain producer maintains its own ETL stack, using AWS Glue and AWS Lambda to process the data and AWS Glue DataBrew to profile it, and prepares the data asset (data product) before cataloging it in the AWS Glue Data Catalog in its account.
  3. Alternatively, a data domain producer can prepare and store the data asset as a table in Amazon Redshift, loading it from Amazon S3 with the COPY command.
  4. Data domain producers publish data assets to Amazon DataZone in the central governance account using a data source run. This populates the technical metadata in the business data catalog for each data asset. Business users can add business metadata to provide business context, tags, and data classification for the datasets. Producers control what to share, for how long, and how consumers interact with it.
  5. Producers can register and create catalog entries with AWS Glue from all their S3 buckets. The central governance account securely shares datasets between producers and consumers via metadata linking, with no data (except logs) existing in this account. Data ownership remains with the producer.
  6. With Amazon DataZone, once data is cataloged and published into the DataZone domain, it can be shared with multiple consumer accounts.
  7. The Amazon DataZone data portal is a web-based application that gives users a personalized view to discover and search for data assets and to submit subscription requests. The data domain producer is notified of subscription requests in the data portal and can approve or reject them.
  8. Once approved, the consumer account can read and further process data assets to implement various use cases with AWS Lambda, AWS Glue, Amazon Athena, Amazon Redshift query editor v2, and Amazon QuickSight (analytics use cases), and with Amazon SageMaker (machine learning use cases).

Manual process to publish data assets to Amazon DataZone

To publish a data asset from the producer account, each asset must be registered in Amazon DataZone as a data source for consumer subscription. The Amazon DataZone User Guide provides detailed steps to achieve this. In the absence of an automated registration process, all required tasks must be completed manually for each data asset.

How to automate publishing data assets from AWS Glue Data Catalog from the producer account to Amazon DataZone

Using the automated registration workflow, the manual steps can be automated for any new data asset that needs to be published in an Amazon DataZone domain or when there’s a schema change in an already published data asset.

The automated solution reduces the repetitive manual steps to publish the data sources (AWS Glue tables) into an Amazon DataZone domain.

Architecture for automated data asset publish

Figure 2: Architecture for automated data publish to Amazon DataZone

To automate publishing data assets:

  1. In the producer account (Account B), the data to be shared resides in an Amazon S3 bucket (Figure 2). An AWS Glue crawler, deployed using the AWS Cloud Development Kit (AWS CDK), is configured for the dataset to automatically create the schema.
  2. Once configured, the AWS Glue crawler crawls the Amazon S3 bucket and updates the metadata in the AWS Glue Data Catalog. The successful completion of the AWS Glue crawler generates an event in the default event bus of Amazon EventBridge.
  3. An EventBridge rule is configured to detect this event and invoke a dataset-registration AWS Lambda function.
  4. The AWS Lambda function performs all the steps to automatically register and publish the dataset in Amazon DataZone.
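
The EventBridge rule in step 3 can be pictured as an event pattern that matches successful crawler runs. The following is a minimal sketch, not the pattern shipped in the repository: it assumes the standard Glue Crawler State Change event that AWS Glue emits on the default event bus, the crawler name in the sample event is made up, and the matches helper is a hypothetical, simplified stand-in for EventBridge matching (exact values only) that is useful for checking the pattern locally.

```python
import json

# Event pattern for successful AWS Glue crawler runs.
# Assumption: the rule filters on state only; the deployed rule may also
# restrict matching to a specific crawler name.
CRAWLER_SUCCEEDED_PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Crawler State Change"],
    "detail": {"state": ["Succeeded"]},
}


def matches(pattern: dict, event: dict) -> bool:
    """Simplified local check: every pattern key must be present in the
    event, and the event value must be one of the listed values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            # Nested pattern: recurse into the nested event object.
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


# Hypothetical sample event with only the fields the pattern inspects.
sample_event = {
    "source": "aws.glue",
    "detail-type": "Glue Crawler State Change",
    "detail": {"crawlerName": "DataZone-example-crawler", "state": "Succeeded"},
}

print(json.dumps(CRAWLER_SUCCEEDED_PATTERN, indent=2))
print(matches(CRAWLER_SUCCEEDED_PATTERN, sample_event))
```

Filtering on the Succeeded state in the rule, rather than inside the function, means the Lambda function is not invoked for failed or still-running crawls.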

Steps performed in the dataset-registration AWS Lambda function

    • The AWS Lambda function retrieves the AWS Glue database and Amazon S3 information for the dataset from the Amazon EventBridge event triggered by the successful run of the AWS Glue crawler.
    • It obtains the Amazon DataZone Datalake blueprint ID from the producer account, and the Amazon DataZone domain ID and project ID by assuming an IAM role in the central governance account where the Amazon DataZone domain exists.
    • It enables the Amazon DataZone Datalake blueprint in the producer account.
    • It checks whether the Amazon DataZone environment already exists within the Amazon DataZone project. If it does not, the function initiates the environment creation process. If the environment exists, it proceeds to the next step.
    • It registers the Amazon S3 location of the dataset in AWS Lake Formation in the producer account.
    • It creates a data source within the Amazon DataZone project and monitors the completion of the data source creation.
    • Finally, it checks whether the data source sync job in Amazon DataZone needs to be started. If AWS Glue tables or metadata have been created or updated, it starts the data source sync job.
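
Parts of the first, sixth, and last steps above can be sketched as follows. This is an illustrative sketch rather than the function in the repository: it assumes the crawler event carries the crawlerName, tablesCreated, and tablesUpdated detail fields that AWS Glue reports, and the names CrawlerRun, parse_crawler_event, needs_sync, and glue_data_source_request are hypothetical. The request dictionary follows the shape of the Amazon DataZone CreateDataSource API for a GLUE data source; the identifiers passed in are placeholders, and publishing on import is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CrawlerRun:
    crawler_name: str
    tables_created: int
    tables_updated: int


def parse_crawler_event(event: dict) -> CrawlerRun:
    """Extract the fields of interest from a Glue crawler state-change event."""
    detail = event["detail"]
    # Glue reports the counters as strings, for example "2".
    return CrawlerRun(
        crawler_name=detail["crawlerName"],
        tables_created=int(detail.get("tablesCreated", 0)),
        tables_updated=int(detail.get("tablesUpdated", 0)),
    )


def needs_sync(run: CrawlerRun) -> bool:
    """Start a data source run only when the crawler changed the catalog."""
    return run.tables_created > 0 or run.tables_updated > 0


def glue_data_source_request(domain_id, project_id, environment_id, database_name):
    """Build keyword arguments for a DataZone CreateDataSource call."""
    return {
        "domainIdentifier": domain_id,
        "projectIdentifier": project_id,
        "environmentIdentifier": environment_id,
        "name": database_name,
        "type": "GLUE",
        # Assumption: assets are published to the catalog as soon as they
        # are imported, without a manual review step.
        "publishOnImport": True,
        "configuration": {
            "glueRunConfiguration": {
                "relationalFilterConfigurations": [
                    {
                        "databaseName": database_name,
                        # Include every table in the database.
                        "filterExpressions": [{"type": "INCLUDE", "expression": "*"}],
                    }
                ]
            }
        },
    }
```

In a Lambda handler, the dictionary could be passed to the boto3 DataZone client as create_data_source(**request), using credentials from the assumed role, followed by start_data_source_run when needs_sync returns True.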

Prerequisites

As part of this solution, you will publish data assets from an existing AWS Glue database in a producer account into an Amazon DataZone domain. Complete the following prerequisites first.

  1. You need two AWS accounts to deploy the solution.
    • One AWS account will act as the data domain producer account (Account B) which will contain the AWS Glue dataset to be shared.
    • The second AWS account is the central governance account (Account A), which will have the Amazon DataZone domain and project deployed. This is the Amazon DataZone account.
    • Ensure that both AWS accounts belong to the same AWS Organization.
  2. Remove the IAMAllowedPrincipals permissions from the AWS Lake Formation tables for which Amazon DataZone handles permissions.
  3. Make sure in both AWS accounts that you have cleared the checkbox for Default permissions for newly created databases and tables under the Data Catalog settings in Lake Formation (Figure 3).

    Figure 3: Clear default permissions in AWS Lake Formation

  4. Sign in to Account A (central governance account) and make sure you have created an Amazon DataZone domain and a project within the domain.
  5. If your Amazon DataZone domain is encrypted with an AWS Key Management Service (AWS KMS) key, add Account B (producer account) to the key policy with the following actions:
    {
      "Sid": "Allow use of the key",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Account B>:root"
      },
      "Action": [
        "kms:Encrypt",
        "kms:Decrypt",
        "kms:ReEncrypt*",
        "kms:GenerateDataKey*",
        "kms:DescribeKey"
      ],
      "Resource": "*"
    }

  6. Ensure you have created an AWS Identity and Access Management (IAM) role that Account B (producer account) can assume and this IAM role is added as a member (as contributor) of your Amazon DataZone project. The role should have the following permissions:
    • This IAM role is called dz-assumable-env-dataset-registration-role in this example. Adding this role will enable you to successfully run the dataset-registration Lambda function. Replace account_region, account_id, and DataZonekmsKey in the following policy with your information. These values correspond to the Region and account where your Amazon DataZone domain is created and the AWS KMS key Amazon Resource Name (ARN) used to encrypt the Amazon DataZone domain.
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Action": [
                      "DataZone:CreateDataSource",
                      "DataZone:CreateEnvironment",
                      "DataZone:CreateEnvironmentProfile",
                      "DataZone:GetDataSource",
                      "DataZone:GetEnvironment",
                      "DataZone:GetEnvironmentProfile",
                      "DataZone:GetIamPortalLoginUrl",
                      "DataZone:ListDataSources",
                      "DataZone:ListDomains",
                      "DataZone:ListEnvironmentProfiles",
                      "DataZone:ListEnvironments",
                      "DataZone:ListProjectMemberships",
                      "DataZone:ListProjects",
                      "DataZone:StartDataSourceRun"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "kms:Decrypt",
                      "kms:DescribeKey",
                      "kms:GenerateDataKey"
                  ],
                  "Resource": "arn:aws:kms:${account_region}:${account_id}:key/${DataZonekmsKey}",
                  "Effect": "Allow"
              }
          ]
      }

    • Add Account B to the trust relationship of this role using the following trust policy. Replace ProducerAccountId with the AWS account ID of Account B (data domain producer account).
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": [
                          "arn:aws:iam::${ProducerAccountId}:root"
                      ]
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }

  7. The following tools are needed to deploy the solution using AWS CDK:

Deployment Steps

After completing the prerequisites, use the AWS CDK stack provided on GitHub to deploy the solution for automatic registration of data assets into an Amazon DataZone domain.

  1. Clone the repository from GitHub to your preferred IDE using the following commands.
    git clone https://github.com/aws-samples/automate-and-simplify-aws-glue-data-asset-publish-to-amazon-datazone.git
    
    cd automate-and-simplify-aws-glue-data-asset-publish-to-amazon-datazone

  2. At the base of the repository folder, run the following commands to install the dependencies and lint the code.
    npm install
    npm run lint

  3. Sign in to AWS Account B (the data domain producer account) using the AWS Command Line Interface (AWS CLI) with your profile name.
  4. Ensure you have configured the AWS Region in your credentials configuration file.
  5. Bootstrap the CDK environment with the following commands at the base of the repository folder. Replace <PROFILE_NAME> with the profile name of your deployment account (Account B). Bootstrapping is a one-time activity and is not needed if your AWS account is already bootstrapped.
    export AWS_PROFILE=<PROFILE_NAME>
    npm run cdk bootstrap

  6. Replace the placeholder parameters (marked with the suffix _PLACEHOLDER) in the file config/DataZoneConfig.ts (Figure 4).
    • Amazon DataZone domain and project name of your Amazon DataZone instance. Make sure all names are in lowercase.
    • The AWS account ID and Region.
    • The assumable IAM role from the prerequisites.
    • The deployment role starting with cfn-xxxxxx-cdk-exec-role-.

Figure 4: Edit the DataZoneConfig file

  7. In the AWS Management Console for Lake Formation, select Administrative roles and tasks from the navigation pane (Figure 5) and make sure the IAM role for AWS CDK deployment that starts with cfn-xxxxxx-cdk-exec-role- is selected as an administrator in Data lake administrators. This IAM role needs permissions in Lake Formation to create resources, such as an AWS Glue database. Without these permissions, the AWS CDK stack deployment will fail.

Figure 5: Add cfn-xxxxxx-cdk-exec-role- as a Data Lake administrator

  8. Use the following command in the base folder to deploy the AWS CDK solution:
    npm run cdk deploy --all

During deployment, enter y if you want to deploy the changes for some stacks when you see the prompt Do you wish to deploy these changes (y/n)?

  9. After the deployment is complete, sign in to AWS Account B (producer account) and navigate to the AWS CloudFormation console to verify that the infrastructure was deployed. You should see a list of the deployed CloudFormation stacks as shown in Figure 6.

Figure 6: Deployed CloudFormation stacks

Test automatic data registration to Amazon DataZone

To demonstrate automatic data registration, we use the Online Retail Transactions dataset from Kaggle as a sample dataset.

  1. Download the Online Retail.csv file from the Kaggle dataset.
  2. Log in to AWS Account B (producer account), navigate to the Amazon S3 console, find the DataZone-test-datasource S3 bucket, and upload the CSV file there (Figure 7).

Figure 7: Upload the dataset CSV file

  3. The AWS Glue crawler is scheduled to run at a specific time each day. For testing, however, you can run the crawler manually: go to the AWS Glue console, select Crawlers from the navigation pane, and run the on-demand crawler whose name starts with DataZone-. After the crawler has run, verify that a new table has been created.
  4. Go to the Amazon DataZone console in AWS Account A (central governance account) where you deployed the resources. Select Domains in the navigation pane (Figure 8), then select and open your domain.

    Figure 8: Amazon DataZone domains

  5. After you open the Amazon DataZone domain, you can find the Amazon DataZone data portal URL in the Summary section (Figure 9). Select the URL to open the data portal.

    Figure 9: Amazon DataZone data portal URL

  6. In the data portal, find your project (Figure 10). Then select the Data tab at the top of the window.

    Figure 10: Amazon DataZone Project overview

  7. Select the Data sources section (Figure 11) and find the newly created data source DataZone-testdata-db.

    Figure 11: Select Data sources in the Amazon DataZone domain data portal

  8. Verify that the data source has been successfully published (Figure 12).

    Figure 12: The data sources are visible in the Published data section

  9. After the data sources are published, users can discover the published data and submit a subscription request. The data producer can approve or reject requests. Upon approval, users can consume the data by querying it in Amazon Athena. Figure 13 illustrates data discovery in the Amazon DataZone data portal.

    Figure 13: Example data discovery in the Amazon DataZone portal

Clean up

Use the following steps to clean up the resources deployed through the CDK.

  1. Empty the two S3 buckets that were created as part of this deployment.
  2. Go to the Amazon DataZone domain portal and delete the published data assets that were created in the Amazon DataZone project by the dataset-registration Lambda function.
  3. Delete the remaining resources created using the following command in the base folder:
    npm run cdk destroy --all

Conclusion

By using AWS Glue and Amazon DataZone, organizations can make their data management easier and allow teams to share and collaborate on data smoothly. Automatically publishing AWS Glue data assets to Amazon DataZone not only simplifies the process but also keeps the data consistent, secure, and well-governed. You can use this solution to simplify and standardize publishing data assets to Amazon DataZone and to streamline data management. For guidance on establishing your organization’s data mesh with Amazon DataZone, contact your AWS team today.


About the Authors

Bandana Das is a Senior Data Architect at Amazon Web Services and specializes in data and analytics. She builds event-driven data architectures to support customers in data management and data-driven decision-making. She is also passionate about enabling customers on their data management journey to the cloud.

Anirban Saha is a DevOps Architect at AWS, specializing in architecting and implementing solutions for customer challenges in the automotive domain. He is passionate about well-architected infrastructures, automation, data-driven solutions and helping make the customer’s cloud journey as seamless as possible. Personally, he likes to keep himself engaged with reading, painting, language learning and traveling.

Chandana Keswarkar is a Senior Solutions Architect at AWS, who specializes in guiding automotive customers through their digital transformation journeys by using cloud technology. She helps organizations develop and refine their platform and product architectures and make well-informed design decisions. In her free time, she enjoys traveling, reading, and practicing yoga.

Sindi Cali is a ProServe Associate Consultant with AWS Professional Services. She supports customers in building data-driven applications on AWS.

How Zurich Insurance Group built a log management solution on AWS

Post Syndicated from Jake Obi original https://aws.amazon.com/blogs/big-data/how-zurich-insurance-group-built-a-log-management-solution-on-aws/

This post is written in collaboration with Clarisa Tavolieri, Austin Rappeport and Samantha Gignac from Zurich Insurance Group.

The volume and number of logging sources have grown exponentially over the last few years and will continue to grow in the coming years. As a result, customers across all industries face multiple challenges, such as:

  • Balancing storage costs against meeting long-term log retention requirements
  • Bandwidth issues when moving logs between the cloud and on premises
  • Resource scaling and performance issues when trying to analyze massive amounts of log data
  • Keeping pace with the growing storage requirements, while also being able to provide insights from the data
  • Aligning license costs for Security Information and Event Management (SIEM) vendors with log processing, storage, and performance requirements. SIEM solutions help you implement real-time reporting by monitoring your environment for security threats and alerting on threats once detected.

Zurich Insurance Group (Zurich) is a leading multi-line insurer providing property, casualty, and life insurance solutions globally. In 2022, Zurich began a multi-year program to accelerate their digital transformation and innovation through the migration of 1,000 applications to AWS, including core insurance and SAP workloads.

The Zurich Cyber Fusion Center management team faced similar challenges, such as balancing licensing costs to ingest and long-term retention requirements for both business application log and security log data within the existing SIEM architecture. Zurich wanted to identify a log management solution to work in conjunction with their existing SIEM solution. The new approach would need to offer the flexibility to integrate new technologies such as machine learning (ML), scalability to handle long-term retention at forecasted growth levels, and provide options for cost optimization. In this post, we discuss how Zurich built a hybrid architecture on AWS incorporating AWS services to satisfy their requirements.

Solution overview

Zurich and AWS Professional Services collaborated to build an architecture that addressed decoupling long-term storage of logs, distributing analytics and alerting capabilities, and optimizing storage costs for log data. The solution was based on categorizing log data into three priority levels (1 to 3) and routing logs to different destinations based on priority. The following diagram illustrates the solution architecture.

Flow of logs from source to destination. All logs are sent to Cribl which routes portions of logs to the SIEM, portions to Amazon OpenSearch, and copies of logs to Amazon S3.

The workflow steps are as follows:

  1. All of the logs (P1, P2, and P3) are collected and ingested into an extract, transform, and load (ETL) service, AWS Partner Cribl’s Stream product, in real time. Capturing and streaming of logs is configured per use case based on the capabilities of the source, such as using built-in forwarders, installing agents, using Cribl Streams, and using AWS services like Amazon Data Firehose. This ETL service performs two functions before data reaches the analytics layer:
    1. Data normalization and aggregation – The raw log data is normalized and aggregated in the required format to perform analytics. The process consists of normalizing log field names, standardizing on JSON, removing unused or duplicate fields, and compressing to reduce storage requirements.
    2. Routing mechanism – Upon completing data normalization, the ETL service will apply necessary routing mechanisms to ingest log data to respective downstream systems based on category and priority.
  2. Priority 1 logs, such as network detection and response (NDR), endpoint detection and response (EDR), and cloud threat detection services (for example, Amazon GuardDuty), are ingested directly to the existing on-premises SIEM solution for real-time analytics and alerting.
  3. Priority 2 logs, such as operating system security logs, firewall, identity provider (IdP), email metadata, and AWS CloudTrail, were previously ingested into the SIEM. They are now ingested into Amazon OpenSearch Service to enable the following capabilities:
    1. Systematically detect potential threats and react to a system’s state through alerting, and integrate those alerts back into Zurich’s SIEM for larger correlation. This reduces the amount of data ingested into Zurich’s SIEM by approximately 85%. Eventually, Zurich plans to use ML plugins such as anomaly detection to enhance analysis.
    2. Develop log and trace analytics solutions with interactive queries and visualize results with high adaptability and speed.
    3. Reduce the average time to ingest and the average time to search in a way that accommodates the increasing scale of log data.
    4. In the future, Zurich plans to use OpenSearch’s security analytics plugin, which can help security teams quickly detect potential security threats by using over 2,200 pre-built, publicly available Sigma security rules or create custom rules.
  4. Priority 3 logs, such as logs from enterprise applications and vulnerability scanning tools, are not ingested into the SIEM or OpenSearch Service, but are forwarded to Amazon Simple Storage Service (Amazon S3) for storage. These can be queried as needed using one-time queries.
  5. Copies of all log data (P1, P2, P3) are sent in real time to Amazon S3 for highly durable, long-term storage to satisfy the following:
    1. Long-term data retention – S3 Object Lock is used to enforce data retention per Zurich’s compliance and regulatory requirements.
    2. Cost-optimized storage – Lifecycle policies automatically transition data with less frequent access patterns to lower-cost Amazon S3 storage classes. Zurich also uses lifecycle policies to automatically expire objects after a predefined period. Lifecycle policies provide a mechanism to balance the cost of storing data and meeting retention requirements.
    3. Historic data analysis – Data stored in Amazon S3 can be queried to satisfy one-time audit or analysis tasks. Eventually, this data could be used to train ML models to support better anomaly detection. Zurich has done testing with Amazon SageMaker and has plans to add this capability in the near future.
  6. One-time query analysis – Simple audit use cases require historical data to be queried based on different time intervals, which can be performed using Amazon Athena and AWS Glue analytic services. By using Athena and AWS Glue, both serverless services, Zurich can perform simple queries without the heavy lifting of running and maintaining servers. Athena supports a variety of compression formats for reading and writing data. Therefore, Zurich is able to store compressed logs in Amazon S3 to achieve cost-optimized storage while still being able to perform one-time queries on the data.
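
The priority-based routing in the workflow above can be summarized in a few lines of code. This is a conceptual sketch only: in Zurich's architecture the routing is configured in Cribl Stream, not written in Python, and the destination labels and the route function are hypothetical.

```python
from enum import IntEnum
from typing import List


class Priority(IntEnum):
    P1 = 1
    P2 = 2
    P3 = 3


# Analytics destination per priority; P3 logs have none.
ANALYTICS_DESTINATION = {
    Priority.P1: "siem",
    Priority.P2: "opensearch",
}

# Every log record is also copied here for long-term retention.
ARCHIVE_DESTINATION = "s3"


def route(priority: Priority) -> List[str]:
    """Return the destinations for a normalized log record.

    Every record is copied to Amazon S3; P1 and P2 records are also sent
    to their analytics destination.
    """
    destinations = []
    analytics = ANALYTICS_DESTINATION.get(priority)
    if analytics is not None:
        destinations.append(analytics)
    destinations.append(ARCHIVE_DESTINATION)
    return destinations


for p in Priority:
    print(p.name, "->", route(p))
```

Keeping the archive copy unconditional is what decouples long-term retention from the analytics tools: changing where a priority level is analyzed does not affect what is retained.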

As a future capability, on-demand complex queries, analysis, and reporting on large historical datasets could be supported using Amazon OpenSearch Serverless. Also, OpenSearch Service supports zero-ETL integration with Amazon S3, where users can query their data stored in Amazon S3 using OpenSearch Service query capabilities.

The solution outlined in this post provides Zurich an architecture that supports scalability, resilience, cost optimization, and flexibility. We discuss these key benefits in the following sections.

Scalability

Given the volume of data currently being ingested, Zurich needed a solution that could satisfy existing requirements and provide room for growth. In this section, we discuss how Amazon S3 and OpenSearch Service help Zurich achieve scalability.

Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. The total volume of data and number of objects you can store in Amazon S3 are virtually unlimited. Based on its unique architecture, Amazon S3 is designed to exceed 99.999999999% (11 nines) of data durability. Additionally, Amazon S3 stores data redundantly across a minimum of three Availability Zones (AZs) by default, providing built-in resilience against widespread disaster. For example, the S3 Standard storage class is designed for 99.99% availability. For more information, check out the Amazon S3 FAQs.

Zurich uses AWS Partner Cribl’s Stream solution to route copies of all log information to Amazon S3 for long-term storage and retention, enabling Zurich to decouple log storage from their SIEM solution, a common challenge facing SIEM solutions today.

OpenSearch Service is a managed service that makes it straightforward to run OpenSearch without having to manage the underlying infrastructure. Zurich’s current on-premises SIEM infrastructure is comprised of more than 100 servers, all of which have to be operated and maintained. Zurich hopes to reduce this infrastructure footprint by 75% by offloading priority 2 and 3 logs from their existing SIEM solution.

To support geographies with restrictions on cross-border data transfer and to meet availability requirements, AWS and Zurich worked together to define an Amazon OpenSearch Service configuration that would support 99.9% availability using multiple AZs in a single region.

OpenSearch Service supports cross-region and cross-cluster queries, which helps with distributing analysis and processing of logs without moving data, and provides the ability to aggregate information across clusters. Since Zurich plans to deploy multiple OpenSearch domains in different regions, they will use cross-cluster search functionality to query data seamlessly across different regional domains without moving data. Zurich also configured a connector for their existing SIEM to query OpenSearch, which further allows distributed processing from on premises, and enables aggregation of data across data sources. As a result, Zurich is able to distribute processing, decouple storage, and publish key information in the form of alerts and queries to their SIEM solution without having to ship log data.
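
As an illustration of cross-cluster search, OpenSearch addresses a remote index as connection-alias:index, and several such targets can be combined in one search request. The sketch below only builds the request path; the connection aliases, the index pattern, and the function name are hypothetical and do not reflect Zurich's actual domains.

```python
from typing import Iterable


def cross_cluster_search_path(index_pattern: str,
                              remote_aliases: Iterable[str],
                              include_local: bool = True) -> str:
    """Build the path of a _search request spanning several domains."""
    # The local domain is addressed by the bare index pattern.
    targets = [index_pattern] if include_local else []
    # Each remote domain is addressed as <connection-alias>:<index>.
    targets += [f"{alias}:{index_pattern}" for alias in remote_aliases]
    if not targets:
        raise ValueError("at least one search target is required")
    return "/" + ",".join(targets) + "/_search"


print(cross_cluster_search_path("p2-logs-*", ["eu-domain", "apac-domain"]))
```

The query body sent to this path is the same as for a single-domain search; only the index part of the path changes, which is why the data itself does not have to move.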

In addition, many of Zurich’s business units have logging requirements that could also be satisfied using the same AWS services (OpenSearch Service, Amazon S3, AWS Glue, and Amazon Athena). As such, the AWS components of the architecture were templatized using Infrastructure as Code (IaC) for consistent, repeatable deployment. These components are already being used across Zurich’s business units.

Cost optimization

In thinking about optimizing costs, Zurich had to consider how they would continue to ingest 5 TB per day of security log information just for their centralized security logs. In addition, lines of business needed similar capabilities to meet requirements, which could include processing 500 GB per day.

With this solution, Zurich can control (by offloading P2 and P3 log sources) the portion of logs that are ingested into their primary SIEM solution. As a result, Zurich has a mechanism to manage licensing costs, as well as improve the efficiency of queries by reducing the amount of information the SIEM needs to parse on search.

Because copies of all log data are going to Amazon S3, Zurich is able to take advantage of the different Amazon S3 storage tiers, such as using S3 Intelligent-Tiering to automatically move data among Infrequent Access and Archive Access tiers, to optimize the cost of retaining multiple years’ worth of log data. When data is moved to the Infrequent Access tier, costs are reduced by up to 40%. Similarly, when data is moved to the Archive Instant Access tier, storage costs are reduced by up to 68%.

Refer to Amazon S3 pricing for current pricing, as well as for information by region. Moving data to S3 Infrequent Access and Archive Access tiers provides a significant cost savings opportunity while meeting long-term retention requirements.
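
As a rough illustration of how those tier reductions add up, the following sketch computes a blended monthly storage cost. The baseline price per GB and the split of data across tiers are made-up placeholders, not Zurich's figures or current AWS pricing; only the 40% and 68% reductions come from the text above, and S3 Intelligent-Tiering monitoring charges are ignored.

```python
# Hypothetical inputs: replace with values from the Amazon S3 pricing page.
BASE_PRICE_PER_GB = 0.023   # assumed Frequent Access price (USD per GB-month)
TIER_DISCOUNT = {
    "frequent": 0.00,
    "infrequent": 0.40,        # "up to 40%" from the text
    "archive_instant": 0.68,   # "up to 68%" from the text
}

def blended_monthly_cost(gb_by_tier, base_price=BASE_PRICE_PER_GB):
    """Return the monthly storage cost for data spread across tiers."""
    total = 0.0
    for tier, gb in gb_by_tier.items():
        total += gb * base_price * (1.0 - TIER_DISCOUNT[tier])
    return total

# Example: 100 TB of logs, mostly cold (an assumed distribution).
layout = {"frequent": 10_000, "infrequent": 20_000, "archive_instant": 70_000}
all_hot = blended_monthly_cost({"frequent": sum(layout.values())})
tiered = blended_monthly_cost(layout)
savings = 1.0 - tiered / all_hot
print(f"all frequent: ${all_hot:,.2f}  tiered: ${tiered:,.2f}  saved: {savings:.1%}")
```

Because the discounts are upper bounds ("up to"), the result should be read as a best case for the assumed distribution.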

The team at Zurich analyzed priority 2 log sources, and based on historical analytics and query patterns, determined that only the most recent 7 days of logs are typically required. Therefore, OpenSearch Service was right-sized for retaining 7 days of logs in a hot tier. Rather than configuring UltraWarm and cold storage tiers for OpenSearch Service, copies of the remaining logs were simultaneously being sent to Amazon S3 for long-term retention and could be queried using Athena.

The combination of cost-optimization options is projected to reduce the cost per GB of log data ingested and stored for 13 months by 53% compared to the previous approach.

Flexibility

Another key consideration for the architecture was the flexibility to integrate with existing alerting systems and data pipelines, as well as the ability to incorporate new technology into Zurich’s log management approach. For example, Zurich also configured a connector for their existing SIEM to query OpenSearch, which further allows distributed processing from on premises and enables aggregation of data across data sources.

Within the OpenSearch Service software, there are options to expand log analysis using security analytics with predefined indicators of compromise across common log types. OpenSearch Service also offers the capability to integrate with ML capabilities such as anomaly detection and alert correlation to enhance log analysis.

With the introduction of Amazon Security Lake, there is another opportunity to expand the solution to more efficiently manage AWS logging sources and add to this architecture. For example, you can use Amazon OpenSearch Ingestion to generate security insights on security data from Amazon Security Lake.

Summary

In this post, we reviewed how Zurich was able to build a log data management architecture that provided the scalability, flexibility, performance, and cost-optimization mechanisms needed to meet their requirements.

To learn more about components of this solution, visit the Centralized Logging with OpenSearch implementation guide, review Querying AWS service logs, or run through the SIEM on Amazon OpenSearch Service workshop.


About the Authors

Clarisa Tavolieri is a Software Engineering graduate with qualifications in Business, Audit, and Strategy Consulting. With an extensive career in the financial and tech industries, she specializes in data management and has been involved in initiatives ranging from reporting to data architecture. She currently serves as the Global Head of Cyber Data Management at Zurich Group. In her role, she leads the data strategy to support the protection of company assets and implements advanced analytics to enhance and monitor cybersecurity tools.

Austin Rappeport is a Computer Engineer who graduated from the University of Illinois Urbana-Champaign in 2011 with a focus in Computer Security. After graduation, he worked for the Federal Energy Regulatory Commission in the Office of Electric Reliability, working with the North American Electric Reliability Corporation’s Critical Infrastructure Protection Standards on both the audit and enforcement side, as well as standards development. Austin currently works for Zurich Insurance as the Global Head of Detection Engineering and Automation, where he leads the team responsible for using Zurich’s security tools to detect suspicious and malicious activity and improve internal processes through automation.

Samantha Gignac is a Global Security Architect at Zurich Insurance. She graduated from Ferris State University in 2014 with a Bachelor’s degree in Computer Systems & Network Engineering. With experience in the insurance, healthcare, and supply chain industries, she has held roles such as Storage Engineer, Risk Management Engineer, Vulnerability Management Engineer, and SOC Engineer. As a Cybersecurity Architect, she designs and implements secure network systems to protect organizational data and infrastructure from cyber threats.

Claire Sheridan is a Principal Solutions Architect with Amazon Web Services working with global financial services customers. She holds a PhD in Informatics and has more than 15 years of industry experience in tech. She loves traveling and visiting art galleries.

Jake Obi is a Principal Security Consultant with Amazon Web Services based in South Carolina, US, with over 20 years’ experience in information technology. He helps financial services customers improve their security posture in the cloud. Prior to joining Amazon, Jake was an Information Assurance Manager for the US Navy, where he worked on a large satellite communications program as well as hosting government websites using the public cloud.

Srikanth Daggumalli is an Analytics Specialist Solutions Architect at AWS. Of his 18 years of experience, more than a decade has been spent architecting cost-effective, performant, and secure enterprise applications that improve customer reachability and experience, using big data, AI/ML, cloud, and security technologies. He has built high-performing data platforms for major financial institutions, enabling improved customer reach and exceptional experiences. He specializes in services like cross-border transactions and in architecting robust analytics platforms.

Freddy Kasprzykowski is a Senior Security Consultant with Amazon Web Services based in Florida, US, with over 20 years’ experience in information technology. He helps customers adopt AWS services securely according to industry best practices, standards, and compliance regulations. He is a member of the Customer Incident Response Team (CIRT), helping customers during security events, a seasoned speaker at AWS re:Invent and AWS re:Inforce conferences, and a contributor to open source projects related to AWS security.

Author data integration jobs with an interactive data preparation experience with AWS Glue visual ETL

Post Syndicated from Chiho Sugimoto original https://aws.amazon.com/blogs/big-data/author-data-integration-jobs-with-an-interactive-data-preparation-experience-with-aws-glue-visual-etl/

We are excited to announce a new capability of the AWS Glue Studio visual editor that offers a new visual user experience. Now you can author data preparation transformations and edit them with the AWS Glue Studio visual editor. The AWS Glue Studio visual editor is a graphical interface that enables you to create, run, and monitor data integration jobs in AWS Glue.

The new data preparation interface in AWS Glue Studio provides an intuitive, spreadsheet-style view for interactively working with tabular data. Within this interface, you can visually inspect tabular data samples, validate recipe steps through real-time runs, and author data preparation recipes without writing code. Within the new experience, you can choose from hundreds of prebuilt transformations. This allows data analysts and data scientists to rapidly construct the necessary data preparation steps to meet their business needs. After you complete authoring the recipes, AWS Glue Studio will automatically generate the Python script to run the recipe data transformations as part of AWS Glue extract, transform, and load (ETL) jobs.

In this post, we show how to use this new feature to build a visual ETL job that preprocesses data to meet the business needs for an example use case, entirely within the AWS Glue Studio console, without the overhead of manual script coding.

Example use case

A fictional e-commerce company sells apparel and allows customers to leave text reviews and star ratings for each product, to help other customers to make informed purchase decisions. To simulate this, we will use a sample synthetic review dataset, which includes different products and customer reviews.

In this scenario, you’re a data analyst in this company. Your role involves preprocessing raw customer review data to prepare it for downstream analytics. This requires transforming the data by normalizing columns through actions such as casting columns to appropriate data types, splitting a single column into multiple new columns, and adding computed columns based on other columns. To quickly create an ETL job for these business requirements, you use AWS Glue Studio to inspect the data and author data preparation recipes.

The AWS Glue job will be configured to output the file to Amazon Simple Storage Service (Amazon S3) in a preferred format and automatically create a table in the AWS Glue Data Catalog. This Data Catalog table will be shared with your analyst team, allowing them to query the table using Amazon Athena.

Prerequisites

For this tutorial, you need an S3 bucket to store output from the AWS Glue ETL job and Athena queries, and a Data Catalog database to create new tables. You also need to create AWS Identity and Access Management (IAM) roles for the AWS Glue job and AWS Management Console user.

Create an S3 bucket to store output from the AWS Glue ETL jobs and Athena query results

You can either create a new S3 bucket or use an existing bucket to store output from the AWS Glue ETL job and Athena queries. In the following steps, replace <glue-etl-output-s3-bucket> and <athena-query-output-s3-bucket> with the name of the S3 bucket.

Create a Data Catalog database

You can either create a new Data Catalog database or use an existing database to create tables. In the following steps, replace <your_database> with the name of your database.

Create an IAM role for the AWS Glue job

Complete the following steps to create an IAM role for the AWS Glue job:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose AWS service.
  4. For Service or use case, choose Glue.
  5. Choose Next.
  6. For Add permissions, choose AWSGlueServiceRole, then choose Next.
  7. For Role name, enter a role name (for this post, GlueJobRole-recipe-demo).
  8. Choose Create role.
  9. Choose the created IAM role.
  10. Under Permissions policies, choose Add permissions, then choose Create inline policy.
  11. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>/*",
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>"
                ]
            }
        ]
    }

  12. Choose Next.
  13. For Policy name, enter a name for your policy.
  14. Choose Create policy.
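
If you create this role for several environments, it can be convenient to generate the inline policy from the bucket name rather than editing the JSON by hand. The following is a minimal sketch that reproduces the policy shown above; the function name `build_glue_job_policy` and the example bucket name are illustrative and not part of the original walkthrough.

```python
import json

SOURCE_PREFIX_ARN = "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"

def build_glue_job_policy(output_bucket):
    """Build the inline policy document for the AWS Glue job role."""
    object_actions = ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Access to the sample dataset, as in the policy above
                "Effect": "Allow",
                "Action": object_actions,
                "Resource": [SOURCE_PREFIX_ARN],
            },
            {   # List/read/write/delete access on the job's output bucket
                "Effect": "Allow",
                "Action": ["s3:List*"] + object_actions,
                "Resource": [
                    f"arn:aws:s3:::{output_bucket}/*",
                    f"arn:aws:s3:::{output_bucket}",
                ],
            },
        ],
    }

policy = build_glue_job_policy("my-glue-etl-output-bucket")  # hypothetical name
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the policy editor, or the same string could be passed as `PolicyDocument` to the IAM `put_role_policy` call in boto3.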

Create an IAM role for the console user

Complete the following steps to create the IAM role to interact with the console:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose the entity of your choice.
  4. For Add permissions, add the following AWS managed policies:
    1. AmazonAthenaFullAccess
    2. AWSGlueConsoleFullAccess
  5. Choose Next.
  6. For Role name, enter a role name of your choice.
  7. Choose Create role.
  8. Choose the created IAM role.
  9. Under Permissions policies, choose Add permissions, then choose Create inline policy.
  10. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole"
                ],
                "Resource": [
                    "arn:aws:iam::<account-id>:role/GlueJobRole-recipe-demo"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject"
                ],
                "Resource": [
                    "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>/*",
                    "arn:aws:s3:::<athena-query-output-s3-bucket>/*"
                ]
            }
        ]
    }

  11. Choose Next.
  12. For Policy name, enter a name for your policy.
  13. Choose Create policy.

The S3 bucket and IAM roles required for this tutorial have been created and configured. Switch to the console user role that you set up and proceed with the steps in the following sections.

Author and run a data integration job using the interactive data preparation experience

Let’s create an AWS Glue ETL job in AWS Glue Studio. In this ETL job, we load S3 Parquet files as the source, process the data using recipe steps, and write the output to Amazon S3 as Parquet. You can configure all these steps in the visual editor in AWS Glue Studio. We use the new data preparation authoring capabilities to create recipes that meet our specific business needs for data transformations. This exercise will demonstrate how you can develop data preparation recipes in AWS Glue Studio that are tailored to your use case and can be readily incorporated into scalable ETL jobs. Complete the following steps:

  1. On the AWS Glue Studio console, choose Visual ETL in the navigation pane.
  2. Under Create job, choose Visual ETL.
  3. At the top of the job, replace “Untitled job” with a name of your choice.
  4. On the Job Details tab, under Basic properties, specify the IAM role that the job will use (GlueJobRole-recipe-demo).
  5. Choose Save.
  6. On the Visual tab, choose the plus sign to open the Add nodes menu. Search for s3 and add an Amazon S3 node as a Source.
    1. For S3 source type, choose S3 location.
    2. For S3 URL, specify s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/.
    3. For Data format, select Parquet.
  7. As a child of this source, search in the Add nodes menu for recipe and add the Data Preparation Recipe transform.
  8. In the Data preview window, choose Start session if it has not been started.
    1. If it hasn’t been started, Start a data preview session will be displayed on the Data Preparation Recipe transform.
    2. Choose your IAM role for the AWS Glue job.
    3. Choose Start session.
  9. After your data preview session has been started, on the Data Preparation Recipe transform, choose Author Recipe to open the data preparation recipe editor.

This will initialize a session using a subset of the data. After session initialization, the AWS Glue Studio console provides an interactive interface that enables intuitive construction of recipe steps for AWS Glue ETL jobs.

As described in our example use case, you’re authoring recipes to preprocess customer review data for analysis. Upon reviewing the spreadsheet-style data preview, you notice the product_title column contains values like business formal pants, plain and business formal jeans, patterned, with the product name and sub-attribute separated by a comma. To better structure this data for downstream analysis, you decide to split the product_title column on the comma delimiter to create separate columns for the product name and sub-attribute. This will allow for easier filtering and aggregation by product type or attribute during analysis.

On the spreadsheet-style UI, you can check the statistics of each column, such as Min, Median, Max, cardinality, and value distribution, for a subset of the data. This provides useful insights about the data to inform transformation decisions. When reviewing the statistics for the review_year column, you notice it contains a wide range of values spanning over 15 years. To enable easier analysis of seasonal and weekly trends, you decide to derive new columns showing the week number and day of the week computed from the review_date column.

Moreover, for convenience of downstream analysis, you decide to change the data type of the customer_id and product_id columns from string to integer. Converting data types is a common task in ETL workflows for analytics. The data preparation recipes in AWS Glue Studio provide a wide variety of common ETL transformations like renaming columns, deleting columns, sorting, and reordering columns. Feel free to browse the data preparation UI to discover other available recipes that can help transform your data.

Let’s see how to implement the recipe steps in the Data Preparation Recipe transform to meet these requirements.

  1. Select the customer_id column and choose the Change type recipe step.
    1. For Change type to, choose integer.
    2. Choose Apply to add the recipe step.
  2. Select the product_id column and choose the Change type recipe step.
    1. For Change type to, choose integer.
    2. Choose Apply.
  3. Select the product_title column and choose On a single delimiter under SPLIT.
    1. For Delimiter, select Enter custom value and enter a comma (,).
    2. Choose Apply.
  4. Select the review_date column and choose Week number under EXTRACT.
    1. For Destination column, enter review_date_week_number.
    2. Choose Apply.
  5. Select the review_date column and choose Day of week under EXTRACT.
    1. For Destination column, enter review_date_week_day.
    2. Choose Apply.

After these recipe steps are applied, you can see that the customer_id and product_id columns have been converted to integer, the product_title column has been split into product_title_1 and product_title_2, and the review_date_week_number and review_date_week_day columns have been added. While authoring data preparation recipe steps, you can view the tabular data and check whether the recipe steps are working as expected. This lets you validate recipe steps interactively against the data subset previewed in the UI.
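
To make the effect of the five recipe steps concrete, here is a rough plain-Python equivalent applied to a single made-up row. It is not the script that AWS Glue Studio generates. The sample values are invented, and the choices to keep the original product_title column, trim whitespace around the split parts, and use ISO 8601 week numbers and weekdays (Monday = 1) are assumptions that may differ from what the recipe steps produce.

```python
from datetime import date

def apply_recipe(row):
    """Mimic the five recipe steps on one record (a dict)."""
    out = dict(row)
    # Steps 1-2: change the type of the ID columns from string to integer
    out["customer_id"] = int(row["customer_id"])
    out["product_id"] = int(row["product_id"])
    # Step 3: split product_title on the first comma into two new columns
    name, _, attribute = row["product_title"].partition(",")
    out["product_title_1"] = name.strip()        # trimming is an assumption
    out["product_title_2"] = attribute.strip()
    # Steps 4-5: derive week number and day of week from review_date
    iso = date.fromisoformat(row["review_date"]).isocalendar()
    out["review_date_week_number"] = iso[1]      # ISO week number (assumed)
    out["review_date_week_day"] = iso[2]         # ISO weekday, Monday = 1 (assumed)
    return out

sample = {
    "customer_id": "1042",
    "product_id": "77",
    "product_title": "business formal pants, plain",
    "review_date": "2008-03-14",
}
result = apply_recipe(sample)
print(result)
```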

  1. Choose Done authoring recipe to close the interface.

Now, on the Script tab in AWS Glue Studio console, you can see the script generated from the recipe steps. AWS Glue Studio automatically converts the recipe steps configured through the UI into the Python code. This allows you to build ETL jobs utilizing the wide range of transformations available in data preparation recipes, without having to manually code the logic yourself.

  1. Choose Save to save the job.
  2. On the Visual tab, search in the Add nodes menu for s3 and add an Amazon S3 node as a Target.
    1. For Format, choose Parquet.
    2. For Compression Type, choose Snappy.
    3. For S3 Target Location, select your output S3 location s3://<glue-etl-output-s3-bucket>/output/.
    4. For Data Catalog update options, choose Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
    5. For Database, choose the database of your choice.
    6. For Table name, enter data_preparation_recipe_demo_tbl.
    7. Under Partition keys, choose Add a partition key, and select review_year.
  3. Choose Save, then choose Run to run the job.

Up to this point, we have created and run the ETL job. When the job has finished running, a table named data_preparation_recipe_demo_tbl has been created in the Data Catalog. The table has the partition column review_year with partitions for the years 2000–2016. Let’s move on to the next step and query the table.

Run queries on the output data with Athena

Now that the AWS Glue ETL job is complete, let’s query the transformed output data. As a sample analysis, let’s find the top three items that were reviewed in 2008 across all marketplaces and calculate the average star rating for those items. Then, for the most reviewed item in 2008, we find its top five sub-attributes. This demonstrates how to query the newly processed dataset to derive insights.

  1. On the Athena console, run the following query against the table:
    SELECT count(*) AS count, product_title_1, avg(star_rating) AS ave 
    FROM <your_database>.data_preparation_recipe_demo_tbl 
    WHERE review_year = 2008
    GROUP BY product_title_1
    ORDER BY count DESC
    LIMIT 3;

This query counts the number of reviews in 2008 for each product_title_1 and returns the top three most reviewed items. It also calculates the average star_rating for each of the top three items. The query will return results as shown in the following screenshot.

The item made with natural materials heels is the most reviewed item. Now let’s query the top five most reviewed attributes for it.

  1. Run the following query against the table:
    SELECT count(*) AS count, product_title_2, avg(star_rating) AS ave 
    FROM <your_database>.data_preparation_recipe_demo_tbl
    WHERE review_year = 2008 
    AND product_title_1 = 'made with natural materials heels'
    GROUP BY product_title_2
    ORDER BY count DESC
    LIMIT 5;

The query will return results as shown in the following screenshot.

The query results show that for the top reviewed item made with natural materials heels, the top five most reviewed sub-attributes in 2008 were draped, asymmetric, muted, polka-dotted, and oversized. Of these top five sub-attributes, draped had the highest average star rating.

Through this walkthrough, we were able to quickly build an ETL job and generate datasets that fulfill analytics needs, without the overhead of manual script coding.
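
The same queries can also be submitted programmatically. The following sketch uses the Athena client operations `start_query_execution`, `get_query_execution`, and `get_query_results` from boto3. The helper name `run_athena_query`, the polling interval, and the way placeholders are passed are assumptions for illustration. The client is passed in as a parameter so the helper can be exercised without AWS credentials.

```python
import time

TOP_ITEMS_SQL = """
SELECT count(*) AS count, product_title_1, avg(star_rating) AS ave
FROM data_preparation_recipe_demo_tbl
WHERE review_year = 2008
GROUP BY product_title_1
ORDER BY count DESC
LIMIT 3
"""

def run_athena_query(athena, sql, database, output_location, poll_seconds=1.0):
    """Submit a query, wait for it to finish, and return the result rows."""
    started = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")
    # First page of results only; the first row holds the column headers.
    page = athena.get_query_results(QueryExecutionId=query_id)
    return [
        [col.get("VarCharValue") for col in row["Data"]]
        for row in page["ResultSet"]["Rows"]
    ]

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   rows = run_athena_query(boto3.client("athena"), TOP_ITEMS_SQL,
#                           "<your_database>",
#                           "s3://<athena-query-output-s3-bucket>/")
```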

Clean up

If you no longer need this solution, you can delete the following resources created in this tutorial:

  • S3 buckets (s3://<glue-etl-output-s3-bucket>, s3://<athena-query-output-s3-bucket>)
  • IAM roles for the AWS Glue job (GlueJobRole-recipe-demo) and the console user
  • AWS Glue ETL job
  • Data Catalog database (<your_database>) and table (data_preparation_recipe_demo_tbl)

Conclusion

In this post, we introduced the new AWS Glue data preparation authoring experience, which lets you create low-code/no-code data integration recipe transformations directly on the AWS Glue Studio console. We demonstrated how you can use this feature to quickly build ETL jobs and generate datasets that meet your business needs without time-consuming manual coding.

The AWS Glue data preparation authoring experience is now publicly available. Try out this new capability and discover recipes that can facilitate your data transformations.

To learn more about using the interactive data preparation authoring experience in AWS Glue Studio, check out the following video and read the AWS News Blog.


About the Authors

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for Data Analytics at AWS. He has worked in the analytics domain for the last 20 years, now focusing on helping Canadian public sector organizations innovate with data. Quite by surprise, he became a hockey dad after moving to Canada.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Gal Heyne is a Technical Product Manager for AWS Data Processing services with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data services products.

Accelerate query performance with Apache Iceberg statistics on the AWS Glue Data Catalog

Post Syndicated from Sotaro Hikita original https://aws.amazon.com/blogs/big-data/accelerate-query-performance-with-apache-iceberg-statistics-on-the-aws-glue-data-catalog/

Today, we are pleased to announce a new capability for the AWS Glue Data Catalog: generating column-level aggregation statistics for Apache Iceberg tables to accelerate queries. These statistics are used by the cost-based optimizer (CBO) in Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings.

Apache Iceberg is an open table format that provides the capability of ACID transactions on your data lakes. It’s designed to process large analytics datasets and is efficient for even small row-level operations. It also enables useful features such as time-travel, schema evolution, hidden partitioning, and more.

AWS has invested in service integration with Iceberg to enable Iceberg workloads based on customer feedback. One example is the AWS Glue Data Catalog. The Data Catalog is a centralized repository that stores metadata about your organization’s datasets, making the data visible, searchable, and queryable for users. The Data Catalog supports Iceberg tables and tracks the table’s current metadata. It also allows automatic compaction of individual small files produced by each transactional write on tables into a few large files for faster read and scan operations.

In 2023, the Data Catalog announced support for column-level statistics for non-Iceberg tables. That feature collects table statistics used by the query engine’s CBO. Now, the Data Catalog expands this support to Iceberg tables. The Iceberg column statistics that the Data Catalog generates are based on the Puffin spec and are stored on Amazon Simple Storage Service (Amazon S3) with other table data. This way, various engines supporting Iceberg can use and update them.

This post demonstrates how column-level statistics for Iceberg tables work with Redshift Spectrum. Furthermore, we showcase the performance benefit of the Iceberg column statistics with the TPC-DS dataset.

How Iceberg table column statistics work

AWS Glue Data Catalog generates table column statistics using the Theta Sketch algorithm from Apache DataSketches to estimate the number of distinct values (NDV) and stores them in a Puffin file.

For SQL planners, NDV is an important statistic to optimize query planning. There are a few scenarios where NDV statistics can potentially optimize query performance. For example, when joining two tables on a column, the optimizer can use the NDV to estimate the selectivity of the join. If one table has a low NDV for the join column compared to the other table, the optimizer may choose to use a broadcast join instead of a shuffle join, reducing data movement and improving query performance. Moreover, when there are more than two tables to be joined, the optimizer can estimate the output size of each join and plan an efficient join order. Furthermore, NDV can be used for various optimizations, such as GROUP BY, DISTINCT, and COUNT queries.
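
To illustrate why NDV matters to a planner, the sketch below applies a common textbook estimate for the size of an equi-join, rows_R × rows_S / max(NDV_R, NDV_S), and uses it to compare two possible first joins. This is a generic illustration under a uniform-distribution assumption, not a description of how the Redshift Spectrum optimizer works internally; the table sizes and NDV values are invented.

```python
def estimate_join_rows(rows_r, ndv_r, rows_s, ndv_s):
    """Textbook equi-join size estimate, assuming uniformly distributed keys."""
    return rows_r * rows_s / max(ndv_r, ndv_s)

# Invented statistics for a fact table and two dimension tables.
sales_rows, sales_ndv_item, sales_ndv_date = 1_000_000, 10_000, 365
item_rows, item_ndv = 500, 500      # a filtered item dimension
date_rows, date_ndv = 365, 365      # a date dimension

via_item = estimate_join_rows(sales_rows, sales_ndv_item, item_rows, item_ndv)
via_date = estimate_join_rows(sales_rows, sales_ndv_date, date_rows, date_ndv)

print(f"sales JOIN item -> ~{via_item:,.0f} rows")
print(f"sales JOIN date -> ~{via_date:,.0f} rows")
```

With these numbers, joining the filtered item table first yields the smaller intermediate result, so a planner using this estimate would prefer that join order. Without NDV statistics, it would have no basis for telling the two apart.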

However, calculating NDV continuously with 100% accuracy requires O(N) space complexity. Theta Sketch, by contrast, is an efficient algorithm that lets you estimate the NDV in a dataset without storing all the distinct values in memory or on storage. The key idea behind Theta Sketch is to hash the data into the range between 0 and 1, and then retain only a small portion of the hashed values based on a threshold (denoted as θ). By analyzing this small subset of data, the Theta Sketch algorithm can provide an accurate estimate of the NDV in the original dataset.
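
The idea can be sketched in a few lines of Python. The following is a simplified, KMV-style toy version written for this explanation, not the Apache DataSketches implementation: the class name, the use of SHA-256 as the hash, and the default of k = 1024 retained values are all assumptions.

```python
import hashlib

def hash_to_unit(value):
    """Map a value to a pseudo-random float in [0, 1)."""
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2**64

class SimpleThetaSketch:
    """Toy theta sketch that retains at most k hash values below theta."""

    def __init__(self, k=1024):
        self.k = k
        self.theta = 1.0          # threshold; starts at the top of the range
        self.retained = set()     # distinct hash values below theta

    def update(self, value):
        h = hash_to_unit(value)
        if h >= self.theta or h in self.retained:
            return                # above the threshold, or already seen
        self.retained.add(h)
        if len(self.retained) > self.k:
            # Too many entries: lower theta to the largest retained hash
            # and drop that hash, so only values below theta remain.
            self.theta = max(self.retained)
            self.retained.remove(self.theta)

    def estimate(self):
        """Estimated NDV = retained count / fraction of hash space sampled."""
        return len(self.retained) / self.theta

sketch = SimpleThetaSketch(k=1024)
for i in range(50_000):
    sketch.update(f"customer-{i % 20_000}")   # 20,000 distinct values, with repeats
print(f"estimated NDV: {sketch.estimate():,.0f} (true NDV: 20,000)")
```

The sketch never holds more than k hash values regardless of input size, and while fewer than k distinct values have been seen, θ stays at 1 and the count is exact. For larger inputs the relative error shrinks roughly with the square root of k, so a larger k buys accuracy at the cost of memory.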

Iceberg’s Puffin file is designed to store information such as indexes and statistics as blobs. One of the representative blob types that can be stored is apache-datasketches-theta-v1, which holds serialized values for estimating the NDV using the Theta Sketch algorithm. Puffin files are linked to a snapshot-id in Iceberg’s metadata and are used by the query engine’s CBO to optimize query plans.

Leverage Iceberg column statistics through Amazon Redshift

To demonstrate the performance benefit of this capability, we employ the industry-standard TPC-DS 3 TB dataset. We compare the query performance with and without Iceberg column statistics for the tables by running queries in Redshift Spectrum. We have included the queries used in this post, and we recommend trying your own queries by following the workflow.

The overall steps are as follows:

  1. Run an AWS Glue job that extracts the TPC-DS dataset from a public Amazon S3 bucket and saves it as Iceberg tables in your S3 bucket. The AWS Glue Data Catalog stores those tables’ metadata location. Query these tables using Amazon Redshift Spectrum.
  2. Generate column statistics: Use the AWS Glue Data Catalog to generate column statistics for each table. This generates Puffin files that store Theta Sketches.
  3. Query with Amazon Redshift Spectrum: Evaluate the performance benefit of column statistics by using Amazon Redshift Spectrum to run queries on the dataset.

The following diagram illustrates the architecture.

To try this new capability, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Run an AWS Glue job to create Iceberg tables for the 3TB TPC-DS dataset in your S3 bucket. The Data Catalog stores those tables’ metadata location.
  3. Run queries on Redshift Spectrum and note the query duration.
  4. Generate Iceberg column statistics for Data Catalog tables.
  5. Run queries on Redshift Spectrum and compare the query duration with the previous run.
  6. Optionally, schedule AWS Glue column statistics jobs using AWS Lambda and an Amazon EventBridge schedule.
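
For the optional scheduling step, a Lambda function invoked by an EventBridge schedule might look roughly like the following. It calls the AWS Glue `StartColumnStatisticsTaskRun` API through boto3 for each table. The helper name, the table subset, and the hard-coded arguments are placeholders, and this is not the function that the CloudFormation template in this post deploys.

```python
def start_statistics_runs(glue, database, tables, role):
    """Start one column statistics task run per table; return the run IDs."""
    run_ids = {}
    for table in tables:
        response = glue.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role,
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids

def lambda_handler(event, context):
    """Lambda entry point; the EventBridge event payload is not used."""
    import boto3  # imported here so the helper above can be tested without AWS
    return start_statistics_runs(
        boto3.client("glue"),
        database="tpcdsdbwithstats",
        tables=["call_center", "store_sales"],        # example subset of tables
        role="AWSGluestats-blog-<your-stack-name>",   # replace with your role name
    )
```

Keeping the Glue client as a parameter of the helper makes it straightforward to unit test the loop with a stub before wiring it to a schedule.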

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs. Note that this CloudFormation template requires a region with at least 3 Availability Zones. The template generates the following resources:

  • A virtual private cloud (VPC), public subnet, private subnets, and route tables
  • An Amazon Redshift Serverless workgroup and namespace
  • An S3 bucket to store the TPC-DS dataset, column statistics, job scripts, and so on
  • Data Catalog databases
  • An AWS Glue job to extract the TPC-DS dataset from the public S3 bucket and save the data as an Iceberg table in your S3 bucket
  • AWS Identity and Access Management (AWS IAM) roles and policies
  • A Lambda function and EventBridge schedule to run the AWS Glue column statistics on a schedule

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. Leave the parameters as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Run an AWS Glue job to create Iceberg tables for the 3TB TPC-DS dataset

When the CloudFormation stack creation is complete, run the AWS Glue job to create Iceberg tables for the TPC-DS dataset. This AWS Glue job extracts the TPC-DS dataset from the public S3 bucket and transforms the data into Iceberg tables. Those tables are loaded into your S3 bucket and registered to the Data Catalog.

To run the AWS Glue job, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose InitialDataLoadJob-<your-stack-name>.
  3. Choose Run.

This AWS Glue job can take around 30 minutes to complete. The process is complete when the job processing status shows as Succeeded.

The AWS Glue job creates tables storing the TPC-DS dataset in two identical databases: tpcdsdbnostats and tpcdsdbwithstats. The tables in tpcdsdbnostats will have no generated statistics, and we use them as reference. We generate statistics on tables in tpcdsdbwithstats. Confirm the creation of those two databases and underlying tables on the AWS Glue console. At this time, those databases hold the same data and there are no statistics generated on the tables.

Run queries on Redshift Spectrum without statistics

In the previous steps, you set up a Redshift Serverless workgroup with the given RPU (128 by default), prepared the TPC-DS 3TB dataset in your S3 bucket, and created Iceberg tables (which currently don’t have statistics).

To run your query in Amazon Redshift, complete the following steps:

  1. Download the Amazon Redshift queries.
  2. In the Redshift query editor v2, run the queries listed in the Redshift Query for tables without column statistics section in the downloaded file redshift-tpcds-sample.sql.
  3. Note the query runtime of each query.

Generate Iceberg column statistics

To generate statistics on the Data Catalog tables, complete the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose the tpcdsdbwithstats database to view all available tables.
  3. Select any of these tables (for example, call_center).
  4. Go to Column statistics – new and choose Generate statistics.
  5. Keep the default options:
    1. For Choose columns, select Table (All columns).
    2. For Row sampling options, select All rows.
    3. For IAM role, choose AWSGluestats-blog-<your-stack-name>.
  6. Choose Generate statistics.

You’ll be able to see the status of the statistics generation run as shown in the following screenshot.

After you generate the Iceberg table column statistics, you should be able to see detailed column statistics for that table.

Following the statistics generation, you will find an <id>.stat file in the AWS Glue table’s underlying data location in Amazon S3. This file is a Puffin file that stores the Theta Sketch data structure. Query engines can use this Theta Sketch algorithm to efficiently estimate the NDV when operating on the table, which helps optimize query performance.
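To build intuition for how a sketch can estimate the NDV (number of distinct values) without storing every value, the following is a minimal K-Minimum-Values (KMV) estimator, a simplified relative of the Theta Sketch. It is illustrative only: the class name, the use of SHA-1, and k=1024 are assumptions, and this is not the implementation that AWS Glue uses to write the Puffin file.

```python
import hashlib
import heapq


class KMVSketch:
    """Simplified K-Minimum-Values sketch for estimating distinct counts."""

    def __init__(self, k=1024):
        self.k = k
        self._heap = []         # max-heap (stored as negatives) of the k smallest hashes
        self._members = set()   # hashes currently retained, used to skip duplicates

    @staticmethod
    def _hash(value):
        # Map the value to a pseudo-uniform float in [0, 1).
        digest = hashlib.sha1(str(value).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") / 2**64

    def update(self, value):
        h = self._hash(value)
        if h in self._members:
            return
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, -h)
            self._members.add(h)
        elif h < -self._heap[0]:
            # h is smaller than the largest retained hash, so it displaces it.
            evicted = -heapq.heappushpop(self._heap, -h)
            self._members.discard(evicted)
            self._members.add(h)

    def estimate(self):
        if len(self._heap) < self.k:
            return float(len(self._heap))  # fewer than k distinct values: exact count
        kth_smallest = -self._heap[0]
        return (self.k - 1) / kth_smallest


# 200,000 rows containing 50,000 distinct values, each seen four times.
sketch = KMVSketch(k=1024)
for i in range(200_000):
    sketch.update(i % 50_000)
estimate = sketch.estimate()
```

The sketch keeps at most k hash values regardless of table size, which is why an engine can read it cheaply at planning time. The estimate is approximate, with a relative error that shrinks as k grows.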

Repeat the previous steps to generate statistics for all tables, such as catalog_sales, catalog_returns, warehouse, item, date_dim, store_sales, customer, customer_address, web_sales, time_dim, ship_mode, web_site, and web_returns. Alternatively, you can manually run the Lambda function that instructs AWS Glue to generate column statistics for all tables. We discuss the details of this function later in this post.

After you generate statistics for all tables, you can assess the query performance for each query.

Run queries on Redshift Spectrum with statistics

In the previous steps, you set up a Redshift Serverless workgroup with the given RPU (128 by default), prepared the TPC-DS 3TB dataset in your S3 bucket, and created Iceberg tables with column statistics.

To run the provided query using Redshift Spectrum on the statistics tables, complete the following steps:

  1. In the Redshift query editor v2, run the queries listed in the Redshift Query for tables with column statistics section in the downloaded file redshift-tpcds-sample.sql.
  2. Note the query runtime of each query.

With Redshift Serverless 128 RPU and the TPC-DS 3TB dataset, we conducted sample runs for 10 selected TPC-DS queries where NDV information was expected to be beneficial. We ran each query 10 times. The results shown in the following table are sorted by the percentage of the performance improvement for the queries with column statistics.

TPC-DS 3TB Queries | Without Column Statistics | With Column Statistics | Performance Improvement (%)
Query 16 | 305.0284 | 51.7807 | 489.1
Query 75 | 398.0643 | 110.8366 | 259.1
Query 78 | 169.8358 | 52.8951 | 221.1
Query 95 | 35.2996 | 11.1047 | 217.9
Query 94 | 160.52 | 57.0321 | 181.5
Query 68 | 14.6517 | 7.4745 | 96.0
Query 4 | 217.8954 | 121.996 | 78.6
Query 72 | 123.8698 | 76.215 | 62.5
Query 29 | 22.0769 | 14.8697 | 48.5
Query 25 | 43.2164 | 32.8602 | 31.5

The results demonstrated clear performance benefits, with improvements ranging from 31.5% to 489.1%.
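The improvement percentages in the table are consistent with the formula (duration without statistics / duration with statistics − 1) × 100. The formula is inferred from the published numbers rather than stated by the authors. The following sketch recomputes three of the rows; the function name improvement_pct is hypothetical.

```python
def improvement_pct(without_stats, with_stats):
    """Percentage improvement, computed as (without / with - 1) * 100."""
    return round((without_stats / with_stats - 1) * 100, 1)


# (duration without statistics, duration with statistics), copied from the table
timings = {
    "Query 16": (305.0284, 51.7807),
    "Query 68": (14.6517, 7.4745),
    "Query 25": (43.2164, 32.8602),
}

results = {name: improvement_pct(a, b) for name, (a, b) in timings.items()}
for name, pct in results.items():
    print(f"{name}: {pct}%")
```

Under this definition, an improvement of 100% means the query ran in half the time, so the 489.1% figure for query 16 corresponds to a runtime roughly 5.9 times shorter.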

To dive deep, let’s explore query 16, which showed the highest performance benefit:

TPC-DS Query 16:

select
   count(distinct cs_order_number) as "order count"
  ,sum(cs_ext_ship_cost) as "total shipping cost"
  ,sum(cs_net_profit) as "total net profit"
from
   "awsdatacatalog"."tpcdsdbwithstats"."catalog_sales" cs1
  ,"awsdatacatalog"."tpcdsdbwithstats"."date_dim"
  ,"awsdatacatalog"."tpcdsdbwithstats"."customer_address"
  ,"awsdatacatalog"."tpcdsdbwithstats"."call_center"
where
    d_date between '2000-2-01' 
    and dateadd(day, 60, cast('2000-2-01' as date))
    and cs1.cs_ship_date_sk = d_date_sk
    and cs1.cs_ship_addr_sk = ca_address_sk
    and ca_state = 'AL'
    and cs1.cs_call_center_sk = cc_call_center_sk
    and cc_county in ('Dauphin County','Levy County','Luce County','Jackson County',
                    'Daviess County')
and exists (select *
            from "awsdatacatalog"."tpcdsdbwithstats"."catalog_sales" cs2
            where cs1.cs_order_number = cs2.cs_order_number
            and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
and not exists(select *
               from "awsdatacatalog"."tpcdsdbwithstats"."catalog_returns" cr1
               where cs1.cs_order_number = cr1.cr_order_number)
order by count(distinct cs_order_number)
limit 100;

You can compare the difference between the query plans with and without column statistics by running the query with the EXPLAIN command.

The following screenshot shows the results without column statistics.

The following screenshot shows the results with column statistics.

You can observe some notable differences as a result of using column statistics. At a high level, the overall estimated cost of the query is significantly reduced, from 20633217995813352.00 to 331727324110.36.

The two query plans chose different join strategies.

The following is one line included in the query plan without column statistics:

XN Hash Join DS_DIST_BOTH (cost45365031.50 rows=10764790749 width=44)
  Outer Dist Key: "outer".cs_order_number
  Inner Dist Key: volt_tt_61c54ae740984.cs_order_number
  Hash Cond: (("outer".cs_order_number = "inner".cs_order_number) AND ("outer".cs_warehouse_sk = "inner".cs_warehouse_sk))

The following is the corresponding line in the query plan with column statistics:

XN Hash Join DS_BCAST_INNER (cost=307193250965.64..327130154786.68 rows=17509398 width=32)
  Hash Cond: (("outer".cs_order_number = "inner".cs_order_number) AND ("outer".cs_warehouse_sk = "inner".cs_warehouse_sk))

The query plan for the table without column statistics used DS_DIST_BOTH when joining large tables, whereas the query plan for the table with column statistics chose DS_BCAST_INNER. The join order has also changed based on the column statistics. Those join strategy and join order changes are mainly driven by more accurate join cardinality estimations, which are possible with column statistics, and result in a more optimized query plan.

Schedule AWS Glue column statistics runs

Maintaining up-to-date column statistics is crucial for optimal query performance. This section guides you through automating the process of generating Iceberg table column statistics using Lambda and EventBridge Scheduler. This automation keeps your column statistics up to date without manual intervention.

The required Lambda function and EventBridge schedule are already created through the CloudFormation template. The Lambda function is used to invoke the AWS Glue column statistics run. First, complete the following steps to explore how the Lambda function is configured:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Open the function GlueTableStatisticsFunctionv1.

For a clearer understanding of the Lambda function, we recommend reviewing the code in the Code section and examining the environment variables under Configuration.

As shown in the following code snippet, the Lambda function invokes the start_column_statistics_task_run API through the AWS SDK for Python (Boto3) library.
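A minimal sketch of such a handler might look like the following. It is not the function deployed by the CloudFormation template: the environment variable names (DATABASE_NAME, TABLE_NAMES, ROLE_ARN) and the helper start_statistics_runs are assumptions for illustration. Only the Boto3 call start_column_statistics_task_run, its DatabaseName, TableName, and Role parameters, and the ColumnStatisticsTaskRunId response field come from the AWS Glue API.

```python
import os


def start_statistics_runs(glue_client, database, tables, role_arn):
    """Start one column statistics task run per table and return the run IDs."""
    run_ids = {}
    for table in tables:
        response = glue_client.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role_arn,
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids


def lambda_handler(event, context):
    # Imported lazily so the helper above can be exercised without AWS access.
    import boto3

    # Hypothetical environment variables; the deployed function may use other names.
    database = os.environ["DATABASE_NAME"]
    tables = os.environ["TABLE_NAMES"].split(",")
    role_arn = os.environ["ROLE_ARN"]
    return start_statistics_runs(boto3.client("glue"), database, tables, role_arn)
```

Passing the client into the helper keeps the loop easy to test with a stub and makes it straightforward to reuse the same logic outside Lambda.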

Next, complete the following steps to explore how the EventBridge schedule is configured:

  1. On the EventBridge console, choose Schedules under Scheduler in the navigation pane.
  2. Locate the schedule created by the CloudFormation template.

This page is where you manage and configure the schedules for your events. As shown in the following screenshot, the schedule is configured to invoke the Lambda function daily at a specific time (in this case, 08:27 PM UTC). This makes sure the AWS Glue column statistics runs occur on a regular and predictable basis.
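In EventBridge Scheduler cron syntax, whose fields are minutes, hours, day of month, month, day of week, and year, a daily run at 08:27 PM UTC is written as cron(27 20 * * ? *). The following sketch builds that expression; the helper name daily_cron is hypothetical, and the expression used by the CloudFormation template may be written differently.

```python
def daily_cron(hour_24, minute):
    """Build an EventBridge Scheduler cron expression that fires once a day."""
    if not (0 <= hour_24 <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    # Fields: minutes hours day-of-month month day-of-week year
    return f"cron({minute} {hour_24} * * ? *)"


expression = daily_cron(20, 27)
print(expression)
```

The resulting string is the kind of value you would supply as the schedule expression when defining the schedule. EventBridge Scheduler evaluates it in UTC unless a time zone is specified.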

Clean up

When you have finished all the above steps, remember to clean up all the AWS resources you created using AWS CloudFormation:

  1. Delete the CloudFormation stack.
  2. Delete the S3 bucket storing the Iceberg table for the TPC-DS dataset and the AWS Glue job script.

Conclusion

This post introduced a new feature in the Data Catalog that enables you to create Iceberg table column-level statistics. The Iceberg table stores a Theta Sketch in a Puffin file, which can be used to estimate NDV efficiently. The Redshift Spectrum CBO can use that estimate to optimize the query plan, resulting in improved query performance and potential cost savings.

Try out this new feature in the Data Catalog to generate column-level statistics and improve query performance, and let us know your feedback in the comments section. Visit the AWS Glue Catalog documentation to learn more.


About the Authors

Sotaro Hikita is a Solutions Architect. He supports customers in a wide range of industries, especially the financial industry, to build better solutions. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Kyle Duong is a Senior Software Development Engineer on the AWS Glue and AWS Lake Formation team. He is passionate about building big data technologies and distributed systems.

Kalaiselvi Kamaraj is a Senior Software Development Engineer with Amazon. She has worked on several projects within the Amazon Redshift query processing team and is currently focusing on performance-related projects for Redshift data lakes.

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Integrate your data and collaborate using data preparation in AWS Glue Studio

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/integrate-your-data-and-collaborate-using-data-preparation-in-aws-glue-studio/

Today, we announce the general availability of data preparation authoring in AWS Glue Studio Visual ETL. This is a new no-code data preparation user experience for business users and data analysts with a spreadsheet-style UI that runs data integration jobs at scale on AWS Glue for Spark. The new visual data preparation experience makes it easier for data analysts and data scientists to clean and transform data to prepare it for analytics and machine learning (ML). Within this new experience, you can choose from hundreds of pre-built transformations to automate data preparation tasks, all without the need to write any code.

Business analysts can now collaborate with data engineers to build data integration jobs. Data engineers can use the Glue Studio visual flow-based view to define connections to the data and set the ordering of the data flow process. Business analysts can use the data preparation experience to define the data transformation and output. Additionally, you can import your existing AWS Glue DataBrew data cleansing and preparation “recipes” to the new AWS Glue data preparation experience. This way, you can continue to author them directly in AWS Glue Studio and then scale up recipes to process petabytes of data at the lower price point for AWS Glue jobs.

Visual ETL prerequisites (environment setup)
The visual ETL needs an AWSGlueConsoleFullAccess IAM managed policy attached to the users and roles that will access AWS Glue.


This policy grants these users and roles full access to AWS Glue and read access to Amazon Simple Storage Service (Amazon S3) resources.

Advanced visual ETL flows
Once the appropriate AWS Identity and Access Management (IAM) role permissions have been defined, author the visual ETL using AWS Glue Studio.

Extract
Create an Amazon S3 node by selecting the Amazon S3 node from the list of Sources.


Select the newly created node and browse for an S3 dataset. Once the file has been uploaded successfully, choose Infer schema to configure the source node and the visual interface will show the preview of the data contained in the .csv file.

Earlier, I created an S3 bucket in the same Region as the AWS Glue visual ETL and uploaded a .csv file, visual ETL conference data.csv, containing the data that I will be visualizing.

It’s important to set up the role permissions as detailed in the previous step to grant AWS Glue access to read the S3 bucket. Without performing this step, you’ll get an error that ultimately prevents you from seeing the data preview.

Transform
After the node has been configured, add a Data Preparation Recipe and start a data preview session. Starting this session typically takes about 2 to 3 minutes.


Once the data preview session is ready, choose Author Recipe to start an authoring session and add transformations once the data frame is complete. During the authoring session, you can view the data, apply transformation steps, and view the transformed data interactively. You can undo, redo, and reorder the steps. You can visualize the data type of the column and the statistical properties of each column.


You can start applying transformation steps to your data such as changing formats from lowercase to uppercase, changing the sort order, and more, by choosing Add step. All your data preparation steps will be tracked in the recipe.
I wanted a view of conferences that will be hosted in South Africa, so I created two recipes to filter by condition where the Location column has values equal to “South Africa”, and the Comments column contains a value.


Load
Once you’ve prepared your data interactively, you can share your work with data engineers who can extend it with more advanced visual ETL flows and custom code to seamlessly integrate it into their production data pipelines.

Now available
The AWS Glue data preparation authoring experience is now publicly available in all commercial AWS Regions where AWS Glue DataBrew is available. To learn more, visit AWS Glue.

For more information, visit the AWS Glue Developer Guide and send feedback to AWS re:Post for AWS Glue or through your usual AWS support contacts.

— Veliswa

Introducing AWS Glue usage profiles for flexible cost control

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-usage-profiles-for-flexible-cost-control/

AWS Glue is a serverless data integration service that enables you to run extract, transform, and load (ETL) workloads on your data in a scalable and serverless manner. One of the main advantages of using a cloud platform is its flexibility; you can provision compute resources when you actually need them. However, with this ease of creating resources comes a risk of spiraling cloud costs when those resources are left unmanaged or without guardrails. As a result, admins need to balance avoiding high infrastructure costs with allowing users to work without unnecessary friction.

To address that, today we are excited to announce the general availability of AWS Glue usage profiles. With AWS Glue usage profiles, admins can create different profiles for various classes of users within the account, such as developers, testers, and product teams. Each profile is a unique set of parameters that can be assigned to different types of users. For example, developers may need more workers and can have a higher number of maximum workers, whereas product teams may need fewer workers and a lower timeout or idle timeout value.

How AWS Glue usage profiles work

An AWS Glue usage profile is a resource identified by an Amazon Resource Name (ARN) for better governance of resources. Admins have the ability to create AWS Glue usage profiles and define default values to be used when a parameter value is not provided. For example, you can create an AWS Glue usage profile with the default number of workers set to 2. When you sign in to the AWS Glue console using the AWS Identity and Access Management (IAM) user associated with the usage profile and create a new job, the initial value configured for the number of workers shows as 2 instead of the service default of 10.

Additionally, you can specify a set of allowed values for validation when a user associated with this profile creates a resource. If the parameter is numeric, admins can define a range of allowed values by specifying minimum and maximum values, instead of a specific set. For example, you can create an AWS Glue usage profile that allows only G.1X worker types. When you sign in to the AWS Glue console using an IAM user associated with this usage profile and create a job with a G.2X worker type, saving it will result in a failure.
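The default-and-validation behavior described above can be sketched in a few lines. This models the semantics only, not the AWS Glue service code: the dictionary layout, the function resolve_parameter, the example limits, and the error messages are all assumptions for illustration.

```python
def resolve_parameter(name, requested, rule):
    """Apply a profile rule: fill in the default, then check the allowed set or range."""
    value = rule.get("default") if requested is None else requested
    if value is None:
        return None  # the profile has no opinion; the service default would apply

    allowed = rule.get("allowed")
    if allowed is not None and value not in allowed:
        raise ValueError(f"{name}={value!r} is not one of {sorted(allowed)}")

    low, high = rule.get("min"), rule.get("max")
    if low is not None and value < low:
        raise ValueError(f"{name}={value} is below the minimum {low}")
    if high is not None and value > high:
        raise ValueError(f"{name}={value} is above the maximum {high}")
    return value


# Hypothetical profile: numeric parameters use a range, others use an allowed set.
example_profile = {
    "number_of_workers": {"default": 2, "min": 1, "max": 5},
    "worker_type": {"default": "G.1X", "allowed": {"G.1X"}},
}

# No value supplied, so the profile default is used.
workers = resolve_parameter("number_of_workers", None, example_profile["number_of_workers"])
print(workers)
```

With this profile, requesting a G.2X worker type or 20 workers raises an error, which mirrors the failure a user sees when saving a job that violates the assigned profile.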

Because an AWS Glue profile is a resource identified by an ARN, all the default IAM controls apply, including action-based, resource-based, and tag-based authorization. Admins update the IAM policy of users who create AWS Glue resources, granting them read permission on the profiles. This enables users to view the profiles. In order to use them when making API calls to create AWS Glue resources, admins tag the user or role with glue:UsageProfile as the key and the profile name as the value. AWS Glue validates API requests such as CreateJob, UpdateJob, StartJobRun, and CreateSession based on the values specified in the AWS Glue profile and raises appropriate exceptions.

In the following sections, we demonstrate how to create AWS Glue usage profiles, assign profiles to users, and demonstrate the usage profiles in action.

Create AWS Glue usage profiles

To get started and create AWS Glue usage profiles, complete the following steps:

  1. On the AWS Glue console, choose Cost management in the navigation pane.

Let’s create your first usage profile for your developers.

  1. Choose Create usage profile.
  2. For Usage profile name, enter developer.
  3. Under Customize configurations for jobs, for Number of workers, for Default, enter 20.
  4. For Default worker type, choose G.1X.
  5. For Allowed worker types, choose G.1X, G.2X, G.4X, and G.8X.
  6. For Customize configurations for sessions, configure the same values.
  7. Choose Create usage profile.

Next, create another usage profile for your business analysts, who need fewer workers and a lower timeout or idle timeout value.

  1. Choose Create usage profile.
  2. For Usage profile name, enter analyst.
  3. Under Customize configurations for jobs, for Number of workers, for Default, enter 2. For Maximum, enter 5.
  4. For Default worker type, choose G.1X.
  5. For Allowed worker types, choose only G.1X.
  6. For Timeout, for Default, enter 60. For Maximum, enter 120.
  7. For Customize configurations for sessions, configure the same values.
  8. For Idle timeout, for Default, enter 10. For Maximum, enter 60.
  9. Choose Create usage profile.

You have successfully created two usage profiles.

Assign usage profiles

Restrictions can only be applied to AWS Glue API calls made by IAM users or roles if the profile is assigned to them. There are two steps that the admin needs to take in order to assign a profile:

  • In IAM, create a tag named glue:UsageProfile on the user or role, with the name of the profile used as the tag value
  • The IAM policy assigned to the user or role needs to be updated to include the glue:GetUsageProfile IAM action permission to read the assigned profile

Follow these steps to create two new users, each assigned a different profile:

  1. On the IAM console, choose Users in the navigation pane.
  2. Choose Create user.
  3. For User name, enter blogDeveloper.
  4. Select Provide user access to the AWS Management Console and I want to create an IAM user.
  5. You can enter a custom password or let one be generated (in the latter case, select Show password so you can use it later to sign in).
  6. Choose Next.
  7. Attach the managed policies AWSGlueConsoleFullAccess and IAMReadOnlyAccess.
  8. Choose Next.
  9. Review the summary and complete the creation.
  10. Note the password for later, choose Return to users list, and then choose the user you just created.
  11. On the Permissions tab, for Add permissions, choose Create inline policy.
  12. In the policy editor, switch to JSON and enter the following policy, replacing the AWS Region, account ID, and usage profile name placeholders. For the usage profile name, use the value developer for the user blogDeveloper and analyst for the user blogAnalyst.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "glue:GetUsageProfile"
          ],
          "Resource": [
            "arn:aws:glue:<aws region>:<account id>:usageProfile/<usage profile name>"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "iam:PassRole"
          ],
          "Resource": [
            "*"
          ],
          "Condition": {
            "StringLike": {
              "iam:PassedToService": [
                "glue.amazonaws.com"
              ]
            }
          }
        }
      ]
    }

  13. Name the policy GlueUsageProfilePermission and complete the creation.
  14. On the Tags tab, add a new tag with the name glue:UsageProfile and the value developer.

Repeat the steps to create a user named blogAnalyst, and replace the ARN in the policy with arn:aws:glue:<aws region>:<account id>:usageProfile/analyst. Make sure the Region and account ID are populated before updating the policy. For the tag value, specify analyst instead of developer.
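If you prefer to script this setup, the inline policy and the tag can be generated programmatically, which avoids copy-and-paste mistakes when filling in the placeholders. The following is a sketch under stated assumptions: the helper names usage_profile_policy and usage_profile_tag, the Region, and the account ID are made up for illustration. The Boto3 IAM calls in the comments (put_user_policy and tag_user) are shown for orientation only and are not run here.

```python
import json


def usage_profile_policy(region, account_id, profile_name):
    """Build the inline policy from this post for one usage profile."""
    profile_arn = f"arn:aws:glue:{region}:{account_id}:usageProfile/{profile_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["glue:GetUsageProfile"],
                "Resource": [profile_arn],
            },
            {
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": ["*"],
                "Condition": {
                    "StringLike": {"iam:PassedToService": ["glue.amazonaws.com"]}
                },
            },
        ],
    }


def usage_profile_tag(profile_name):
    """Build the tag that assigns a usage profile to a user or role."""
    return [{"Key": "glue:UsageProfile", "Value": profile_name}]


# Example values only; replace with your own Region and account ID.
policy = usage_profile_policy("us-east-1", "123456789012", "analyst")
policy_json = json.dumps(policy, indent=2)
print(policy_json)

# With Boto3, these could be applied along the following lines:
#   iam = boto3.client("iam")
#   iam.put_user_policy(UserName="blogAnalyst",
#                       PolicyName="GlueUsageProfilePermission",
#                       PolicyDocument=policy_json)
#   iam.tag_user(UserName="blogAnalyst", Tags=usage_profile_tag("analyst"))
```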

On the AWS Glue console, navigate to the developer usage profile. You can see that the status has been changed from Not assigned to Assigned.

Lastly, complete the following steps to create two IAM roles for AWS Glue jobs and sessions with the profile.

  1. Create two IAM roles for AWS Glue. Name them GlueServiceRole-developer and GlueServiceRole-analyst.
  2. Configure the following inline policies by replacing the Region, account ID, and usage profile name placeholders. For the usage profile name placeholder, use the value developer for the role GlueServiceRole-developer and analyst for the role GlueServiceRole-analyst.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "glue:GetUsageProfile"
          ],
          "Resource": [
            "arn:aws:glue:<aws region>:<account id>:usageProfile/<usage profile name>"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "iam:PassRole"
          ],
          "Resource": [
            "*"
          ],
          "Condition": {
            "StringLike": {
              "iam:PassedToService": [
                "glue.amazonaws.com"
              ]
            }
          }
        }
      ]
    }

  3. On the Tags tab for the IAM role, add a new tag with the name glue:UsageProfile and the value developer for GlueServiceRole-developer and analyst for GlueServiceRole-analyst.

Usage profiles in action: Jobs

Now you have two users with different AWS Glue profiles assigned. Let’s test them and see the differences. First, let’s try the user blogDeveloper to see how the profile developer works.

  1. Open the AWS Glue console with the blogDeveloper user.
  2. Choose ETL jobs in the navigation pane and choose Script editor.
  3. Choose Create script.
  4. Choose the Job details tab.

The default value for Requested number of workers is 20, which corresponds to the default setting of the profile developer.

Next, let’s try the user blogAnalyst to see how the profile analyst works.

  1. Open the AWS Glue console with the blogAnalyst user.
  2. Choose ETL jobs in the navigation pane and choose Script editor.
  3. Choose Create script.
  4. Choose the Job details tab.

The default value for Requested number of workers is 2, which corresponds to the default setting of the profile analyst.

Additionally, the default value for Job timeout is 60, which corresponds to the default setting of the profile analyst.

  1. For Worker type, choose the dropdown menu.

Only G.1X is available, and G.2X, G.4X, and G.8X are disabled. This is because the profile analyst allows only G.1X.

  1. For Requested number of workers, enter 20 to simulate invalid input.

You will see the warning message The maximum number of workers cannot exceed 5 for usage profile "analyst".

Now, the user blogAnalyst is attempting to run a job in the account where the number of workers set for the job is 20. However, the maximum number of workers in the profile assigned to this user is 5. When the user tries to run the job, it fails with an error, as shown in the following screenshot.

In this example, we’ve demonstrated how usage profiles manage AWS Glue jobs based on the preconfigured values in the profiles.

Usage profiles in action: Sessions

Next, continue using the user blogAnalyst and try the AWS Glue Studio notebook interface to see how interactive sessions work with usage profiles:

  1. Open the AWS Glue console with the blogAnalyst user.
  2. Choose ETL jobs in the navigation pane and choose Notebook.
  3. For IAM role, choose GlueServiceRole-analyst.
  4. Choose Create notebook.
  5. Wait for the notebook to be ready.

In the second notebook cell, %number_of_workers is set to 2, which corresponds to the default value of the profile analyst.

  1. Update %number_of_workers from 2 to 10 to simulate an invalid access pattern:
    %number_of_workers 10

  2. Run the cell.

You get an error message saying “Provided number of workers is not within the range [1, 5] in the analyst profile.”

This is because the given value of 10 exceeds the maximum number of workers set in the profile assigned to this user.

  1. Update %number_of_workers from 10 to 5 to simulate a valid access pattern:
    %number_of_workers 5

  2. Run the cell.

This time, the session has been successfully created.

Now you have observed how usage profiles manage AWS Glue interactive sessions based on the preconfigured values in the profiles.

Conclusion

This post demonstrated how AWS Glue usage profiles allow you to manage your AWS Glue resources with ease and flexibility.

With AWS Glue usage profiles, you can manage and control resources of different users in order to set your organization’s best practices and save costs. AWS Glue usage profiles serve as a guardrail to prevent unauthorized resource usage from occurring.

Try out the feature for yourself, and leave any feedback or questions in the comments.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team, with a background in machine learning and AI.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue. She is passionate about designing and building end-to-end solutions to address customer data integration and analytic needs.

Gal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data products.

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary addresses the two concerns of vendor lock-in and cost-efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary’s infrastructure handles over 20 billion requests daily, with every request generating event logs. Various data pipelines process these logs, storing petabytes (PBs) of data per month. The data is first stored on Amazon S3 and, after processing, is stored in Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables that were used by only a few internal teams. Cloudinary struggled to make this data available to additional teams that had more online, real-time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

Data flows from Cloudinary log providers into files written to Amazon S3, and each new file triggers an event that is pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running on Amazon EMR, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a single copy of the data, help avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests that checked performance and the volume of data scanned used Athena because it provides a simple-to-use, fully serverless, cost-effective interface without the need to set up infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. For example, timestamps can be partitioned by year, month, day, or hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it allows partition layouts to evolve over time without breaking existing queries. For example, if a table uses daily partitions and the query pattern shifts over time to hourly filters, the partitions can be evolved to hourly ones, making queries more efficient. When a partition definition evolves, the data written to the table before the change is unaffected, as is its metadata. Only data written after the evolution is partitioned with the new definition, and the metadata for this new data is kept separately. At query time, each partition layout’s metadata is used to identify the files that need to be accessed; this is called split planning. Split planning is one of many Iceberg features made possible by the table metadata, which separates the physical storage from the logical one. This separation makes Iceberg extremely versatile.
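To make hidden partitioning more concrete, the following Python sketch mimics how a timestamp value could be mapped to a day or hour partition value, and how a time filter could be translated into the partition values a query needs to read. It is an illustration only, not Iceberg's implementation; the function names (day_transform, hour_transform, partitions_for_range) are hypothetical.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_transform(ts: datetime) -> int:
    """Whole days since 1970-01-01 (UTC)."""
    return (ts - EPOCH).days

def hour_transform(ts: datetime) -> int:
    """Whole hours since 1970-01-01T00:00 (UTC)."""
    return int((ts - EPOCH).total_seconds() // 3600)

def partitions_for_range(start: datetime, end: datetime, transform) -> range:
    """Partition values that a filter `start <= ts <= end` has to read."""
    return range(transform(start), transform(end) + 1)

event = datetime(2024, 1, 1, 5, 30, tzinfo=timezone.utc)
print(day_transform(event))   # 19723
print(hour_transform(event))  # 473357

# A filter from 23:00 to 01:00 touches two daily partitions but three hourly ones.
start = datetime(2024, 1, 1, 23, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, 1, 0, tzinfo=timezone.utc)
print(len(partitions_for_range(start, end, day_transform)))   # 2
print(len(partitions_for_range(start, end, hour_transform)))  # 3
```

Because the mapping is derived from the timestamp column itself, a query only needs to filter on that column; no separate partition column has to be supplied.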

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this was a migration of existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results, both during writing and at query time. After careful analysis of the results, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions, as described in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using Amazon EventBridge to push a notification for each received file into Amazon SQS, from where the files were processed in batches by Spark running on Amazon EMR. This allowed processing the incoming data at high throughput and scaling clusters according to queue size while keeping costs down.

Example of fetching 100 messages (files) from Amazon SQS with Spark:

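import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import scala.collection.JavaConverters._

object DistributedSQSReceiver {
  lazy val client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build() // one client per executor
  def getMessageBatch(queueUrl: String): List[Message] = // SQS returns at most 10 messages per call
    client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala.toList
}
// 10 parallel tasks x up to 10 messages each = up to 100 messages (files)
val messages = sparkSession.sparkContext.parallelize(1 to 10, 10).flatMap(_ => DistributedSQSReceiver.getMessageBatch(queueUrl)).collect().toList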

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach proved efficient and mitigated the Amazon S3 throttling problems.
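The effect of a hash prefix can be illustrated with a small Python sketch. It is a simplified model, not Iceberg's actual path layout: it uses MD5 from the standard library as a stand-in hash, and the bucket name, table root, and function names are placeholders chosen for illustration.

```python
import hashlib

def plain_path(table_root: str, partition: str, file_name: str) -> str:
    """Default layout: every file of a partition shares one key prefix."""
    return f"{table_root}/data/{partition}/{file_name}"

def hashed_path(table_root: str, partition: str, file_name: str) -> str:
    """Hash-prefixed layout: a short hash is placed ahead of the partition."""
    digest = hashlib.md5(file_name.encode("utf-8")).hexdigest()[:8]  # stand-in hash
    return f"{table_root}/data/{digest}/{partition}/{file_name}"

root = "s3://example-bucket/logs"  # placeholder location
files = [f"part-{i:05d}.parquet" for i in range(4)]

for name in files:
    print(plain_path(root, "day=2024-01-01", name))
for name in files:
    print(hashed_path(root, "day=2024-01-01", name))
```

In the plain layout, all four keys start with the same partition prefix, so a burst of writes lands on one prefix. In the hashed layout, the keys diverge right after the data directory, which spreads requests across many prefixes.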

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines running on services such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partitioning is a key factor in the number of files produced, and because Cloudinary’s data is time based and most queries use a time filter, Cloudinary decided to optimize its data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.
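A rough back-of-the-envelope model shows why both settings matter. The Python sketch below assumes, for illustration only, that data is spread evenly over the shuffle partitions, that all rows belong to the same table partition, and that each task starts a new file once the target size is reached. The function name and the sample numbers are hypothetical; real file counts depend on data skew and the table's partitioning.

```python
import math

def estimate_output_files(batch_bytes: int, shuffle_partitions: int,
                          target_file_size_bytes: int) -> int:
    """Rough file count for one write under the assumptions stated above."""
    bytes_per_task = batch_bytes / shuffle_partitions
    # Every task writes at least one file, and rolls over at the target size.
    files_per_task = max(1, math.ceil(bytes_per_task / target_file_size_bytes))
    return shuffle_partitions * files_per_task

MB = 1024 * 1024
GB = 1024 * MB

print(estimate_output_files(10 * GB, 200, 512 * MB))  # 200 files of about 51 MB each
print(estimate_output_files(10 * GB, 20, 512 * MB))   # 20 files of about 512 MB each
```

Under these assumptions, too many shuffle partitions produce files far below the target size regardless of the target setting, which is why the two values are tuned together.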

Because the above approach reduced the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and the number of Amazon S3 GetObject API calls.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy is the fastest of the three and combines small files to reach the defined target file size. After running it, a significant improvement in query performance was observed.
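The idea behind bin packing can be sketched in a few lines of Python. This is a generic greedy (first-fit decreasing) grouping written for illustration; it is not the planner Iceberg uses, and the file sizes are made up.

```python
def binpack(file_sizes, target_size):
    """Greedy first-fit-decreasing grouping of files into bins of at most target_size."""
    bins = []  # each bin lists the file sizes that would be rewritten as one file
    for size in sorted(file_sizes, reverse=True):
        for group in bins:
            if sum(group) + size <= target_size:
                group.append(size)
                break
        else:
            # No existing bin has room, so open a new one.
            bins.append([size])
    return bins

small_files = [40, 120, 300, 60, 200, 90, 10, 180]  # sizes in MB
groups = binpack(small_files, target_size=512)
print(len(small_files), "->", len(groups))  # 8 -> 2
print([sum(g) for g in groups])             # [510, 490]
```

Only file sizes are considered, and rows are never reordered, which is why this strategy is cheap but does nothing to improve data clustering.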

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the rows for a specific time range could be spread across many files. The query engines (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.
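The reason sorting reduces the amount of data scanned can be shown with a toy model of min/max file pruning. The Python sketch below generates one day of synthetic events, splits them into equally sized files with and without sorting, and counts how many files a one-hour filter has to open. The data, the file size, and the function names are invented for illustration and don't represent Cloudinary's data.

```python
import random

def file_stats(rows, rows_per_file):
    """Split rows into files and record each file's (min, max) timestamp."""
    files = [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]
    return [(min(f), max(f)) for f in files]

def files_to_scan(stats, lo, hi):
    """Count files whose [min, max] range overlaps the query range [lo, hi]."""
    return sum(1 for fmin, fmax in stats if fmax >= lo and fmin <= hi)

random.seed(7)
# One day of events, identified by second of day, arriving out of order.
events = [random.randrange(86_400) for _ in range(100_000)]

unsorted_stats = file_stats(events, rows_per_file=10_000)
sorted_stats = file_stats(sorted(events), rows_per_file=10_000)

one_hour = (36_000, 39_599)  # 10:00:00 to 10:59:59
print("unsorted files scanned:", files_to_scan(unsorted_stats, *one_hour))
print("sorted files scanned:  ", files_to_scan(sorted_stats, *one_hour))
```

When rows are unsorted, every file spans nearly the whole day, so no file can be skipped. After sorting, each file covers a narrow time range and most files are pruned from their statistics alone.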

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations, Cloudinary was able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data, which sometimes arrives from external sources and isn’t identified upon arrival. Because expireOlderThan expects a timestamp rather than a duration, the cutoff is computed as the current time minus 7 days:

SparkActions.get().expireSnapshots(iceTable).expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)).execute()
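The 7-day retention rule can be sketched in plain Python. The sketch assumes the expiration threshold is a point in time (now minus the retention period) and that the most recent snapshot is always kept; the function names and sample timestamps are hypothetical.

```python
import time

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000

def expiration_cutoff_ms(now_ms: int, retention_ms: int = SEVEN_DAYS_MS) -> int:
    """Timestamp before which snapshots become eligible for expiration."""
    return now_ms - retention_ms

def snapshots_to_expire(snapshot_timestamps_ms, now_ms, retention_ms=SEVEN_DAYS_MS):
    """Snapshots older than the cutoff, always keeping the latest snapshot."""
    cutoff = expiration_cutoff_ms(now_ms, retention_ms)
    latest = max(snapshot_timestamps_ms)
    return [ts for ts in snapshot_timestamps_ms if ts < cutoff and ts != latest]

now = int(time.time() * 1000)
day = 24 * 60 * 60 * 1000
snapshots = [now - 10 * day, now - 8 * day, now - 3 * day, now - 1 * day]
print(len(snapshots_to_expire(snapshots, now)))  # 2
```

The two snapshots older than 7 days are selected, while the two recent ones stay available for time travel and rollback.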

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.
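Conceptually, orphan detection is a set difference between the files found in storage and the files referenced by table metadata. The following Python sketch models that with in-memory data. It assumes a minimum-age threshold so that files belonging to in-flight writes aren't treated as orphans; the paths, the 3-day threshold, and the function name are illustrative.

```python
DAY_MS = 86_400_000

def find_orphans(listed_files, referenced_files, now_ms, min_age_ms):
    """Files in storage that no valid snapshot references and that are older than min_age_ms.

    listed_files: dict mapping object path -> last-modified time in epoch millis.
    referenced_files: set of paths reachable from valid snapshots.
    """
    cutoff = now_ms - min_age_ms
    return sorted(
        path for path, modified_ms in listed_files.items()
        if path not in referenced_files and modified_ms < cutoff
    )

now = 100 * DAY_MS
listed = {
    "data/a.parquet": now - 10 * DAY_MS,  # referenced by a snapshot
    "data/b.parquet": now - 10 * DAY_MS,  # left behind by a failed task
    "data/c.parquet": now - 1 * DAY_MS,   # recent, may belong to an in-flight write
}
referenced = {"data/a.parquet"}

print(find_orphans(listed, referenced, now, min_age_ms=3 * DAY_MS))  # ['data/b.parquet']
```

Building the listed_files side is the expensive part on Amazon S3, because it requires listing every object under the table location.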

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. In certain cases, Cloudinary reached over 300 manifest files, which were small (often under 8 MB in size), and due to late-arriving data, manifest files were pointing to data in different partitions. This caused query planning to take 12 seconds for each query.

Cloudinary initiated a separate scheduled rewriteManifests process. After it ran, the number of manifest files dropped to approximately 170, and because manifests were now better aligned with query filters (based on partitions), query planning became three times faster, taking approximately 4 seconds.

Choosing the right query engine

As part of Cloudinary’s exploration of various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine | Details
Snowflake native | XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support | XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena | On-demand mode
Amazon EMR Trino | Open source Trino on an eight-node (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity from the simplest one to the most heavy and complex.

Query | Description | Data scanned | Returned results set
Q1 | Multi-day aggregation on a single tenant | Single-digit GBs | <10 rows
Q2 | Single-day aggregation by tenant across multiple tenants | Dozens of GBs | 100 thousand rows
Q3 | Multi-day aggregation across multiple tenants | Hundreds of GBs | <10 rows
Q4 | Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics | Single-digit TBs | >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was only fastest in two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on-par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data-lake. The following table shows the cost and time for each query and product.

Query | Amazon EMR Trino | Snowflake (XL) | Snowflake (XL) Iceberg | Athena
Query1 | $0.01, 5 seconds | $0.08, 8 seconds | $0.07, 8 seconds | $0.02, 11 seconds
Query2 | $0.12, 107 seconds | $0.25, 28 seconds | $0.35, 39 seconds | $0.18, 94 seconds
Query3 | $0.17, 147 seconds | $1.07, 120 seconds | $1.88, 211 seconds | $1.22, 26 seconds
Query4 | $6.43, 1,237 seconds | $11.73, 1,324 seconds | $12.71, 1,430 seconds | N/A
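The normalization against Trino on Amazon EMR that was used for the charts can be reproduced from the cost figures in the table above. The short Python sketch below does this; the dictionary layout and the function name are illustrative choices, and the ratios are rounded to one decimal place.

```python
BASELINE = "Amazon EMR Trino"

# Cost in USD per query, copied from the table above (None = not measured).
costs = {
    "Query1": {"Amazon EMR Trino": 0.01, "Snowflake (XL)": 0.08,
               "Snowflake (XL) Iceberg": 0.07, "Athena": 0.02},
    "Query2": {"Amazon EMR Trino": 0.12, "Snowflake (XL)": 0.25,
               "Snowflake (XL) Iceberg": 0.35, "Athena": 0.18},
    "Query3": {"Amazon EMR Trino": 0.17, "Snowflake (XL)": 1.07,
               "Snowflake (XL) Iceberg": 1.88, "Athena": 1.22},
    "Query4": {"Amazon EMR Trino": 6.43, "Snowflake (XL)": 11.73,
               "Snowflake (XL) Iceberg": 12.71, "Athena": None},
}

def normalize(row, baseline=BASELINE):
    """Express every engine's cost as a multiple of the baseline engine's cost."""
    base = row[baseline]
    return {engine: None if cost is None else round(cost / base, 1)
            for engine, cost in row.items()}

for query, row in costs.items():
    print(query, normalize(row))
```

For Query3, for example, Snowflake on Iceberg comes out at roughly 11 times the cost of Trino on Amazon EMR, and for Query1, Snowflake native comes out at 8 times.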

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. They enable organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination offers a refined way to build an enterprise-grade open data environment. The availability of managed services and open source software helps companies implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in Athena engine version 3, which is based on Trino. The ability to retain data for longer and provide it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Design a data mesh pattern for Amazon EMR-based data lakes using AWS Lake Formation with Hive metastore federation

Post Syndicated from Sudipta Mitra original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-pattern-for-amazon-emr-based-data-lakes-using-aws-lake-formation-with-hive-metastore-federation/

In this post, we delve into the key aspects of using Amazon EMR for modern data management, covering topics such as data governance, data mesh deployment, and streamlined data discovery.

One of the key challenges in modern big data management is facilitating efficient data sharing and access control across multiple EMR clusters. Organizations often have multiple Hive data warehouses across EMR clusters, where the metadata gets generated. To address this challenge, organizations can deploy a data mesh using AWS Lake Formation that connects the multiple EMR clusters. With the AWS Glue Data Catalog federation to external Hive metastore feature, you can now apply data governance to the metadata residing across those EMR clusters and analyze them using AWS analytics services such as Amazon Athena, Amazon Redshift Spectrum, AWS Glue ETL (extract, transform, and load) jobs, EMR notebooks, EMR Serverless using Lake Formation for fine-grained access control, and Amazon SageMaker Studio. For detailed information on managing your Apache Hive metastore using Lake Formation permissions, refer to Query your Apache Hive metastore with AWS Lake Formation permissions.

In this post, we present a methodology for deploying a data mesh consisting of multiple Hive data warehouses across EMR clusters. This approach enables organizations to take advantage of the scalability and flexibility of EMR clusters while maintaining control and integrity of their data assets across the data mesh.

Use cases for Hive metastore federation for Amazon EMR

Hive metastore federation for Amazon EMR is applicable to the following use cases:

  • Governance of Amazon EMR-based data lakes – Producers generate data within their AWS accounts using an Amazon EMR-based data lake supported by EMRFS on Amazon Simple Storage Service (Amazon S3) and HBase. These data lakes require governance for access without the necessity of moving data to consumer accounts. The data resides on Amazon S3, which reduces the storage costs significantly.
  • Centralized catalog for published data – Multiple producers release data currently governed by their respective entities. For consumer access, a centralized catalog is necessary where producers can publish their data assets.
  • Consumer personas – Consumers include data analysts who run queries on the data lake, data scientists who prepare data for machine learning (ML) models and conduct exploratory analysis, as well as downstream systems that run batch jobs on the data within the data lake.
  • Cross-producer data access – Consumers may need to access data from multiple producers within the same catalog environment.
  • Data access entitlements – Data access entitlements involve implementing restrictions at the database, table, and column levels to provide appropriate data access control.

Solution overview

The following diagram shows how data from producers with their own Hive metastores (left) can be made available to consumers (right) using Lake Formation permissions enforced in a central governance account.

Producer and consumer are logical concepts used to indicate the production and consumption of data through a catalog. An entity can act both as a producer of data assets and as a consumer of data assets. The onboarding of producers is facilitated by sharing metadata, whereas the onboarding of consumers is based on granting permission to access this metadata.

The solution consists of multiple steps in the producer, catalog, and consumer accounts:

  1. Deploy the AWS CloudFormation templates and set up the producer, central governance and catalog, and consumer accounts.
  2. Test access to the producer cataloged Amazon S3 data using EMR Serverless in the consumer account.
  3. Test access using Athena queries in the consumer account.
  4. Test access using SageMaker Studio in the consumer account.

Producer

Producers create data within their AWS accounts using an Amazon EMR-based data lake and Amazon S3. Multiple producers then publish this data into a central catalog (data lake technology) account. Each producer account, along with the central catalog account, has either VPC peering or AWS Transit Gateway enabled to facilitate AWS Glue Data Catalog federation with the Hive metastore.

For each producer, an AWS Glue Hive metastore connector AWS Lambda function is deployed in the catalog account. This enables the Data Catalog to access Hive metastore information at runtime from the producer. The data lake locations (the S3 bucket location of the producers) are registered in the catalog account.

Central catalog

A catalog offers governed and secure data access to consumers. Federated databases are established within the catalog account’s Data Catalog using the Hive connection, managed by the catalog Lake Formation admin (LF-Admin). These federated databases in the catalog account are then shared by the data lake LF-Admin with the consumer LF-Admin of the external consumer account.

Data access entitlements are managed by applying access controls as needed at various levels, such as the database or table.

Consumer

The consumer LF-Admin grants the necessary permissions or restricted permissions to roles such as data analysts, data scientists, and downstream batch processing engine AWS Identity and Access Management (IAM) roles within its account.

Data access entitlements are managed by applying access control based on requirements at various levels, such as databases and tables.

Prerequisites

You need three AWS accounts with admin access to implement this solution. It is recommended to use test accounts. The producer account will host the EMR cluster and S3 buckets. The catalog account will host Lake Formation and AWS Glue. The consumer account will host EMR Serverless, Athena, and SageMaker notebooks.

Set up the producer account

Before you launch the CloudFormation stack, gather the following information from the catalog account:

  • Catalog AWS account ID (12-digit account ID)
  • Catalog VPC ID (for example, vpc-xxxxxxxx)
  • VPC CIDR (catalog account VPC CIDR; it must not overlap with 10.0.0.0/16)

The VPC CIDRs of the producer and catalog accounts can’t overlap because of VPC peering and Transit Gateway requirements. The VPC CIDR you provide should belong to the VPC in the catalog account where the AWS Glue metastore connector Lambda function will eventually be deployed.
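Before launching the stack, the overlap requirement can be checked locally with Python's standard ipaddress module. The sketch below compares a candidate catalog VPC CIDR against the producer CIDR of 10.0.0.0/16 mentioned above; the function name and the sample CIDRs are illustrative.

```python
import ipaddress

PRODUCER_CIDR = "10.0.0.0/16"  # CIDR of the VPC created by the producer stack

def overlaps_producer(catalog_cidr: str, producer_cidr: str = PRODUCER_CIDR) -> bool:
    """True if the two CIDR blocks share any address, which rules out peering."""
    catalog = ipaddress.ip_network(catalog_cidr)
    producer = ipaddress.ip_network(producer_cidr)
    return catalog.overlaps(producer)

print(overlaps_producer("10.0.128.0/20"))  # True, inside 10.0.0.0/16
print(overlaps_producer("10.1.0.0/16"))    # False
print(overlaps_producer("172.31.0.0/16"))  # False
```

A result of True means the catalog VPC needs a different CIDR range before the peering connection can be created.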

The CloudFormation stack for the producer creates the following resources:

  • S3 bucket to host data for the Hive metastore of the EMR cluster.
  • VPC with the CIDR 10.0.0.0/16. Make sure there is no existing VPC with this CIDR in use.
  • VPC peering connection between the producer and catalog account.
  • Amazon Elastic Compute Cloud (Amazon EC2) security groups for the EMR cluster.
  • IAM roles required for the solution.
  • EMR 6.10 cluster launched with Hive.
  • Sample data downloaded to the S3 bucket.
  • A database and external tables, pointing to the downloaded sample data, in its Hive metastore.

Complete the following steps:

  1. Launch the template PRODUCER.yml. It’s recommended to use an IAM role that has administrator privileges.
  2. Gather the values for the following on the CloudFormation stack’s Outputs tab:
    1. VpcPeeringConnectionId (for example, pcx-xxxxxxxxx)
    2. DestinationCidrBlock (10.0.0.0/16)
    3. S3ProducerDataLakeBucketName

Set up the catalog account

The CloudFormation stack for the catalog account creates the Lambda function for federation. Before you launch the template, on the Lake Formation console, add the IAM role and user deploying the stack as the data lake admin.

Then complete the following steps:

  1. Launch the template CATALOG.yml.
  2. For the RouteTableId parameter, use the catalog account VPC RouteTableId. This is the VPC where the AWS Glue Hive metastore connector Lambda function will be deployed.
  3. On the stack’s Outputs tab, copy the value for LFRegisterLocationServiceRole (arn:aws:iam::account-id:role/role-name).
  4. Confirm that the IAM access control options in the Data Catalog settings are unchecked and that the cross-account version is set to 4.

  1. Log in to the producer account and add the following bucket policy to the producer S3 bucket that was created during the producer account setup. Add the ARN of LFRegisterLocationServiceRole to the Principal section and provide the S3 bucket name under the Resource section.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::account-id:role/role-name"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::s3-bucket-name/*",
                "arn:aws:s3:::s3-bucket-name"
            ]
        }
    ]
}
  1. In the producer account, on the Amazon EMR console, navigate to the primary node EC2 instance to get the value for Private IP DNS name (IPv4 only) (for example, ip-xx-x-x-xx.us-west-1.compute.internal).

  1. Switch to the catalog account and deploy the AWS Glue Data Catalog federation Lambda function (GlueDataCatalogFederation-HiveMetastore).

The default Region is set to us-east-1. Change it to your desired Region before deploying the function.

Use the VPC whose CIDR you provided as the CloudFormation input. You can use the VPC’s default security group ID. If you use another security group, make sure its outbound rules allow traffic to 0.0.0.0/0.
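The bucket policy added to the producer S3 bucket in the earlier step could also be generated from the two values you substitute by hand. The following is a sketch only: the function name, the example role ARN, and the example bucket name are hypothetical, and the ARN pattern is a deliberately loose check (standard `aws` partition, 12-digit account ID, no whitespace) rather than a full validation.

```python
import json
import re

# Loose pattern for an IAM role ARN in the standard "aws" partition.
# It rejects whitespace, which is an easy mistake when pasting an ARN.
ROLE_ARN_PATTERN = re.compile(r"^arn:aws:iam::\d{12}:role/[\w+=,.@/-]+$")


def build_bucket_policy(role_arn: str, bucket_name: str) -> dict:
    """Build the read-only bucket policy shown in this post for one role."""
    if not ROLE_ARN_PATTERN.match(role_arn):
        raise ValueError(f"not a well-formed IAM role ARN: {role_arn!r}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}/*",
                    f"arn:aws:s3:::{bucket_name}",
                ],
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical values; use the stack outputs from your own accounts.
    policy = build_bucket_policy(
        "arn:aws:iam::111122223333:role/LFRegisterLocationServiceRole",
        "example-producer-bucket",
    )
    print(json.dumps(policy, indent=4))
```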

Next, you create a federated database in Lake Formation.

  1. On the Lake Formation console, choose Data sharing in the navigation pane.
  2. Choose Create database.

  1. Provide the following information:
    1. For Connection name, choose your connection.
    2. For Database name, enter a name for your database.
    3. For Database identifier, enter emrhms_salesdb (this is the database created on the EMR Hive metastore).
  2. Choose Create database.

  1. On the Databases page, select the database and on the Actions menu, choose Grant to grant describe permissions to the consumer account.

  1. Under Principals, select External accounts and choose your account ARN.
  2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database and table.
  3. Under Table permissions, provide the following information:
    1. For Table permissions, select Select and Describe.
    2. For Grantable permissions, select Select and Describe.
  4. Under Data permissions, select All data access.
  5. Choose Grant.

  1. On the Tables page, select your table and on the Actions menu, choose Grant to grant select and describe permissions.

  1. Under Principals, select External accounts and choose your account ARN.
  2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database.
  3. Under Database permissions, provide the following information:
    1. For Database permissions, select Create table and Describe.
    2. For Grantable permissions, select Create table and Describe.
  4. Choose Grant.

Set up the consumer account

Consumers include data analysts who run queries on the data lake, data scientists who prepare data for ML models and conduct exploratory analysis, as well as downstream systems that run batch jobs on the data within the data lake.

The consumer account setup in this section shows how you can query the shared Hive metastore data using Athena for the data analyst persona, EMR Serverless to run batch scripts, and SageMaker Studio for the data scientist to further use data in the downstream model building process.

For EMR Serverless and SageMaker Studio, if you’re using the default IAM service role, add the required Data Catalog and Lake Formation IAM permissions to the role and use Lake Formation to grant table permission access to the role’s ARN.

Data analyst use case

In this section, we demonstrate how a data analyst can query the Hive metastore data using Athena. Before you get started, on the Lake Formation console, add the IAM role or user deploying the CloudFormation stack as the data lake admin.

Then complete the following steps:

  1. Run the CloudFormation template CONSUMER.yml.
  2. If the catalog and consumer accounts are not part of the same organization in AWS Organizations, navigate to the AWS Resource Access Manager (AWS RAM) console and manually accept the resources shared from the catalog account.
  3. On the Lake Formation console, on the Databases page, select your database and on the Actions menu, choose Create resource link.

  1. Under Database resource link details, provide the following information:
    1. For Resource link name, enter a name.
    2. For Shared database’s region, choose a Region.
    3. For Shared database, choose your database.
    4. For Shared database’s owner ID, enter the account ID.
  2. Choose Create.

Now you can use Athena to query the table on the consumer side, as shown in the following screenshot.

Batch job use case

Complete the following steps to set up EMR Serverless to run a sample Spark job to query the existing table:

  1. On the Amazon EMR console, choose EMR Serverless in the navigation pane.
  2. Choose Get started.

  1. Choose Create and launch EMR Studio.

  1. Under Application settings, provide the following information:
    1. For Name, enter a name.
    2. For Type, choose Spark.
    3. For Release version, choose the current version.
    4. For Architecture, select x86_64.
  2. Under Application setup options, select Use custom settings.

  1. Under Additional configurations, for Metastore configuration, select Use AWS Glue Data Catalog as metastore, then select Use Lake Formation for fine-grained access control.
  2. Choose Create and start application.

  1. On the application details page, on the Job runs tab, choose Submit job run.

  1. Under Job details, provide the following information:
    1. For Name, enter a name.
    2. For Runtime role, choose Create new role.
    3. Note the IAM role that gets created.
    4. For Script location, enter the S3 bucket location created by the CloudFormation template (the script is emr-serverless-query-script.py).
  2. Choose Submit job run.

  1. Add the following AWS Glue access policy to the IAM role created in the previous step (provide your Region and the account ID of your catalog account):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetDatabases",
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:BatchCreatePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:catalog",
                "arn:aws:glue:us-east-1:123456789012:database/*",
                "arn:aws:glue:us-east-1:123456789012:table/*/*"
            ]
        }
    ]
}
  1. Add the following Lake Formation access policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "LakeFormation:GetDataAccess",
            "Resource": "*"
        }
    ]
}
  1. On the Databases page, select the database and on the Actions menu, choose Grant to grant Lake Formation access to the EMR Serverless runtime role.
  2. Under Principals, select IAM users and roles and choose your role.
  3. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database.
  4. Under Resource link permissions, for Resource link permissions, select Describe.
  5. Choose Grant.
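The AWS Glue access policy attached to the runtime role in the earlier step depends on only two inputs: your Region and the catalog account ID. As a hedged sketch, the policy document could be generated as follows; the function name `build_glue_policy` and the example account ID are hypothetical, and the action list is copied from the policy in this post.

```python
import json

# Actions copied from the AWS Glue access policy in this post.
GLUE_ACTIONS = [
    "glue:GetDatabase", "glue:CreateDatabase", "glue:GetDatabases",
    "glue:CreateTable", "glue:GetTable", "glue:UpdateTable",
    "glue:DeleteTable", "glue:GetTables", "glue:GetPartition",
    "glue:GetPartitions", "glue:CreatePartition",
    "glue:BatchCreatePartition", "glue:GetUserDefinedFunctions",
]


def build_glue_policy(region: str, catalog_account_id: str) -> dict:
    """Build the Glue access policy for the given Region and catalog account."""
    # AWS account IDs are 12 digits; reject anything else early.
    if not (catalog_account_id.isascii() and catalog_account_id.isdigit()
            and len(catalog_account_id) == 12):
        raise ValueError("catalog_account_id must be a 12-digit AWS account ID")
    prefix = f"arn:aws:glue:{region}:{catalog_account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": GLUE_ACTIONS,
                "Resource": [
                    f"{prefix}:catalog",
                    f"{prefix}:database/*",
                    f"{prefix}:table/*/*",
                ],
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical account ID, for illustration only.
    print(json.dumps(build_glue_policy("us-east-1", "111122223333"), indent=4))
```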

  1. On the Databases page, select the database and on the Actions menu, choose Grant on target.

  1. Provide the following information:
    1. Under Principals, select IAM users and roles and choose your role.
    2. Under LF-Tags or catalog resources, select Named Data Catalog resources and choose your database and table.
    3. Under Table permissions, for Table permissions, select Select.
    4. Under Data permissions, select All data access.
  2. Choose Grant.

  1. Submit the job again by cloning it.
  2. When the job is complete, choose View logs.

The output should look like the following screenshot.

Data scientist use case

For this use case, a data scientist queries the data through SageMaker Studio. Complete the following steps:

  1. Set up SageMaker Studio.
  2. Confirm that the domain user role has been granted permission by Lake Formation to SELECT data from the table.
  3. Follow steps similar to the batch run use case to grant access.

The following screenshot shows an example notebook.

Clean up

We recommend deleting the CloudFormation stack after use, because the deployed resources will incur costs. There are no prerequisites to delete the producer, catalog, and consumer CloudFormation stacks. To delete the Hive metastore connector stack on the catalog account (serverlessrepo-GlueDataCatalogFederation-HiveMetastore), first delete the federated database you created.

Conclusion

In this post, we explained how to create a federated Hive metastore for deploying a data mesh architecture with multiple Hive data warehouses across EMR clusters.

By using Data Catalog metadata federation, organizations can construct a sophisticated data architecture. This approach not only seamlessly extends your Hive data warehouse but also consolidates access control and fosters integration with various AWS analytics services. Through effective data governance and meticulous orchestration of the data mesh architecture, organizations can provide data integrity, regulatory compliance, and enhanced data sharing across EMR clusters.

We encourage you to check out the features of the AWS Glue Hive metastore federation connector and explore how to implement a data mesh architecture across multiple EMR clusters.


About the Authors

Sudipta Mitra is a Senior Data Architect for AWS who is passionate about helping customers build modern data analytics applications by making innovative use of the latest AWS services and their constantly evolving features. A pragmatic architect, he works backwards from customer needs, making customers comfortable with the proposed solution and helping them achieve tangible business outcomes. His main areas of work are Data Mesh, Data Lake, Knowledge Graph, Data Security, and Data Governance.

Deepak Sharma is a Senior Data Architect with the AWS Professional Services team, specializing in big data and analytics solutions. With extensive experience in designing and implementing scalable data architectures, he collaborates closely with enterprise customers to build robust data lakes and advanced analytical applications on the AWS platform.

Nanda Chinnappa is a Cloud Infrastructure Architect with AWS Professional Services. Nanda specializes in infrastructure automation, cloud migration, disaster recovery, and databases, including Amazon RDS and Amazon Aurora. He helps AWS customers adopt the AWS Cloud and realize their business outcomes by executing cloud computing initiatives.

Get started with AWS Glue Data Quality dynamic rules for ETL pipelines

Post Syndicated from Prasad Nadig original https://aws.amazon.com/blogs/big-data/get-started-with-aws-glue-data-quality-dynamic-rules-for-etl-pipelines/

Hundreds of thousands of organizations build data integration pipelines to extract and transform data. They establish data quality rules to ensure the extracted data is of high quality for accurate business decisions. These rules assess the data based on fixed criteria reflecting current business states. However, when the business environment changes, data properties shift, rendering these fixed criteria outdated and causing poor data quality.

For example, a data engineer at a retail company established a rule that validates daily sales must exceed a 1-million-dollar threshold. After a few months, daily sales surpassed 2 million dollars, rendering the threshold obsolete. The data engineer couldn’t update the rules to reflect the latest thresholds due to lack of notification and the effort required to manually analyze and update the rule. Later in the month, business users noticed a 25% drop in their sales. After hours of investigation, the data engineers discovered that an extract, transform, and load (ETL) pipeline responsible for extracting data from some stores had failed without generating errors. The rule with outdated thresholds continued to operate successfully without detecting this issue. The ordering system that used the sales data placed incorrect orders, causing low inventory for future weeks. What if the data engineer had the ability to set up dynamic thresholds that automatically adjusted as business properties changed?

We are excited to talk about how to use dynamic rules, a new capability of AWS Glue Data Quality. Now, you can define dynamic rules and not worry about updating static rules on a regular basis to adapt to varying data trends. This feature enables you to author dynamic rules to compare current metrics produced by your rules with your historical values. These historical comparisons are enabled by using the last(k) operator in expressions. For example, instead of writing a static rule like RowCount > 1000, which might become obsolete as data volume grows over time, you can replace it with a dynamic rule like RowCount > min(last(3)) . This dynamic rule will succeed when the number of rows in the current run is greater than the minimum row count from the most recent three runs for the same dataset.

This is part 7 of a seven-part series of posts that explain how AWS Glue Data Quality works.

Previous posts explain how to author static data quality rules. In this post, we show how to create an AWS Glue job that measures and monitors the data quality of a data pipeline using dynamic rules. We also show how to take action based on the data quality results.

Solution overview

Let’s consider an example data quality pipeline where a data engineer ingests data from a raw zone and loads it into a curated zone in a data lake. The data engineer is tasked with not only extracting, transforming, and loading data, but also identifying anomalies compared against data quality statistics from historical runs.

In this post, you’ll learn how to author dynamic rules in your AWS Glue job in order to take appropriate actions based on the outcome.

The data used in this post is sourced from NYC yellow taxi trip data. The yellow taxi trip records include fields capturing pickup and dropoff dates and times, pickup and dropoff locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The following screenshot shows an example of the data.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket (gluedataqualitydynamicrules-*)
  • An AWS Lambda function that creates the following folder structure within the S3 bucket:
    • raw-src/
    • landing/nytaxi/
    • processed/nytaxi/
    • dqresults/nytaxi/
  • AWS Identity and Access Management (IAM) users, roles, and policies. The IAM role GlueDataQuality-* has AWS Glue run permission as well as read and write permission on the S3 bucket.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack and wait for the stack creation step to complete.

Upload sample data

  1. Download the dataset to your local machine.
  2. Unzip the file and extract the Parquet files into a local folder.
  3. Upload the Parquet files under the prefix raw-src/ in the S3 bucket (gluedataqualitydynamicrules-*).

Implement the solution

To start configuring your solution, complete the following steps:

  1. On the AWS Glue Studio console, choose ETL Jobs in the navigation pane and choose Visual ETL.
  2. Navigate to the Job details tab to configure the job.
  3. For Name, enter GlueDataQualityDynamicRules
  4. For IAM Role, choose the role starting with GlueDataQuality-*.
  5. For Job bookmark, choose Enable.

This allows you to run this job incrementally. To learn more about job bookmarks, refer to Tracking processed data using job bookmarks.

  1. Leave all the other settings as their default values.
  2. Choose Save.
  3. After the job is saved, navigate to the Visual tab and on the Sources menu, choose Amazon S3.
  4. In the Data source properties – S3 pane, for S3 source type, select S3 location.
  5. Choose Browse S3 and navigate to the prefix /landing/nytaxi/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  6. For Data format, choose Parquet and choose Infer schema.

  1. On the Transforms menu, choose Evaluate Data Quality.

You now implement validation logic in your process to identify potential data quality problems originating from the source data.

  1. To accomplish this, specify the following DQDL rules on the Ruleset editor tab:
    CustomSql "select vendorid from primary where passenger_count > 0" with threshold > 0.9,
    Mean "trip_distance" < max(last(3)) * 1.50,
    Sum "total_amount" between min(last(3)) * 0.8 and max(last(3)) * 1.2,
    RowCount between min(last(3)) * 0.9 and max(last(3)) * 1.2,
    Completeness "fare_amount" >= avg(last(3)) * 0.9,
    DistinctValuesCount "ratecodeid" between avg(last(3))-1 and avg(last(3))+2,
    DistinctValuesCount "pulocationid" > avg(last(3)) * 0.8,
    ColumnCount = max(last(2))

  1. Select Original data to output the original input data from the source and add a new node below the Evaluate Data Quality node.
  2. Choose Add new columns to indicate data quality errors to add four new columns to the output schema.
  3. Select Data quality results to capture the status of each rule configured and add a new node below the Evaluate Data Quality node.

  1. With rowLevelOutcomes node selected, choose Amazon S3 on the Targets menu.
  2. Configure the S3 target location to /processed/nytaxi/ under the bucket name starting with gluedataqualitydynamicrules-* and set the output format to Parquet and compression type to Snappy.

  1. With the ruleOutcomes node selected, choose Amazon S3 on the Targets menu.
  2. Configure the S3 target location to /dqresults/ under the bucket name starting with gluedataqualitydynamicrules-*.
  3. Set the output format to Parquet and compression type to Snappy.
  4. Choose Save.

Up to this point, you have set up an AWS Glue job, specified dynamic rules for the pipeline, and configured the target location for both the original source data and AWS Glue Data Quality results to be written on Amazon S3. Next, let’s examine dynamic rules and how they function, and provide an explanation of each rule we used in our job.

Dynamic rules

You can now author dynamic rules to compare current metrics produced by your rules with their historical values. These historical comparisons are enabled by using the last() operator in expressions. For example, the rule RowCount > max(last(1)) will succeed when the number of rows in the current run is greater than the most recent prior row count for the same dataset. last() takes an optional natural number argument describing how many prior metrics to consider; last(k) where k >= 1 will reference the last k metrics. The rule has the following conditions:

  • If no data points are available, last(k) will return the default value 0.0
  • If fewer than k metrics are available, last(k) will return all prior metrics

For example, if the values from previous runs, listed from most recent to oldest, are (5, 3, 2, 1, 4), max(last(3)) will return 5.
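The behavior of last(k) described above can be modeled in a few lines of Python. This is a simplified sketch, not the AWS Glue implementation: the helper names `last` and `aggregate` are illustrative, and the history is assumed to be ordered most recent first, which is the ordering under which the example above yields 5.

```python
from typing import Callable, List, Sequence


def last(history: Sequence[float], k: int = 1) -> List[float]:
    """Return up to k prior metrics; history is ordered most recent first.

    If fewer than k metrics exist, all prior metrics are returned.
    """
    if k < 1:
        raise ValueError("k must be >= 1")
    return list(history[:k])


def aggregate(func: Callable[[Sequence[float]], float], values: Sequence[float]) -> float:
    """Apply min, max, or an average; fall back to 0.0 when there is no history."""
    return float(func(values)) if values else 0.0


if __name__ == "__main__":
    previous_runs = (5, 3, 2, 1, 4)  # most recent first (assumption)
    print(aggregate(max, last(previous_runs, 3)))  # three most recent: 5, 3, 2
    print(aggregate(max, last((), 3)))             # no history: default 0.0
```

Modeling the default this way makes it easy to see why several rules fail on the very first run of a job, before any history exists.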

AWS Glue supports over 15 types of dynamic rules, providing a robust set of data quality validation capabilities. For more information, refer to Dynamic rules. This section demonstrates several rule types to showcase the functionality and enable you to apply these features in your own use cases.

CustomSQL

The CustomSQL rule provides the capability to run a custom SQL statement against a dataset and check the return value against a given expression.

The following example rule uses a SQL statement wherein you specify a column name in your SELECT statement, against which you compare with some condition to get row-level results. A threshold condition expression defines a threshold of how many records should fail in order for the entire rule to fail. In this example, more than 90% of records should contain passenger_count greater than 0 for the rule to pass:

CustomSql "select vendorid from primary where passenger_count > 0" with threshold > 0.9

Note: CustomSql also supports dynamic rules. The following is an example of how to use it in your job:

CustomSql "select count(*) from primary" between min(last(3)) * 0.9 and max(last(3)) * 1.2
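To make the threshold form of the CustomSql rule concrete, the following sketch computes the fraction of records whose passenger_count is greater than 0 and compares it with 0.9. It is an illustration in plain Python, not how AWS Glue evaluates the rule; the function name and the sample data are hypothetical, and treating an empty dataset as a failure is an assumption.

```python
from typing import Iterable, Optional, Tuple


def custom_sql_pass_ratio(
    passenger_counts: Iterable[Optional[int]], threshold: float = 0.9
) -> Tuple[float, bool]:
    """Return (ratio of rows with passenger_count > 0, whether ratio > threshold)."""
    counts = list(passenger_counts)
    if not counts:
        # Assumption: with no rows there is nothing to pass, so the rule fails.
        return 0.0, False
    matching = sum(1 for c in counts if c is not None and c > 0)
    ratio = matching / len(counts)
    return ratio, ratio > threshold


if __name__ == "__main__":
    sample = [1] * 19 + [0]  # hypothetical data: 19 of 20 rows qualify
    ratio, passed = custom_sql_pass_ratio(sample)
    print(f"ratio={ratio:.2f} passed={passed}")
```

Because the comparison is strict, a dataset in which exactly 90% of the rows qualify would not pass in this sketch.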

Mean

The Mean rule checks whether the mean (average) of all the values in a column matches a given expression.

The following example rule checks that the mean of trip_distance is less than 1.5 times the maximum of the mean values recorded for trip_distance over the last three runs:

Mean "trip_distance" < max(last(3)) * 1.50

Sum

The Sum rule checks the sum of all the values in a column against a given expression.

The following example rule checks that the sum of total_amount is between 80% of the minimum of the last three runs and 120% of the maximum of the last three runs:

Sum "total_amount" between min(last(3)) * 0.8 and max(last(3)) * 1.2

RowCount

The RowCount rule checks the row count of a dataset against a given expression. In the expression, you can specify the number of rows or a range of rows using operators like > and <.

The following example rule checks if the row count is between 90% of the minimum of the last three runs and 120% of the maximum of last three runs (excluding the current run). This rule applies to the entire dataset.

RowCount between min(last(3)) * 0.9 and max(last(3)) * 1.2

Completeness

The Completeness rule checks the percentage of complete (non-null) values in a column against a given expression.

The following example rule checks if the completeness of the fare_amount column is greater than or equal to 90% of the average of the last three runs:

Completeness "fare_amount" >= avg(last(3)) * 0.9

DistinctValuesCount

The DistinctValuesCount rule checks the number of distinct values in a column against a given expression.

The following example rules check two conditions:

  • Whether the distinct count for the ratecodeid column is between the average of the last three runs minus 1 and the average of the last three runs plus 2
  • Whether the distinct count for the pulocationid column is greater than 80% of the average of the last three runs

DistinctValuesCount "ratecodeid" between avg(last(3))-1 and avg(last(3))+2,
DistinctValuesCount "pulocationid" > avg(last(3)) * 0.8

ColumnCount

The ColumnCount rule checks the column count of the primary dataset against a given expression. In the expression, you can specify the number of columns or a range of columns using operators like > and <.

The following example rule checks whether the column count is equal to the maximum of the last two runs:

ColumnCount = max(last(2))

Run the job

Now that the job setup is complete, we are prepared to run it. As previously indicated, dynamic rules are determined using the last(k) operator, with k set to 3 in the configured job. This implies that data quality rules will be evaluated using metrics from the previous three runs. To assess these rules accurately, the job must be run a minimum of k+1 times, requiring a total of four runs to thoroughly evaluate dynamic rules. In this example, we simulate an ETL job with data quality rules, starting with an initial run followed by three incremental runs.
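The k+1 run requirement translates into a simple plan: one day file copied from raw-src/ to landing/nytaxi/ before each run. The sketch below builds that plan; the prefixes and file names come from this post, while the function names are hypothetical. The optional `copy_day_file` helper assumes boto3 is installed and credentials are configured, and it is not called by the example.

```python
from typing import List, Tuple

RAW_PREFIX = "raw-src/"
LANDING_PREFIX = "landing/nytaxi/"
# Day files used in this walkthrough, in the order they are processed.
DAY_FILES = ["20220101.parquet", "20220102.parquet",
             "20220103.parquet", "20220104.parquet"]


def copy_plan(k: int = 3) -> List[Tuple[str, str]]:
    """Return (source_key, destination_key) pairs for the k + 1 required runs."""
    runs_needed = k + 1
    if runs_needed > len(DAY_FILES):
        raise ValueError("not enough day files for the requested k")
    return [(RAW_PREFIX + name, LANDING_PREFIX + name)
            for name in DAY_FILES[:runs_needed]]


def copy_day_file(bucket: str, source_key: str, dest_key: str) -> None:
    """Copy one file within the bucket (assumes boto3 and AWS credentials)."""
    import boto3  # imported lazily so the plan can be built without boto3
    s3 = boto3.client("s3")
    s3.copy_object(Bucket=bucket, Key=dest_key,
                   CopySource={"Bucket": bucket, "Key": source_key})


if __name__ == "__main__":
    for run_number, (src, dst) in enumerate(copy_plan(), start=1):
        print(f"run {run_number}: copy {src} -> {dst}, then run the job")
```

Each copy is meant to be followed by one job run, so that job bookmarks pick up only the newly landed file.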

First job (initial)

Complete the following steps for the initial run:

  1. Navigate to the source data files made available under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the initial run, copy the day one file 20220101.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.

  1. On the AWS Glue Studio console, choose ETL Jobs in the navigation pane.
  2. Choose GlueDataQualityDynamicRules under Your jobs to open it.
  3. Choose Run to run the job.

You can view the job run details on the Runs tab. It will take a few minutes for the job to complete.

  1. After the job completes successfully, navigate to the Data quality - updated tab.

You can observe the Data Quality rules, rule status, and evaluated metrics for each rule that you set in the job. The following screenshot shows the results.

The rule details are as follows:

  • CustomSql – The rule passes the data quality check because 95% of records have a passenger_count greater than 0, which exceeds the set threshold of 90%.
  • Mean – The rule fails because there are no previous runs: last(3) returns the default value of 0.0, so the threshold max(last(3)) * 1.50 evaluates to 0, and the overall mean of 5.94 is not less than 0.
  • Sum – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
  • RowCount – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
  • Completeness – The rule passes because 100% of records are complete, meaning there are no null values for the fare_amount column.
  • DistinctValuesCount “ratecodeid” – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
  • DistinctValuesCount “pulocationid” – The rule passes because the distinct count of 205 for the pulocationid column is higher than the set threshold, with a value of 0.00 because avg(last(3))*0.8 results in 0.
  • ColumnCount – The rule fails for the same reason as the mean rule, with last(2) resulting in a default value of 0.0.
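The first-run outcomes above follow directly from the 0.0 default. The sketch below replays four of the rules with an empty history, using the metric values reported in this post (a mean of 5.94, 100% completeness, 205 distinct pulocationid values, and 19 columns). The helper names are illustrative and the code is a simplified model rather than the AWS Glue evaluation logic.

```python
from typing import Callable, Dict, Sequence


def agg(func: Callable[[Sequence[float]], float], history: Sequence[float]) -> float:
    """Aggregate prior metrics, defaulting to 0.0 when no history exists."""
    return float(func(history)) if history else 0.0


def avg(values: Sequence[float]) -> float:
    return sum(values) / len(values)


def evaluate_first_run() -> Dict[str, bool]:
    history: Sequence[float] = []  # initial run: no prior metrics
    return {
        # Mean "trip_distance" < max(last(3)) * 1.50  ->  5.94 < 0.0
        "Mean trip_distance": 5.94 < agg(max, history) * 1.50,
        # Completeness "fare_amount" >= avg(last(3)) * 0.9  ->  1.0 >= 0.0
        "Completeness fare_amount": 1.0 >= agg(avg, history) * 0.9,
        # DistinctValuesCount "pulocationid" > avg(last(3)) * 0.8  ->  205 > 0.0
        "DistinctValuesCount pulocationid": 205 > agg(avg, history) * 0.8,
        # ColumnCount = max(last(2))  ->  19 == 0.0
        "ColumnCount": 19 == agg(max, history),
    }


if __name__ == "__main__":
    for rule, passed in evaluate_first_run().items():
        print(f"{rule}: {'passed' if passed else 'failed'}")
```

Rules with a lower bound only (such as Completeness) pass against a zero threshold, while rules with an upper bound or an equality check fail.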

Second job (first incremental)

Now that you have successfully completed the initial run and observed the data quality results, you are ready for the first incremental run to process the file from day two. Complete the following steps:

  1. Navigate to the source data files made available under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the first incremental run, copy the day two file 20220102.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, repeat Steps 4–7 from the first (initial) run to run the job and validate the data quality results.

The following screenshot shows the data quality results.

On the second run, all rules passed because each rule’s threshold has been met:

  • CustomSql – The rule passed because 96% of records have a passenger_count greater than 0, exceeding the set threshold of 90%.
  • Mean – The rule passed because the mean of 6.21 is less than 9.315 (6.21 * 1.5, meaning the mean from max(last(3)) is 6.21, multiplied by 1.5).
  • Sum – The rule passed because the sum of the total amount, 1,329,446.47, is between 80% of the minimum of the last three runs, 1,063,557.176 (1,329,446.47 * 0.8), and 120% of the maximum of the last three runs, 1,595,335.764 (1,329,446.47 * 1.2).
  • RowCount – The rule passed because the row count of 58,421 is between 90% of the minimum of the last three runs, 52,578.9 (58,421 * 0.9), and 120% of the maximum of the last three runs, 70,105.2 (58,421 * 1.2).
  • Completeness – The rule passed because 100% of the records have non-null values for the fare amount column, exceeding the set threshold of the average of the last three runs times 90%.
  • DistinctValuesCount “ratecodeid” – The rule passed because the distinct count of 8 for the ratecodeid column is between the set thresholds of 6, which is the average of the last three runs minus 1 (7/1 = 7, and 7 – 1 = 6), and 9, which is the average of the last three runs plus 2 (7 + 2 = 9).
  • DistinctValuesCount “pulocationid” – The rule passed because the distinct count of 201 for the pulocationid column is greater than 80% of the average of the last three runs, 160.8 (201 * 0.8).
  • ColumnCount – The rule passed because the number of columns, 19, is equal to the maximum of the last two runs.

Third job (second incremental)

After the successful completion of the first incremental run, you are ready for the second incremental run to process the file from day three. Complete the following steps:

  1. Navigate to the source data files under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the second incremental run, copy the day three file 20220103.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, repeat Steps 4–7 from the first (initial) job to run the job and validate data quality results.

The following screenshot shows the data quality results.

Similar to the second run, the data file from the source didn’t contain any data quality issues. As a result, all of the defined data validation rules were within the set thresholds and passed successfully.

Fourth job (third incremental)

Now that you have successfully completed the first three runs and observed the data quality results, you are ready for the final incremental run for this exercise, to process the file from day four. Complete the following steps:

  1. Navigate to the source data files under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the third incremental run, copy the day four file 20220104.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, repeat Steps 4–7 from the first (initial) job to run the job and validate the data quality results.

The following screenshot shows the data quality results.

In this run, there are some data quality issues from the source that were caught by the AWS Glue job, causing the rules to fail. Let’s examine each failed rule to understand the specific data quality issues that were detected:

  • CustomSql – The rule failed because only 80% of the records have a passenger_count greater than 0, which is lower than the set threshold of 90%.
  • Mean – The rule failed because the mean of trip_distance is 71.74, which is greater than 1.5 times the maximum of the last three runs, 11.565 (7.70 * 1.5).
  • Sum – The rule passed because the sum of total_amount is 1,165,023.73, which is between 80% of the minimum of the last three runs, 1,063,557.176 (1,329,446.47 * 0.8), and 120% of the maximum of the last three runs, 1,816,645.464 (1,513,871.22 * 1.2).
  • RowCount – The rule failed because the row count of 44,999 is not between 90% of the minimum of the last three runs, 52,578.9 (58,421 * 0.9), and 120% of the maximum of the last three runs, 88,334.1 (72,405 * 1.2).
  • Completeness – The rule failed because only 82% of the records have non-null values for the fare_amount column, which is lower than the set threshold of the average of the last three runs times 90%.
  • DistinctValuesCount “ratecodeid” – The rule failed because the distinct count of 6 for the ratecodeid column is not between the lower threshold of 6.66, which is the average of the last three runs minus 1 ((8+8+7)/3 = 7.66 – 1), and the upper threshold of 9.66, which is the average of the last three runs plus 2 ((8+8+7)/3 = 7.66 + 2).
  • DistinctValuesCount “pulocationid” – The rule passed because the distinct count of 205 for the pulocationid column is greater than 80% of the average of the last three runs, 165.86 ((216+201+205)/3 = 207.33 * 0.8).
  • ColumnCount – The rule passed because the number of columns, 19, is equal to the maximum of the last two runs.

To summarize the outcome of the fourth run: the rules for Sum and DistinctValuesCount for pulocationid, as well as the ColumnCount rule, passed successfully. However, the rules for CustomSql, Mean, RowCount, Completeness, and DistinctValuesCount for ratecodeid failed to meet the criteria.
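
To make the threshold arithmetic concrete, the following Python sketch recomputes three of the checks from the figures quoted in the list above. The helper is_between is a hypothetical name introduced for illustration, and treating the bounds as exclusive is an assumption; AWS Glue Data Quality evaluates the actual rules.

```python
# Illustrative re-computation of three dynamic-rule checks from the fourth run.
# is_between is a hypothetical helper; the real evaluation happens in AWS Glue Data Quality.

def is_between(value, lower, upper):
    """Return True when value lies strictly between lower and upper."""
    return lower < value < upper

# Sum "total_amount": 80% of the minimum and 120% of the maximum of the last three runs
sum_lower = 1_329_446.47 * 0.8
sum_upper = 1_513_871.22 * 1.2
sum_passed = is_between(1_165_023.73, sum_lower, sum_upper)

# DistinctValuesCount "pulocationid": greater than 80% of the average of the last three runs
pu_threshold = (216 + 201 + 205) / 3 * 0.8
pu_passed = 205 > pu_threshold

# DistinctValuesCount "ratecodeid": average of the last three runs, minus 1 and plus 2
# (the upper bound follows the "+ 2" shown in the quoted arithmetic, 7.66 + 2 = 9.66)
rate_avg = (8 + 8 + 7) / 3
rate_passed = is_between(6, rate_avg - 1, rate_avg + 2)

print(f"Sum passed: {sum_passed}")
print(f"pulocationid passed: {pu_passed}")
print(f"ratecodeid passed: {rate_passed}")
```

The Sum and pulocationid checks evaluate to passing and the ratecodeid check to failing, which matches the results reported for this run.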

Upon examining the Data Quality evaluation results, further investigation is necessary to identify the root cause of these data quality issues. For instance, in the case of the failed RowCount rule, it’s imperative to ascertain why there was a decrease in record count. This investigation should delve into whether the drop aligns with actual business trends or if it stems from issues within the source system, data ingestion process, or other factors. Appropriate actions must be taken to rectify these data quality issues or update the rules to accommodate natural business trends.

You can expand this solution by implementing and configuring alerts and notifications to promptly address any data quality issues that arise. For more details, refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality (Part 4 in this series).

Clean up

To clean up your resources, complete the following steps:

  1. Delete the AWS Glue job.
  2. Delete the CloudFormation stack.

Conclusion

AWS Glue Data Quality offers a straightforward way to measure and monitor the data quality of your ETL pipeline. In this post, you learned about authoring a Data Quality job with dynamic rules, and how these rules eliminate the need to update static rules with ever-evolving source data in order to keep the rules current. Data Quality dynamic rules enable the detection of potential data quality issues early in the data ingestion process, before downstream propagation into data lakes, warehouses, and analytical engines. By catching errors upfront, organizations can ingest cleaner data and take advantage of advanced data quality capabilities. The rules provide a robust framework to identify anomalies, validate integrity, and provide accuracy as data enters the analytics pipeline. Overall, AWS Glue dynamic rules empower organizations to take control of data quality at scale and build trust in analytical outputs.

To learn more about AWS Glue Data Quality, refer to the following:


About the Authors

Prasad Nadig is an Analytics Specialist Solutions Architect at AWS. He guides customers in architecting optimal data and analytical platforms that leverage the scalability and agility of the cloud. He is passionate about understanding emerging challenges and guiding customers to build modern solutions. Outside of work, Prasad indulges his creative curiosity through photography, while also staying up-to-date on the latest technology innovations and trends.

Mahammadali Saheb is a Data Architect at AWS Professional Services, specializing in Data Analytics. He is passionate about helping customers drive business outcomes via data analytics solutions on AWS Cloud.

Tyler McDaniel is a software development engineer on the AWS Glue team with diverse technical interests including high-performance computing and optimization, distributed systems, and machine learning operations. He has eight years of experience in software and research roles.

Rahul Sharma is a Senior Software Development Engineer at AWS Glue. He focuses on building distributed systems to support features in AWS Glue. He has a passion for helping customers build data management solutions on the AWS Cloud. In his spare time, he enjoys playing the piano and gardening.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Entity resolution and fuzzy matches in AWS Glue using the Zingg open source library

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/entity-resolution-and-fuzzy-matches-in-aws-glue-using-the-zingg-open-source-library/

In today’s data-driven world, organizations often deal with data from multiple sources, leading to challenges in data integration and governance. AWS Glue, a serverless data integration service, simplifies the process of discovering, preparing, moving, and integrating data for analytics, machine learning (ML), and application development.

One critical aspect of data governance is entity resolution, which involves linking data from different sources that represent the same entity, despite not being exactly identical. This process is crucial for maintaining data integrity and avoiding duplication that could skew analytics and insights.

AWS Glue is based on the Apache Spark framework, and offers the flexibility to extend its capabilities through third-party Spark libraries. One such powerful open source library is Zingg, an ML-based tool, specifically designed for entity resolution on Spark.

In this post, we explore how to use Zingg’s entity resolution capabilities within an AWS Glue notebook, which you can later run as an extract, transform, and load (ETL) job. By integrating Zingg in your notebooks or ETL jobs, you can effectively address data governance challenges and provide consistent and accurate data across your organization.

Solution overview

The use case is the same as that in Integrate and deduplicate datasets using AWS Lake Formation FindMatches.

It consists of a dataset of publications, which has many duplicates because the titles, names, descriptions, or other attributes are slightly different. This often happens when collating information from different sources.

In this post, we use the same dataset and training labels but show how to do it with a third-party entity resolution library, the Zingg ML library.

Prerequisites

To follow this post, you need the following:

Set up the required files

To run the notebook (or later to run as a job), you need to set up the Zingg library and configuration. Complete the following steps:

  1. Download the Zingg distribution package for AWS Glue 4.0, which uses Spark 3.3.0. The appropriate release is Zingg 0.3.4.
  2. Extract the JAR file zingg-0.3.4-SNAPSHOT.jar inside the tar and upload it to the base of your S3 bucket.
  3. Create a text file named config.json and enter the following content, providing the name of your S3 bucket in the places indicated, and upload the file to the base of your bucket:
{
    "fieldDefinition":[
            {
                    "fieldName" : "title",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "authors",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "venue",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"string\""
            },
            {
                    "fieldName" : "year",
                    "matchType" : "fuzzy",
                    "fields" : "fname",
                    "dataType": "\"double\""
            }
    ],
    "output" : [{
            "name":"output",
            "format":"csv",
            "props": {
                    "location": "s3://<your bucket name>/matchOuput/",
                    "delimiter": ",",
                    "header":true
            }
    }],
    "data" : [{
            "name":"dblp-scholar",
            "format":"json",
            "props": {
                    "location": "s3://ml-transforms-public-datasets-us-east-1/dblp-scholar/records/dblp_scholar_records.jsonl"
            },
            "schema":
                    "{\"type\" : \"struct\",
                    \"fields\" : [
                            {\"name\":\"id\", \"type\":\"string\", \"nullable\":false},
                            {\"name\":\"title\", \"type\":\"string\", \"nullable\":true},
                            {\"name\":\"authors\",\"type\":\"string\",\"nullable\":true} ,
                            {\"name\":\"venue\", \"type\":\"string\", \"nullable\":true},
                            {\"name\":\"year\", \"type\":\"double\", \"nullable\":true},
                            {\"name\":\"source\",\"type\":\"string\",\"nullable\":true}
                    ]
            }"
    }],
    "numPartitions":4,
    "modelId": 1,
    "zinggDir": "s3://<your bucket name>/models"
}

You can also define the configuration programmatically, but using JSON makes it more straightforward to visualize and allows you to use it in the Zingg command line tool. Refer to the library documentation for further details.
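
If you prefer not to edit the bucket placeholders by hand, one option is to generate config.json from a short script. The following is a minimal sketch that rebuilds the same configuration with Python's json module; build_zingg_config is a hypothetical helper name, "example-bucket" is a placeholder, and the schema is serialized as a single-line JSON string, which is assumed to be equivalent to the multi-line form shown above.

```python
import json

def build_zingg_config(bucket_name):
    """Return the Zingg configuration from this post as a dict (hypothetical helper)."""
    def fuzzy(name, data_type):
        # Mirrors the post's config, including the quoted data type value
        return {"fieldName": name, "matchType": "fuzzy", "fields": "fname",
                "dataType": f'"{data_type}"'}

    schema = {"type": "struct", "fields": [
        {"name": "id", "type": "string", "nullable": False},
        {"name": "title", "type": "string", "nullable": True},
        {"name": "authors", "type": "string", "nullable": True},
        {"name": "venue", "type": "string", "nullable": True},
        {"name": "year", "type": "double", "nullable": True},
        {"name": "source", "type": "string", "nullable": True},
    ]}
    return {
        "fieldDefinition": [fuzzy("title", "string"), fuzzy("authors", "string"),
                            fuzzy("venue", "string"), fuzzy("year", "double")],
        "output": [{"name": "output", "format": "csv",
                    "props": {"location": f"s3://{bucket_name}/matchOuput/",
                              "delimiter": ",", "header": True}}],
        "data": [{"name": "dblp-scholar", "format": "json",
                  "props": {"location": "s3://ml-transforms-public-datasets-us-east-1/"
                                        "dblp-scholar/records/dblp_scholar_records.jsonl"},
                  # The schema is passed to Zingg as a JSON string, not a nested object
                  "schema": json.dumps(schema)}],
        "numPartitions": 4,
        "modelId": 1,
        "zinggDir": f"s3://{bucket_name}/models",
    }

# "example-bucket" is a placeholder; use your own bucket name
config_text = json.dumps(build_zingg_config("example-bucket"), indent=4)
print(config_text)
```

You could then save config_text as config.json and upload it to the base of your bucket as described in the steps above.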

Set up the AWS Glue notebook

For simplicity, we use an AWS Glue notebook to prepare the training data, build a model, and find matches. Complete the following steps to set up the notebook with the Zingg libraries and config files that you prepared:

  1. On the AWS Glue console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. Leave the default options and choose a role suitable for notebooks.
  4. Add a new cell to use for Zingg-specific configuration and enter the following content, providing the name of your bucket:

%extra_jars s3://<your bucket>/zingg-0.3.4-SNAPSHOT.jar
%extra_py_files s3://<your bucket>/config.json
%additional_python_modules zingg==0.3.4

notebook setup cell

  5. Run the configuration cell. It’s important that this is done before running any other cell because the configuration changes won’t apply if the session is already started. If that happens, create and run a cell with the content %stop_session. This stops the session but not the notebook, so the next time you run a cell with code, it starts a new session using all the configuration settings you have defined at that moment.
    Now the notebook is ready to start the session.
  6. Create a session using the setup cell provided (labeled: “Run this cell to set up and start your interactive session”).
    After a few seconds, you should get a message indicating the session has been created.

Prepare the training data

Zingg lets you provide sample training pairs or have an expert define them interactively. In the interactive case, the algorithm finds examples that it considers meaningful and asks the expert whether each one is a match, is not a match, or can’t be decided. The algorithm can work with a few samples of matches and non-matches, but the larger the training data, the better.

In this example, we reuse the labels provided in the original post, which assigns the samples to groups of rows (called clusters) instead of labeling individual pairs. Because we need to transform that data anyway, we convert it directly to the format that Zingg uses internally, which avoids having to configure the training samples definition and format. To learn more about the configuration that would be required, refer to Using pre-existing training data.

  1. In the notebook with the session started, add a new cell and enter the following code, providing the name of your own bucket:
bucket_name = "<your bucket name>"

spark.read.csv(
    "s3://ml-transforms-public-datasets-us-east-1/dblp-scholar/labels/dblp_scholar_labels_350.csv"
    , header=True).createOrReplaceTempView("labeled")

spark.sql("""
SELECT book.id as z_zid, "sample" as z_source, z_cluster, z_isMatch,
           book.title, book.authors, book.venue, CAST(book.year AS DOUBLE) as year, book.source
FROM(
    SELECT explode(pair) as book, *
    FROM(
        SELECT (a.label == b.label) as z_isMatch, array(struct(a.*), 
               struct(b.*)) as pair, uuid() as z_cluster
        FROM labeled a, labeled b 
        WHERE a.labeling_set_id = b.labeling_set_id AND a.id != b.id
))
""").write.mode("overwrite").parquet(f"s3://{bucket_name}/models/1/trainingData/marked/")
print("Labeled data ready")
  2. Run the new cell. After a few seconds, it will print the message indicating the labeled data is ready.
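
The SQL in the preceding cell turns cluster-style labels into labeled pairs: every row is paired with every other row in the same labeling set, and the pair is marked as a match when both rows carry the same label. The following plain Python sketch shows that pairing logic on a few made-up rows; the data and the to_pairs helper are hypothetical and only illustrate the idea, not Zingg's internal format.

```python
from itertools import permutations

# Hypothetical labeled rows: (id, labeling_set_id, label)
labeled = [
    ("a1", "set1", "0"),
    ("a2", "set1", "0"),
    ("a3", "set1", "1"),
    ("b1", "set2", "0"),
]

def to_pairs(rows):
    """Pair rows within the same labeling set and flag pairs that share a label."""
    pairs = []
    # permutations yields ordered pairs of distinct rows, like the self-join with a.id != b.id
    for left, right in permutations(rows, 2):
        if left[1] == right[1]:  # same labeling_set_id
            pairs.append((left[0], right[0], left[2] == right[2]))
    return pairs

pairs = to_pairs(labeled)
for first_id, second_id, is_match in pairs:
    print(first_id, second_id, is_match)
```

A labeling set with a single row, such as set2 here, produces no pairs and therefore contributes no training samples.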

Build the model and find matches

Create and run a new cell with the following content:

sc._jsc.hadoopConfiguration().set('fs.defaultFS', f's3://{bucket_name}/')
sc._jsc.hadoopConfiguration().set('mapred.output.committer.class', "org.apache.hadoop.mapred.FileOutputCommitter")

from zingg.client import Arguments, ClientOptions, FieldDefinition, Zingg
zopts = ClientOptions(["--phase", "trainMatch",  "--conf", "/tmp/config.json"])
zargs = Arguments.createArgumentsFromJSON(zopts.getConf(), zopts.getPhase())
zingg = Zingg(zargs, zopts)
zingg.init()
zingg.execute()

Because it’s doing both training and matching, it will take a few minutes to complete. When it’s complete, the cell will print the options used.

If there is an error, the information returned to the notebook might not be enough to troubleshoot, in which case you can use Amazon CloudWatch. On the CloudWatch console, choose Log Groups in the navigation pane, then under /aws-glue/sessions/error, find the driver log using the timestamp or the session ID (the driver is the one with just the ID without any suffix).

Explore the matches found by the algorithm

As per the Zingg configuration, the previous step produced a CSV file with the matches found on the original JSON data. Create and run a new cell with the following content to visualize the matches file:

from pyspark.sql.functions import col
spark.read.csv(f"s3://{bucket_name}/matchOuput/", header=True) \
    .withColumn("z_cluster", col("z_cluster").cast('int')) \
    .drop("z_minScore", "z_maxScore") \
    .sort(col("z_cluster")).show(100, False)

It will display the first 100 rows with clusters assigned. If the cluster assigned is the same, then the publications are considered duplicates.

Athena results

For instance, in the preceding screenshot, clusters 0 and 20 each contain spelling variations of the same title, with some incomplete or incorrect data in other fields. The publications appear as duplicates in these cases.

As in the original post with FindMatches, the algorithm struggles with editor’s notes. Cluster 12 has more questionable duplicates: the title and venue are similar, but the completely different authors suggest the publications aren’t duplicates and that the algorithm needs more training with examples like this.
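
To review the candidate duplicates cluster by cluster, you could group the matched rows by their cluster ID and keep only the clusters with more than one member. The following plain Python sketch illustrates this on made-up rows; the titles and the duplicate_groups helper are hypothetical and are not part of the Zingg output.

```python
from collections import defaultdict

# Hypothetical rows shaped like the match output: (z_cluster, title)
rows = [
    (0, "Efficient query processing"),
    (0, "Eficient query procesing"),
    (1, "Data integration at scale"),
    (2, "Spatial joins"),
    (2, "Spatial joins."),
]

def duplicate_groups(rows):
    """Group titles by cluster ID and keep clusters with more than one member."""
    clusters = defaultdict(list)
    for cluster_id, title in rows:
        clusters[cluster_id].append(title)
    return {cid: titles for cid, titles in clusters.items() if len(titles) > 1}

groups = duplicate_groups(rows)
for cluster_id, titles in groups.items():
    print(cluster_id, titles)
```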

You can also run the notebook as a job, either by choosing Run or programmatically. In that case, remove the cell you created earlier to explore the output, as well as any other cells that aren’t needed for the entity resolution, such as the sample cells provided when you created the notebook.

Additional considerations

As part of the notebook setup, you created a configuration cell with three configuration magics. You could replace these with the ones in the setup cell provided, as long as they are listed before any Python code.

One of them specifies the Zingg configuration JSON file as an extra Python file, even though it’s not really a Python file. This is so it gets deployed on the cluster under the /tmp directory and it’s accessible by the library. You could also specify the Zingg configuration programmatically using the library’s API, and not require the config file.

In the cell that builds and runs the model, there are two lines that adjust the Hadoop configuration. This is required because the library was designed to run on HDFS instead of Amazon S3. The first one configures the default file system to use the S3 bucket, so when it needs to produce temporary files, they are written there. The second one restores the default committer instead of the direct one that AWS Glue configures out of the box.

The Zingg library is invoked with the phase trainMatch. This is a shortcut to do both the train and match phases in one call. It works the same way as invoking a phase from the Zingg command line, which is the approach often used in the examples in the Zingg documentation.

If you want to do incremental matches, you could run a match on the new data and then a linking phase between the main data and the new data. For more information, see Linking across datasets.

Clean up

When you navigate away from the notebook, the interactive session should be stopped. You can verify it was stopped on the AWS Glue console by choosing Interactive Sessions in the navigation pane and then sorting by status, to check if any are running and therefore generating charges. You can also delete the files in the S3 bucket if you don’t intend to use them.

Conclusion

In this post, we showed how you can incorporate a third-party Apache Spark library to extend the capabilities of AWS Glue and give you the freedom of choice. You can use your own data in the same way, and then integrate this entity resolution as part of a workflow using a tool such as Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

If you have any questions, please leave them in the comments.


About the Authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team, with a background in machine learning and AI.

Emilio Garcia Montano is a Solutions Architect at Amazon Web Services. He works with media and entertainment customers and supports them to achieve their outcomes with machine learning and AI.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Understanding Apache Iceberg on AWS with the new technical guide

Post Syndicated from Carlos Rodrigues original https://aws.amazon.com/blogs/big-data/understanding-apache-iceberg-on-aws-with-the-new-technical-guide/

We’re excited to announce the launch of the Apache Iceberg on AWS technical guide. Whether you are new to Apache Iceberg on AWS or already running production workloads on AWS, this comprehensive technical guide offers detailed guidance, from foundational concepts to advanced optimizations, to build your transactional data lake with Apache Iceberg on AWS.

Apache Iceberg is an open source table format that simplifies data processing on large datasets stored in data lakes. It does so by bringing the familiarity of SQL tables to big data and capabilities such as ACID transactions, row-level operations (merge, update, delete), partition evolution, data versioning, incremental processing, and advanced query scanning. Apache Iceberg seamlessly integrates with popular open source big data processing frameworks like Apache Spark, Apache Hive, Apache Flink, Presto, and Trino. It is natively supported by AWS analytics services such as AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift.

The following diagram illustrates a reference architecture of a transactional data lake with Apache Iceberg on AWS.

AWS customers and data engineers use the Apache Iceberg table format for its many benefits, as well as for its high performance and reliability at scale to build transactional data lakes and write-optimized solutions with Amazon EMR, AWS Glue, Athena, and Amazon Redshift on Amazon Simple Storage Service (Amazon S3).

We believe Apache Iceberg adoption on AWS will continue to grow rapidly, and you can benefit from this technical guide that delivers productive guidance on working with Apache Iceberg on supported AWS services, best practices on cost-optimization and performance, and effective monitoring and maintenance policies.

Related resources


About the Authors

Carlos Rodrigues is a Big Data Specialist Solutions Architect at AWS. He helps customers worldwide build transactional data lakes on AWS using open table formats like Apache Iceberg and Apache Hudi. He can be reached via LinkedIn.

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He is an expert on data engineering and enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Shana Schipers is an Analytics Specialist Solutions Architect at AWS, focusing on big data. She supports customers worldwide in building transactional data lakes using open table formats like Apache Hudi, Apache Iceberg, and Delta Lake on AWS.

Use AWS Glue Data Catalog views to analyze data

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/use-aws-glue-data-catalog-views-to-analyze-data/

In this post, we show you how to use the new views feature of the AWS Glue Data Catalog. SQL views are a powerful object used across relational databases. You can use views to decrease the time to insights of data by tailoring the data that is queried. Additionally, you can use the power of SQL in a view to express complex boundaries in data across multiple tables that can’t be expressed with simpler permissions. Data lakes provide customers the flexibility required to derive useful insights from data across many sources and many use cases. Data consumers can consume data where they need to across lines of business, increasing the velocity of insights generation.

Customers use many different processing engines in their data lakes, each of which has its own version of views with different capabilities. The AWS Glue Data Catalog and AWS Lake Formation provide a central location to manage your data across data lake engines.

AWS Glue has released a new feature, SQL views, which allows you to manage a single view object in the Data Catalog that can be queried from SQL engines. You can create a single view object with a different SQL version for each engine you want to query, such as Amazon Athena, Amazon Redshift, and Spark SQL on Amazon EMR. You can then manage access to these resources using the same Lake Formation permissions that are used to control tables in the data lake.

Solution overview

For this post, we use the Women’s E-Commerce Clothing Review dataset. The objective is to create views in the Data Catalog so you can create a single common view schema and metadata object to use across engines (in this case, Athena). Doing so lets you use the same views across your data lakes to fit your use case. We create a view to mask the customer_id column in this dataset, then share this view with another user so that they can query the masked view.

Prerequisites

Before you can create a view in the AWS Glue Data Catalog, make sure that you have an AWS Identity and Access Management (IAM) role with the following configuration:

  • The following trust policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
               "glue.amazonaws.com",
               "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  • The following pass role policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Stmt1",
          "Action": [
            "iam:PassRole"
          ],
          "Effect": "Allow",
          "Resource": "*",
          "Condition": {
             "StringEquals": {
               "iam:PassedToService": [
                 "glue.amazonaws.com",
                 "lakeformation.amazonaws.com"
               ]
             }
           }
         }
       ]
    }

  • Finally, you will also need the following permissions:
    • Glue:GetDatabase
    • Glue:GetDatabases
    • Glue:CreateTable
    • Glue:GetTable
    • Glue:UpdateTable
    • Glue:DeleteTable
    • Glue:GetTables
    • Glue:SearchTables
    • Glue:BatchGetPartition
    • Glue:GetPartitions
    • Glue:GetPartition
    • Glue:GetTableVersion
    • Glue:GetTableVersions
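
The permissions listed above could be assembled into an IAM policy document, in the same shape as the trust and pass role policies. The following Python sketch builds such a document; build_glue_view_policy is a hypothetical helper name, and the "*" resource is an assumption made for brevity that you would normally scope down to specific catalog, database, and table ARNs.

```python
import json

# The AWS Glue actions listed in the prerequisites
GLUE_ACTIONS = [
    "GetDatabase", "GetDatabases", "CreateTable", "GetTable", "UpdateTable",
    "DeleteTable", "GetTables", "SearchTables", "BatchGetPartition",
    "GetPartitions", "GetPartition", "GetTableVersion", "GetTableVersions",
]

def build_glue_view_policy(resource="*"):
    """Return an IAM policy document allowing the listed Glue actions (hypothetical helper)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [f"glue:{action}" for action in GLUE_ACTIONS],
                # Assumption: "*" keeps the sketch short; restrict this in practice
                "Resource": resource,
            }
        ],
    }

policy_json = json.dumps(build_glue_view_policy(), indent=2)
print(policy_json)
```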

Run the AWS CloudFormation template

You can deploy the AWS CloudFormation template glueviewsblog.yaml to create the Lake Formation database and table. The dataset will be loaded into an Amazon Simple Storage Service (Amazon S3) bucket.

For step-by-step instructions, refer to Creating a stack on the AWS CloudFormation console.

When the stack is complete, you can see a table called clothing_parquet on the Lake Formation console, as shown in the following screenshot.

Create a view on the Athena console

Now that you have your Lake Formation managed table, you can open the Athena console and create a Data Catalog view. Complete the following steps:

  1. In the Athena query editor, run the following query on the Parquet dataset:
SELECT * FROM "clothing_reviews"."clothing_parquet" limit 10;

In the query results, the customer_id column is currently visible.

Next, you create a view called hidden_customerID and mask the customer_id column.

  2. Create a view called hidden_customerID:
CREATE PROTECTED MULTI DIALECT VIEW clothing_reviews.hidden_customerid SECURITY DEFINER AS 
SELECT * FROM clothing_reviews.clothing_parquet

In the following screenshot, you can see a view called hidden_customerID was successfully created.

  3. Run the following query to mask the first four characters of the customer_id column for the newly generated view (substring positions are 1-based, so position 5 is the first character that is kept):
ALTER VIEW clothing_reviews.hidden_customerid UPDATE DIALECT AS
SELECT '****' || substring(customer_id, 5) as customer_id,clothing_id,age,title,review_text,rating,recommend_ind,positive_feedback,division_name,department_name,class_name 
FROM clothing_reviews.clothing_parquet

You can see in the following screenshot that the view hidden_customerID has the customer_id column’s first four characters masked.

The original table clothing_parquet remains unchanged and unmasked.
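
To see what a masking expression of the form '****' || substring(customer_id, n) does to a single value, here is a small Python emulation. It assumes customer_id is stored as a string and that substring positions are 1-based, as in Athena SQL; the helper names and the sample ID are made up for illustration.

```python
def sql_substring(value, start):
    """Emulate SQL substring(value, start): 1-based start, returns the rest of the string."""
    return value[start - 1:]

def mask_prefix(value, start):
    """Emulate '****' || substring(value, start)."""
    return "****" + sql_substring(value, start)

# Hypothetical customer ID
masked = mask_prefix("1000123456", 5)
print(masked)
```

With a start position of 5, the first four characters are replaced by the four asterisks and the masked value keeps the length of the original.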

Grant another user access to query the view

Data Catalog views allow you to use Lake Formation to control access. In this step, you grant access to this view to another user called amazon_business_analyst and then query the view as that user.

  1. Sign in to the Lake Formation console as admin.
  2. In the navigation pane, choose Views.

As shown in the following screenshot, you can see the hidden_customerid view.

  3. Sign in as the amazon_business_analyst user and navigate to the Views page.

This user has no visibility to the view.

  4. As the data lake admin, grant permission on the view to the amazon_business_analyst user.
  5. Sign in again as amazon_business_analyst and navigate to the Views page.

  6. On the Athena console, query the hidden_customerid view.

You have successfully shared a view with the user and queried it from the Athena console.

Clean up

To avoid incurring future charges, delete the CloudFormation stack. For instructions, refer to Deleting a stack on the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use the AWS Glue Data Catalog to create views. We then showed how to alter the views and mask the data. You can share the view with different users to query using Athena. For more information about this new feature, refer to Using AWS Glue Data Catalog views.


About the Authors

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Michael Chess is a Product Manager on the AWS Lake Formation team based out of Palo Alto, CA. He specializes in permissions and data catalog features in the data lake.

Derek Liu is a Senior Solutions Architect based out of Vancouver, BC. He enjoys helping customers solve big data challenges through AWS analytic services.

Detect and handle data skew on AWS Glue

Post Syndicated from Salim Tutuncu original https://aws.amazon.com/blogs/big-data/detect-and-handle-data-skew-on-aws-glue/

AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns. The most visible symptom is often nodes running out of disk during shuffling, which leads to nodes falling like dominoes and job failures. However, data skew can also be hidden. It can often go undetected because monitoring tools might not flag an uneven distribution as a critical issue, and logs don’t always make it evident. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate data skew.

How to detect data skew

When an AWS Glue job has issues with local disks (split disk issues), doesn’t scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This option is enabled by default on the AWS Glue console. Once it is enabled, Spark event log files are created during the job run and stored in your S3 bucket. Those logs are then parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blog post for more details. For jobs where the AWS Glue serverless Spark UI doesn’t work because the logs exceed its 512 MB limit, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and if the data distribution among partitions is balanced or not (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew.

Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.

These metrics represent the task-related metrics below which a certain percentage of tasks completed. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see very different values across the percentiles. In the preceding example, tasks didn’t write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, a task wrote 460 MiB, roughly 10 times the 75th percentile. It means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the task in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.
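
The same percentile reasoning can be applied to any list of task durations. The following Python sketch computes a five-number summary and flags a stage when the maximum is much larger than the 75th percentile. The durations, the helper names, and the ratio of 5 are assumptions chosen for illustration and are not thresholds used by the Spark UI.

```python
import statistics

def summarize(durations):
    """Return min, quartiles, and max for a list of task durations in seconds."""
    q1, median, q3 = statistics.quantiles(durations, n=4, method="inclusive")
    return {"min": min(durations), "p25": q1, "median": median,
            "p75": q3, "max": max(durations)}

def looks_skewed(summary, ratio=5.0):
    """Flag a stage when the slowest task is far above the 75th percentile."""
    return summary["max"] > ratio * summary["p75"]

# Hypothetical task durations: one balanced stage and one with a straggler
balanced = [2, 2, 2, 3, 2, 2, 3, 2]
skewed = [2, 2, 2, 3, 2, 2, 3, 46]

balanced_flag = looks_skewed(summarize(balanced))
skewed_flag = looks_skewed(summarize(skewed))
print(f"Balanced stage flagged: {balanced_flag}")
print(f"Skewed stage flagged: {skewed_flag}")
```

A ratio-based check like this is only a heuristic; a long-running task can also be caused by factors other than data skew.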

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog or just use Spark methods to load the files, such as Parquet or CSV, that you want to analyze. You can use a script similar to the following to detect data skew from the partition size perspective. Note that the more important issue is data skew during shuffling, which this script does not detect:

from pyspark.sql.functions import spark_partition_id, asc, abs as spark_abs

# input_dataframe is the DataFrame that you want to check for data skew
partition_sizes_df = input_dataframe\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .withColumnRenamed("count", "partition_size")
# Calculate the average and standard deviation of the partition sizes
# (the standard deviation is undefined when there is only one partition)
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0]

"""
 The code calculates the absolute difference between each value in the "partition_size" column and the calculated average (avg_size).
 Then, it calculates twice the standard deviation (std_dev_size) and uses
 that as a boolean mask where the condition checks if the absolute difference is greater than twice the standard deviation
 in order to mark a partition as 'skewed'.
"""
# spark_abs is the Spark column function; it is imported under an alias so that it does not shadow Python's built-in abs()
skewed_partitions_df = partition_sizes_df.filter(spark_abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size)
if skewed_partitions_df.count() > 0:
    # The column was named "partitionId" above, so read it back with the same name
    skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

You can calculate the average and standard deviation of partition sizes using the agg() function and identify partitions with significantly different sizes using the filter() function, and you can print their indexes if any skewed partitions are detected. Otherwise, the output prints that no data skew is detected.

This code assumes that your data is structured, and you may need to modify it if your data is of a different type.
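One property of the two-standard-deviation rule is worth checking before relying on it: when there are very few partitions, a single oversized partition inflates the standard deviation so much that it may never cross the threshold. The following plain Python sketch reproduces the rule on hypothetical partition sizes. It assumes the sample standard deviation, which is what Spark's stddev aggregate is understood to compute; the function name and the sizes are illustrative only.

```python
import statistics

def flag_skewed_partitions(partition_sizes):
    """Return indexes of partitions more than 2 sample standard deviations from the mean."""
    if len(partition_sizes) < 2:
        return []  # the standard deviation is undefined for a single partition
    avg_size = statistics.mean(partition_sizes)
    std_dev_size = statistics.stdev(partition_sizes)  # sample standard deviation
    return [
        index
        for index, size in enumerate(partition_sizes)
        if abs(size - avg_size) > 2 * std_dev_size
    ]

# Ten partitions, one of them ten times larger than the rest: the outlier is flagged
print(flag_skewed_partitions([100] * 9 + [1000]))

# Five partitions with the same outlier: nothing is flagged
print(flag_skewed_partitions([100] * 4 + [1000]))
```

With n values, no single value can lie more than (n - 1) / sqrt(n) sample standard deviations from the mean, which is below 2 for five or fewer partitions. For small partition counts, a ratio against the median partition size may be a more useful signal.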

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you’re using the latest AWS Glue version. For example, AWS Glue 4.0, which is based on Spark 3.3, enables by default some configurations, such as Adaptive Query Execution (AQE), that can help improve performance when data skew is present.

The following are some of the techniques that you can employ to handle data skew:

  • Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
  • Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages because in large datasets, a single aggregation operation (like sum, average, or count) can be resource-intensive. In those cases, you can perform intermediate actions. This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
  • Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
  • Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you have the option to either provide a hint to use broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
  • Salting – This involves adding a random prefix or suffix (the salt) to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the salt to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
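To illustrate why a broadcast join sidesteps shuffle skew, the following plain Python sketch mimics the idea outside of Spark: the small side is copied to every task as an in-memory lookup, and each partition of the large side is joined in place, so rows are never regrouped by key. The function name and sample data are hypothetical, and this is a conceptual model rather than how Spark implements the join internally.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join each partition of the large side against an in-memory copy of the small side.

    large_partitions: list of partitions, each a list of (key, value) tuples
    small_rows: list of (key, value) tuples small enough to copy to every task
    """
    # Build the lookup once; in Spark this copy would be shipped to every executor
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition is processed where it already lives: no shuffle by key
        joined = [
            (key, left, right)
            for key, left in partition
            for right in lookup.get(key, [])
        ]
        joined_partitions.append(joined)
    return joined_partitions

# Key "a" dominates the large side, but it stays spread over both partitions
large = [[("a", 1), ("a", 2)], [("a", 3), ("b", 4)]]
small = [("a", "x"), ("b", "y")]
result = broadcast_hash_join(large, small)
print(result)
```

Because the large side keeps its original partitioning, a dominant key does not pile up in a single task the way it would in a shuffle-based join.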

The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, floor, rand, concat, col

# In AWS Glue, a SparkSession is typically already available as `spark`
spark = SparkSession.builder.getOrCreate()

# Assumed inputs: skewed_data and non_skewed_data are DataFrames that share a "key" column,
# and skew_threshold is the row count above which a key is treated as skewed

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts['count'] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Splitting the dataset
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data: a seeded random integer in [0, num_salts),
# so that every salt value has a match among the replicated rows below
skewed_data_subset = skewed_data_subset.withColumn("salt", floor(rand(seed=42) * num_salts))
skewed_data_subset = skewed_data_subset.withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))

# Replicate skewed rows in non-skewed dataset
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
    return replicated_df.drop("salt")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
# ("key" is included so it appears only once; the helper columns are dropped afterward)
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, ["key", "salted_key"]).drop("salted_key", "salt")

# Perform regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results (both sides now have the same columns in the same order)
final_result = result_skewed.union(result_non_skewed)

In this code, we first define the number of salt values (num_salts). We then identify the skewed keys and split the skewed dataset into the rows that carry those keys and the rows that don’t. For the rows with skewed keys, we add a salt column to our DataFrame using the withColumn() function, where we set the value of the salt column to a random number using the rand() function with a fixed seed, and we build a salted_key column from the key and the salt. The function replicate_skewed_rows is defined to replicate each row with a skewed key in the non-skewed dataset (non_skewed_data) num_salts times. This ensures that each salted key in the skewed data has a matching salted key in the non-skewed data. Finally, a join operation is performed on the salted_key column between the skewed and non-skewed datasets. This join is more balanced compared to a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0–1 for each row, so it’s important to use a fixed seed to achieve consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back so that the final output is consistent with the unsalted key values.
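The effect described above, including the final step of aggregating back to the unsalted keys, can be sketched in plain Python without a Spark cluster. The key names follow the figure (key1, key2, key3); the row counts, function names, and seed are hypothetical and chosen only to make the imbalance visible.

```python
import random
from collections import Counter

def salted_counts(keys, skewed_keys, num_salts, seed=42):
    """Count rows per key, appending a random salt suffix to the skewed keys only."""
    rng = random.Random(seed)  # fixed seed so repeated runs give the same split
    counts = Counter()
    for key in keys:
        if key in skewed_keys:
            counts[f"{key}_{rng.randrange(num_salts)}"] += 1
        else:
            counts[key] += 1
    return counts

def unsalt(counts, skewed_keys):
    """Aggregate partial counts back to the original, unsalted keys."""
    totals = Counter()
    for salted_key, count in counts.items():
        base, _, suffix = salted_key.rpartition("_")
        original = base if base in skewed_keys and suffix.isdigit() else salted_key
        totals[original] += count
    return totals

# Hypothetical dataset in which key2 dominates
keys = ["key1"] * 100 + ["key2"] * 9000 + ["key3"] * 100
partial = salted_counts(keys, {"key2"}, num_salts=3)
final = unsalt(partial, {"key2"})

print("largest group before salting:", max(Counter(keys).values()))
print("largest group after salting:", max(partial.values()))
print("totals after removing the salt:", dict(final))
```

The largest group shrinks to roughly a third of its original size, while the totals after removing the salt match the unsalted counts.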

Other techniques to use on skewed data during the join operation

When you’re performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, there are automatic optimizations that try to mitigate data skew issues on joins. These can be tuned because they have dedicated configuration settings in Apache Spark.
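As a rough guide to those settings, the following sketch lists the Adaptive Query Execution skew join properties and models the documented rule that a partition is treated as skewed when it is larger than a factor times the median partition size and also larger than a byte threshold. The values shown are illustrative and mirror what are understood to be the Spark 3.3 defaults, so verify them against your Spark version. The helper names apply_settings and is_skewed_partition are hypothetical.

```python
MB = 1024 * 1024

# Illustrative values only; confirm the defaults for your Spark version
AQE_SKEW_JOIN_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
}

def apply_settings(set_conf, settings):
    """Pass each setting to a setter such as spark.conf.set."""
    for name, value in settings.items():
        set_conf(name, value)

def is_skewed_partition(size_bytes, median_bytes, factor=5, threshold_bytes=256 * MB):
    """Model of the skew rule: larger than factor x median AND larger than the threshold."""
    return size_bytes > factor * median_bytes and size_bytes > threshold_bytes

# In a Spark session you would call: apply_settings(spark.conf.set, AQE_SKEW_JOIN_SETTINGS)
# Here a plain dictionary stands in for the Spark configuration
recorded = {}
apply_settings(recorded.__setitem__, AQE_SKEW_JOIN_SETTINGS)

print(is_skewed_partition(460 * MB, 50 * MB))  # well above both limits
print(is_skewed_partition(200 * MB, 10 * MB))  # above the factor, below the byte threshold
```

Lowering the factor or the byte threshold makes the optimizer split more partitions, which can help with moderate skew at the cost of additional tasks.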

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.


About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.

How Fujitsu implemented a global data mesh architecture and democratized data

Post Syndicated from Kanehito Miyake original https://aws.amazon.com/blogs/big-data/how-fujitsu-implemented-a-global-data-mesh-architecture-and-democratized-data/

This is a guest post co-authored with Kanehito Miyake, Engineer at Fujitsu Japan. 

Fujitsu Limited was established in Japan in 1935. Currently, we have approximately 120,000 employees worldwide (as of March 2023), including group companies. We develop business in various regions around the world, starting with Japan, and provide digital services globally. To provide a variety of products, services, and solutions that are better suited to customers and society in each region, we have built business processes and systems that are optimized for each region and its market.

However, in recent years, the IT market environment has changed drastically, and it has become difficult for the entire group to respond flexibly to the individual market situation. Moreover, we are challenged not only to revisit individual products, services, and solutions, but also to reinvent entire business processes and operations.

To transform Fujitsu from an IT company to a digital transformation (DX) company, and to become a world-leading DX partner, Fujitsu has declared a shift to data-driven management. We built the OneFujitsu program, which standardizes business projects and systems throughout the company, including the domestic and overseas group companies, and tackles the major transformation of the entire company under the program.

To achieve data-driven management, we built OneData, a data utilization platform used in the four global AWS Regions, which started operation in April 2022. As of November 2023, more than 200 projects and 37,000 users were onboarded. The platform consists of approximately 370 dashboards, 360 tables registered in the data catalog, and 40 linked systems. The data size stored in Amazon Simple Storage Service (Amazon S3) exceeds 100 TB, including data processed for use in each project.

In this post, we introduce our OneData initiative. We explain how Fujitsu worked to solve the aforementioned issues and introduce an overview of the OneData design concept and its implementation. We hope this post will provide some guidance for architects and engineers.

Challenges

Like many other companies struggling with data utilization, Fujitsu faced some challenges, which we discuss in this section.

Siloed data

In Fujitsu’s long history, we restructured organizations by merging affiliated companies into Fujitsu. Although organizational integration has progressed, there are still many systems and mechanisms customized for individual context. There are also many systems and mechanisms overlapping across different organizations. For this reason, it takes a lot of time and effort to discover, search, and integrate data when analyzing the entire company using a common standard. This situation makes it difficult for management to grasp business trends and make decisions in a timely manner.

Under these circumstances, the OneFujitsu program is designed to have one system per business globally. Core systems such as ERP and CRM are being integrated and unified to eliminate silos. This will make it easier for users to utilize data across different organizations for specific business areas.

However, to spread a culture of data-driven decision-making not only in management but also in every organization, it is necessary to have a mechanism that enables users to easily discover various types of data in organizations, and then analyze the data quickly and flexibly when needed.

Excel-based data utilization

Microsoft Excel is available on almost everyone’s PC in the company, and it helps lower the hurdles when starting to utilize data. However, Excel is mainly designed for spreadsheets; it’s not designed for large-scale data analytics and automation. Excel files tend to contain a mixture of data and procedures (functions, macros), and many users casually copy files for one-time use cases. It introduces complexity to keep both data and procedures up to date. Furthermore, it tends to require domain-specific knowledge to manage the Excel files for individual context.

For those reasons, it was extremely difficult for Fujitsu to manage and utilize data at scale with Excel.

Solution overview

OneData defines three personas:

  • Publisher – This role includes the organizational and management team of systems that serve as data sources. Responsibilities include:
    • Load raw data from the data source system at the appropriate frequency.
    • Provide technical metadata for loaded data and keep it up to date.
    • Perform the cleansing process and format conversion of raw data as needed.
    • Grant access permissions to data based on the requests from data users.
  • Consumer – Consumers are organizations and projects that use the data. Responsibilities include:
    • Look for the data to be used from the technical data catalog and request access to the data.
    • Handle the processing and conversion of data into a format suitable for their own use (such as fact-dimension) with granted referencing permissions.
    • Configure business intelligence (BI) dashboards to provide data-driven insights to end-users targeted by the consumer’s project.
    • Use the latest data published by the publisher to update data as needed.
    • Promote and expand the use of databases.
  • Foundation – This role encompasses the data steward and governance team. Responsibilities include:
    • Provide a preprocessed, generic dataset of data commonly used by many consumers.
    • Manage and guide metrics for the quality of data published by each publisher.

Each role has sub-roles. For example, the consumer role has the following sub-roles with different responsibilities:

  • Data engineer – Create data process for analysis
  • Dashboard developer – Create a BI dashboard
  • Dashboard viewer – Monitor the BI dashboard

The following diagram describes how the OneData platform works with those roles.

Let’s look at the key components of this architecture in more detail.

Publisher and consumer

In the OneData platform, a publisher is defined for each data source system, and a consumer is defined for each data utilization project. OneData provides an AWS account for each.

This enables the publisher to cleanse data and the consumer to process and analyze data at scale. In addition, by properly separating data and processing, it becomes effortless for the teams and organizations to share, manage, and inherit processes that were traditionally confined to individual PCs.

Foundation

When teams don’t have a robust enough skillset, it can take more time to model and process data, resulting in longer latency and lower data quality. It can also contribute to lower utilization by end-users. To address this, the foundation role provides an already processed dataset as a generic data model for use cases that are common to many consumers. This makes high-quality data available to each consumer. Here, the foundation role takes the lead in compiling the knowledge of domain experts and making data suitable for analysis. It is also an effective approach that eliminates duplicated work for consumers. In addition, the foundation role monitors the state of the metadata, data quality indicators, data permissions, information classification labels, and so on. It is crucial in data governance and data management.

BI and visualization

Individual consumers have a dedicated space in a BI tool. In the past, if users wanted to go beyond simple data visualization using Excel, they had to build and maintain their own BI tools, which caused silos. By unifying these BI tools, OneData lowers the difficulty for consumers to use BI tools, and centralizes operation and maintenance, achieving optimization on a company-wide scale.

Additionally, to keep portability between BI tools, OneData recommends that users transform data within the consumer AWS account instead of transforming data in the BI tool. With this approach, the BI tool loads data from AWS Glue Data Catalog tables through an Amazon Athena JDBC/ODBC driver without any further transformations.

Deployment and operational excellence

To provide OneData as a common service for Fujitsu and group companies around the world, Regional OneData has been deployed in several locations. Regional OneData represents a unit of system configurations, and is designed to provide lower network latency for platform users, and be optimized for local languages, working hours for system operations and support, and region-specific legal restrictions, such as data residency and personal information protection.

The Regional Operations Unit (ROU), a virtual organization that brings together members from each region, is responsible for operating regional OneData in each of these regions. OneData HQ is responsible for supervising these ROUs, as well as planning and managing the entire OneData.

In addition, we have a specially positioned OneData called Global OneData, where global data utilization spans each region. Only the properly cleansed and sanitized data is transferred between each Regional OneData and Global OneData.

Systems such as ERP and CRM are accumulating data as a publisher for Global OneData, and the dashboards for executives in various regions to monitor business conditions with global metrics are also acting as a consumer for Global OneData.

Technical concepts

In this section, we discuss some of the technical concepts of the solution.

Large scale multi-account

We have adopted a multi-account strategy to provide AWS accounts for each project. Many publishers and consumers are already onboarded into OneData, and the number is expected to increase in the future. With this strategy, future usage expansion at scale can be achieved without affecting the users.

Also, this strategy allowed us to have clear boundaries in security, costs, and service quotas for each AWS service.

All the AWS accounts are deployed and managed through AWS Organizations and AWS Control Tower.

Serverless

Although we provide independent AWS accounts for each publisher and consumer, both operational costs and resource costs would be enormous if we accommodated individual user requests, such as, “I want a virtual machine or RDBMS to run specific tools for data processing.” To avoid such continuous operational and resource costs, we have adopted AWS serverless services for all the computing resources necessary for our activities as a publisher and consumer.

We use AWS Glue to preprocess, cleanse, and enrich data. Optionally, AWS Lambda or Amazon Elastic Container Service (Amazon ECS) with AWS Fargate can also be used based on preferences. We allow users to set up AWS Step Functions for orchestration and Amazon CloudWatch for monitoring. In addition, we provide Amazon Aurora Serverless PostgreSQL as standard for consumers, to meet their needs for data processing with extract, load, and transform (ELT) jobs. With this approach, only the consumer who requires those services will incur charges based on usage. We are able to take advantage of lower operational and resource costs thanks to the unique benefit of serverless (or more accurately, pay-as-you-go) services.

AWS provides many serverless services, and OneData has integrated them to provide scalability that allows active users to quickly obtain the required capability as needed, while minimizing the cost for non-frequent users.

Data ownership and access control

In OneData, we have adopted a data mesh architecture where each publisher maintains ownership of data in a distributed and decentralized manner. When the consumer discovers the data they want to use, they request access from the publisher. The publisher accepts the request and grants permissions only when the request meets their own criteria. With the AWS Glue Data Catalog and AWS Lake Formation, there is no need to update S3 bucket policies or AWS Identity and Access Management (IAM) policies every time we allow access for individual data on an S3 data lake, and we can effortlessly grant the necessary permissions for the databases, tables, columns, and rows when needed.
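As an illustration of what such a grant can look like, the following sketch builds the request for the AWS Lake Formation GrantPermissions API, giving a consumer column-level SELECT access to a publisher’s table. It is not OneData’s actual implementation. The helper name build_column_grant, the account ID, role ARN, database, table, and column names are all hypothetical, and the dictionary is built separately so that it can be reviewed before anything is sent to AWS.

```python
def build_column_grant(principal_arn, catalog_id, database, table, columns):
    """Build keyword arguments for a column-level SELECT grant in Lake Formation."""
    return {
        "CatalogId": catalog_id,  # account that owns the Data Catalog (the publisher)
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "CatalogId": catalog_id,
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }

# Hypothetical publisher account and consumer role
request = build_column_grant(
    principal_arn="arn:aws:iam::222222222222:role/consumer-data-engineer",
    catalog_id="111111111111",
    database="sales_db",
    table="orders",
    columns=["order_id", "order_date", "amount"],
)
print(request)

# With AWS credentials for the publisher account, the request could be sent as:
#   import boto3
#   boto3.client("lakeformation").grant_permissions(**request)
```

Because the grant names a table and specific columns rather than S3 paths, no S3 bucket policy or IAM policy needs to change when a new consumer is approved.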

Conclusion

Since the launch of OneData in April 2022, we have been persistently carrying out educational activities to expand the number of users and introducing success stories on our portal site. As a result, we have been promoting change management within the company and are actively utilizing data in each department. Regional OneData is being rolled out gradually, and we plan to further expand the scale of use in the future.

With its global expansion, the development of basic functions as a data utilization platform will reach a milestone. As we move forward, it will be important to make sure that the OneData platform is used effectively throughout Fujitsu, while incorporating new technologies related to data analysis as appropriate. For example, we are preparing to provide more advanced machine learning functions to OneData users with Amazon SageMaker Studio, and we are investigating the applicability of AWS Glue Data Quality to reduce manual quality monitoring efforts. Furthermore, we are currently in the process of implementing Amazon DataZone through various initiatives and efforts, such as verifying its functionality and examining how it can operate while bridging the gap between OneData’s existing processes and the ideal process we are aiming for.

We have had the opportunity to discuss data utilization with various partners and customers, and although individual challenges may differ in size and context, the issues that we are currently trying to solve with OneData are common to many of them.

This post describes only a small portion of how Fujitsu tackled challenges using the AWS Cloud, but we hope the post will give you some inspiration to solve your own challenges.


About the Authors


Kanehito Miyake is an engineer at Fujitsu Japan and in charge of OneData’s solution and cloud architecture. He spearheaded the architectural study of the OneData project and contributed greatly to promoting data utilization at Fujitsu with his expertise. He loves rockfish fishing.

Junpei Ozono is a Go-to-market Data & AI solutions architect at AWS in Japan. Junpei supports customers’ journeys on the AWS Cloud from Data & AI aspects and guides them to design and develop data-driven architectures powered by AWS services.

Introducing Amazon Q data integration in AWS Glue

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-amazon-q-data-integration-in-aws-glue/

Today, we’re excited to announce general availability of Amazon Q data integration in AWS Glue. Amazon Q data integration, a new generative AI-powered capability of Amazon Q Developer, enables you to build data integration pipelines using natural language. This reduces the time and effort you need to learn, build, and run data integration jobs using AWS Glue data integration engines.

Tell Amazon Q Developer what you need in English, and it will return a complete job for you. For example, you can ask Amazon Q Developer to generate a complete extract, transform, and load (ETL) script or code snippet for individual ETL operations. You can troubleshoot your jobs by asking Amazon Q Developer to explain errors and propose solutions. Amazon Q Developer provides detailed guidance throughout the entire data integration workflow. Amazon Q Developer helps you learn and build data integration jobs using AWS Glue efficiently by generating the required AWS Glue code based on your natural language descriptions. You can create jobs that extract, transform, and load data that is stored in Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon DynamoDB. Amazon Q Developer can also help you connect to third-party, software as a service (SaaS), and custom sources.

With general availability, we added new capabilities for you to author jobs using natural language. Amazon Q Developer can now generate complex data integration jobs with multiple sources, destinations, and data transformations. It can generate data integration jobs for extracts and loads to S3 data lakes including file formats like CSV, JSON, and Parquet, and ingestion into open table formats like Apache Hudi, Delta, and Apache Iceberg. It generates jobs for connecting to over 20 data sources, including relational databases like PostgreSQL, MySQL and Oracle; data warehouses like Amazon Redshift, Snowflake, and Google BigQuery; NoSQL databases like DynamoDB, MongoDB and OpenSearch; tables defined in the AWS Glue Data Catalog; and custom user-supplied JDBC and Spark connectors. Generated jobs can use a variety of data transformations, including filter, project, union, join, and custom user-supplied SQL.

Amazon Q data integration in AWS Glue helps you through two different experiences: the Amazon Q chat experience, and AWS Glue Studio notebook experience. This post describes the end-to-end user experiences to demonstrate how Amazon Q data integration in AWS Glue simplifies your data integration and data engineering tasks.

Amazon Q chat experience

Amazon Q Developer provides a conversational Q&A capability and a code generation capability for data integration. To start using the conversational Q&A capability, choose the Amazon Q icon on the right side of the AWS Management Console.

For example, you can ask, “How do I use AWS Glue for my ETL workloads?” and Amazon Q provides concise explanations along with references you can use to follow up on your questions and validate the guidance.

To start using the AWS Glue code generation capability, use the same window. On the AWS Glue console, start authoring a new job, and ask Amazon Q, “Please provide a Glue script that reads from Snowflake, renames the fields, and writes to Redshift.”

You will notice that the code is generated. With this response, you can learn and understand how you can author AWS Glue code for your purpose. You can copy/paste the generated code to the script editor and configure placeholders. After you configure an AWS Identity and Access Management (IAM) role and AWS Glue connections on the job, save and run the job. When the job is complete, you can start querying the table exported from Snowflake in Amazon Redshift.

Let’s try another prompt that reads data from two different sources, filters and projects them individually, unions the results, and writes the output to a third target. Ask Amazon Q: “I want to read data from S3 in Parquet format, and select some fields. I also want to read data from DynamoDB, select some fields, and filter some rows. I want to union these two datasets and write the results to OpenSearch.”

The code is generated. When the job is complete, your index is available in OpenSearch and can be used by your downstream workloads.

AWS Glue Studio notebook experience

Amazon Q data integration in AWS Glue helps you author code in an AWS Glue notebook to speed up development of new data integration applications. In this section, we walk you through how to set up the notebook and run a notebook job.

Prerequisites

Before going forward with this tutorial, complete the following prerequisites:

  1. Set up AWS Glue Studio.
  2. Configure an IAM role to interact with Amazon Q. Attach the following policy to your IAM role for the AWS Glue Studio notebook:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CodeWhispererPermissions",
                "Effect": "Allow",
                "Action": [
                    "codewhisperer:GenerateRecommendations"
                ],
                "Resource": "*"
            }
        ]
    }

Create a new AWS Glue Studio notebook job

Create a new AWS Glue Studio notebook job by completing the following steps:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.
  3. For Engine, select Spark (Python).
  4. For Options, select Start fresh.
  5. For IAM role, choose the IAM role you configured as a prerequisite.
  6. Choose Create notebook.

A new notebook is created with sample cells. Let’s try recommendations using the Amazon Q data integration in AWS Glue to auto-generate code based on your intent. Amazon Q would help you with each step as you express an intent in a Notebook cell.

Add a new cell and enter your comment to describe what you want to achieve. After you press Tab and Enter, the recommended code is shown. The first intent is to extract the data: “Give me code that reads a Glue Data Catalog table”, followed by “Give me code to apply a filter transform with star_rating>3” and “Give me code that writes the frame into S3 as Parquet”.

Similar to the Amazon Q chat experience, the code is recommended. If you press Tab, then the recommended code is chosen. You can learn more in User actions.

You can run each cell by simply filling in the appropriate options for your sources in the generated code. At any point in the runs, you can also preview a sample of your dataset by simply using the show() method.

Let’s now try to generate a full script with a single complex prompt: “I have JSON data in S3 and data in Oracle that needs combining. Please provide a Glue script that reads from both sources, does a join, and then writes results to Redshift.”

You may notice that, on the notebook, the Amazon Q data integration in AWS Glue generated the same code snippet that was generated in the Amazon Q chat.

You can also run the notebook as a job, either by choosing Run or programmatically.

Conclusion

With Amazon Q data integration, you have an artificial intelligence (AI) expert by your side to integrate data efficiently without deep data engineering expertise. These capabilities simplify and accelerate data processing and integration on AWS. Amazon Q data integration in AWS Glue is available in every AWS Region where Amazon Q is available. To learn more, visit the product page, our documentation, and the Amazon Q pricing page.

A special thanks to everyone who contributed to the launch of Amazon Q data integration in AWS Glue: Alexandra Tello, Divya Gaitonde, Andrew Kim, Andrew King, Anshul Sharma, Anshi Shrivastava, Chuhan Liu, Daniel Obi, Hirva Patel, Henry Caballero Corzo, Jake Zych, Jeremy Samuel, Jessica Cheng, Keerthi Chadalavada, Layth Yassin, Maheedhar Reddy Chappidi, Maya Patwardhan, Neil Gupta, Raghavendhar Vidyasagar Thiruvoipadi, Rajendra Gujja, Rupak Ravi, Shaoying Dong, Vaibhav Naik, Wei Tang, William Jones, Daiyan Alamgir, Japson Jeyasekaran, Matt Sampson, Kartik Panjabi, Ranu Shah, Chuan Lei, Huzefa Rangwala, Jiani Zhang, Xiao Qin, Mukul Prasad, Alon Halevy, Brian Ross, Alona Nadler, Omer Zaki, Rick Sears, Bratin Saha, G2 Krishnamoorthy, Kinshuk Pahare, Nitin Bahadur, and Santosh Chandrachood.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.


Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers’ data integration needs. In his spare time, he enjoys spending time with family and friends.


Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.


XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.


Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems & new interfaces for data integration and efficiently managing data lakes on AWS.


Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give their end-users the best experience when they query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The team wants to keep the raw data in its own AWS account (Account A) for increased security and control, and to perform the data processing and transformation work in a separate account (Account B in the diagram) to compartmentalize duties and prevent unintended changes to the source raw data in Account A. With this approach, the raw data extracted from Account A is processed in Account B, which is dedicated to data handling tasks. The raw and processed data can then be kept securely separated across accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B so that data can be loaded and unloaded.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, the changes are deployed to an S3 bucket in Account B (for this post, we upload the files to the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output bucket structures use Hive-style partitioning, for example s3://<bucket>/products/year=YYYY/month=MM/day=DD/.
  • A sample dataset called products.csv, which we use in this post.
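The partition layout can be sketched in a few lines of Python. This is a minimal illustration, assuming the year=/month=/day= key layout that the get_s3_loc task builds in the DAG later in this post; the function names and the bucket name are hypothetical and not part of the CloudFormation template.

```python
from datetime import date


def partition_prefix(folder: str, day: date) -> str:
    """Return a Hive-style key prefix for the given folder and date."""
    # date supports strftime codes in format specs, so months and days are zero padded
    return f"{folder}/year={day:%Y}/month={day:%m}/day={day:%d}/"


def s3_uri(bucket: str, folder: str, day: date) -> str:
    """Join a bucket name and the partition prefix into an s3:// URI."""
    return f"s3://{bucket}/{partition_prefix(folder, day)}"


# Hypothetical bucket name; substitute your own.
example_uri = s3_uri("sample-inp-bucket-etl-example", "products", date(2024, 3, 7))
```

Keeping the prefix logic in one helper makes it easier to keep the uploader, the sensor, and the unload target pointing at the same folder for a given day.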

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The file for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although Amazon MWAA supports Airflow version 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the bundled provider version predates Redshift Serverless support, you must upgrade the provider to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the modified constraints file to the /dags folder under the name referenced in requirements.txt (constraints-3.10-mod.txt in the preceding example).

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

  4. Navigate to the Amazon MWAA environment and choose Edit.
  5. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  6. Choose Save.

This updates the environment, and the new providers take effect.

  7. To verify the providers version, go to Providers under the Admin menu.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.
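Before uploading requirements.txt, the pinned provider version can be checked locally. The following is a rough sketch in plain Python, assuming the provider is pinned with == as in the example above; the helper names are hypothetical and this is not an Amazon MWAA or Airflow API.

```python
import re


def pinned_version(requirements_text: str, package: str):
    """Return the version pinned with '==' for package as a tuple, or None if absent."""
    pattern = re.compile(rf"^{re.escape(package)}==([0-9]+(?:\.[0-9]+)*)$")
    for line in requirements_text.splitlines():
        match = pattern.match(line.strip())
        if match:
            parts = tuple(int(part) for part in match.group(1).split("."))
            # Pad short versions such as "8.4" so tuple comparison behaves as expected
            return parts + (0,) * (3 - len(parts))
    return None


def supports_redshift_serverless(requirements_text: str) -> bool:
    """True when the Amazon provider is pinned at 8.4.0 or later."""
    version = pinned_version(requirements_text, "apache-airflow-providers-amazon")
    return version is not None and version >= (8, 4, 0)
```

A check like this only validates the text of the file; the Providers page in the Airflow UI remains the source of truth for what was actually installed.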

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which grants permission to assume RoleA in Account A. The following is the policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
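The two bucket policies in steps 1 and 2 differ only in the bucket name and the list of principals, so they can be generated from one template. The following is a minimal sketch using only the Python standard library; the function name is hypothetical, and the role path service-role/ simply mirrors the ARNs shown in the steps above.

```python
import json


def bucket_policy(bucket: str, account_id: str, role_names: list[str]) -> dict:
    """Build a bucket policy granting the listed Account B roles object and list access."""
    principals = [
        f"arn:aws:iam::{account_id}:role/service-role/{name}" for name in role_names
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket",
                ],
                # Object-level actions apply to bucket/*, s3:ListBucket to the bucket itself
                "Resource": [f"arn:aws:s3:::{bucket}/*", f"arn:aws:s3:::{bucket}"],
            }
        ],
    }


def to_json(policy: dict) -> str:
    """Serialize the policy for pasting into the bucket policy editor."""
    return json.dumps(policy, indent=4)
```

Generating the document rather than editing it by hand avoids stray spaces inside ARNs and keeps the two policies consistent.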

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
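When troubleshooting a timeout, it can help to sanity-check the address ranges offline. The sketch below uses Python's ipaddress module to confirm that two VPC CIDR blocks do not overlap (a requirement for VPC peering) and that a source address falls inside a range allowed by a security group rule. The CIDR values used with it are made-up examples, not values from this solution.

```python
import ipaddress


def can_peer(cidr_a: str, cidr_b: str) -> bool:
    """VPC peering requires that the two VPC CIDR blocks do not overlap."""
    net_a = ipaddress.ip_network(cidr_a)
    net_b = ipaddress.ip_network(cidr_b)
    return not net_a.overlaps(net_b)


def is_allowed(source_ip: str, allowed_cidrs: list[str]) -> bool:
    """Mimic a security group source check: is source_ip inside any allowed range?"""
    address = ipaddress.ip_address(source_ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)
```

For example, with a hypothetical Amazon MWAA VPC of 10.0.0.0/16 and a Redshift VPC of 10.1.0.0/16, can_peer returns True, and is_allowed("10.0.3.7", ["10.0.0.0/16"]) confirms that a worker address would match a rule allowing the Amazon MWAA range.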

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  1. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
  2. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  1. To generate an Airflow connection URI string, go to AWS CloudShell and start a Python shell.
  2. Run the following code to generate the connection URI string:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>
  1. Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI).

This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  1. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
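The lookup order described in this section can be modeled with a toy example. This is not Airflow code: it uses plain dictionaries in place of Secrets Manager and the metadata database, and it assumes the backend joins the configured prefix and the connection ID with a / separator, which matches the secret name airflow/connections/secrets_redshift_connection used above. All names in the sketch are hypothetical.

```python
def secret_name(prefix: str, conn_id: str, sep: str = "/") -> str:
    """Build the secret name the backend is assumed to look up for a connection ID."""
    return f"{prefix}{sep}{conn_id}"


def resolve_connection(conn_id, secrets_store, metadata_db, prefix="airflow/connections"):
    """Check the secrets backend first, then fall back to the metadata database."""
    uri = secrets_store.get(secret_name(prefix, conn_id))
    if uri is not None:
        return uri
    return metadata_db.get(conn_id)


# Stand-ins for Secrets Manager and the Airflow metadata database
secrets_store = {"airflow/connections/secrets_redshift_connection": "redshift://from-secrets"}
metadata_db = {"redshift-conn-id": "redshift://from-metadata-db"}
```

In this model, the DAG refers to the connection by its short ID (secrets_redshift_connection), and the prefix is added by the backend, which is why the DAG code does not need to repeat airflow/connections/.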

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.
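The fields entered in the UI correspond to the parts of the connection URI generated in the previous section. As a rough illustration, the following sketch splits such a URI back into its components with urllib.parse. The function name and the sample values are hypothetical, and note that in the URI format used earlier the database and Region travel as query parameters rather than as separate fields.

```python
from urllib.parse import parse_qs, unquote, urlsplit


def connection_fields(uri: str) -> dict:
    """Split a connection URI into the pieces that the connection form asks for."""
    parts = urlsplit(uri)
    # parse_qs percent-decodes values and returns lists; keep the first value of each key
    extra = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "extra": extra,
    }
```

Round-tripping a generated URI through a helper like this is a quick way to catch an unescaped character in the password or role ARN before storing the string in Secrets Manager.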

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/year=YYYY/month=MM/day=DD/, so the sensor should check the folder for the current date. We derive the current date in the DAG and pass it to S3KeySensor, which looks for any new files in the current day's folder.
    • We also set wildcard_match to True, which causes bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl-<username> in Account A. The DAG code passes the secrets_redshift_connection connection from Secrets Manager to this operator for unloading the data.
  • TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
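To make the effect of ALL_DONE concrete, here is a toy model of how a trigger rule decides whether a task may start. It is a deliberate simplification written for this explanation, not Airflow's scheduler logic: the state names are reduced to a handful of strings and only two rules are modeled.

```python
# Simplified set of terminal task states (an assumption for this sketch)
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task may start, given the states of its upstream tasks."""
    if trigger_rule == "all_success":
        # Default behavior: every upstream task must have succeeded
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # Every upstream task must have finished, whatever the outcome
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")
```

Under this model, a task with the all_done rule still runs when its upstream task failed, which is why the cleanup and unload steps in the DAG are not blocked by a failed stored procedure call.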

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete all rows from the products table in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    # Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3,
    )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

Optimize data layout by bucketing with Amazon Athena and AWS Glue to accelerate downstream queries

Post Syndicated from Takeshi Nakatani original https://aws.amazon.com/blogs/big-data/optimize-data-layout-by-bucketing-with-amazon-athena-and-aws-glue-to-accelerate-downstream-queries/

In the era of data, organizations are increasingly using data lakes to store and analyze vast amounts of structured and unstructured data. Data lakes provide a centralized repository for data from various sources, enabling organizations to unlock valuable insights and drive data-driven decision-making. However, as data volumes continue to grow, optimizing data layout and organization becomes crucial for efficient querying and analysis.

One of the key challenges in data lakes is the potential for slow query performance, especially when dealing with large datasets. This can be attributed to factors such as inefficient data layout, resulting in excessive data scanning and inefficient use of compute resources. To address this challenge, common practices like partitioning and bucketing can significantly improve query performance and reduce computation costs.

Partitioning is a technique that divides a large dataset into smaller, more manageable parts based on specific criteria, such as date, region, or product category. By partitioning data, downstream analytical queries can skip irrelevant partitions, reducing the amount of data that needs to be scanned and processed. You can use partition columns in the WHERE clause in queries to scan only the specific partitions that your query needs. This can lead to faster query runtimes and more efficient resource utilization. It especially works well when columns with low cardinality are chosen as the key.

What if you have a high-cardinality column that you sometimes need to filter on, for example to look up VIP customers by customer ID? Each customer is usually identified by an ID, and there can be millions of them. Partitioning isn’t suitable for such high-cardinality columns because you end up with small files, slow partition filtering, and high Amazon Simple Storage Service (Amazon S3) API cost (one S3 prefix is created per value of the partition column). Although you can use partitioning with a natural key such as city or state to narrow down your dataset to some degree, it is still necessary to query across date-based partitions if your data is time series.

This is where bucketing comes into play. Bucketing makes sure that all rows with the same values of one or more columns end up in the same file. Instead of one file per value, like partitioning, a hash function is used to distribute values evenly across a fixed number of files. By organizing data this way, you can perform efficient filtering, because only the relevant buckets need to be processed, further reducing computational overhead.
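
The idea of hashing a key into a fixed number of files can be sketched in a few lines of Python. This is only an illustration: it uses CRC32 as a stand-in hash, which is an assumption and not the hash function that Hive, Spark, or Athena actually use, and the station IDs are made up.

```python
import zlib
from collections import defaultdict


def bucket_for(key: str, bucket_count: int) -> int:
    """Map a key to a bucket index with a stable hash (CRC32 is a stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % bucket_count


def assign_buckets(keys, bucket_count):
    """Group keys by bucket index; every occurrence of a key lands in one bucket."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[bucket_for(key, bucket_count)].append(key)
    return dict(buckets)


# Made-up, high-cardinality keys spread over a fixed number of buckets.
station_ids = [f"station-{i:05d}" for i in range(1000)]
layout = assign_buckets(station_ids, bucket_count=16)
print(len(layout), "buckets used; sizes:", sorted(len(v) for v in layout.values()))
```

However many distinct keys there are, the number of buckets (and therefore files) stays fixed, which is the property that makes bucketing a better fit than partitioning for high-cardinality columns.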

There are multiple options for implementing bucketing on AWS. One approach is to use the Amazon Athena CREATE TABLE AS SELECT (CTAS) statement, which allows you to create a bucketed table directly from a query. Alternatively, you can use AWS Glue for Apache Spark, which provides built-in support for bucketing configurations during the data transformation process. AWS Glue allows you to define bucketing parameters, such as the number of buckets and the columns to bucket on, providing an optimized data layout for efficient querying with Athena.

In this post, we discuss how to implement bucketing on AWS data lakes, using the Athena CTAS statement and AWS Glue for Apache Spark. We also cover bucketing for Apache Iceberg tables.

Example use case

In this post, you use a public dataset, the NOAA Integrated Surface Database. Data analysts run one-time queries for data during the past 5 years through Athena. Most of the queries are for specific stations with specific report types. The queries need to complete in 10 seconds, and the cost needs to be optimized carefully. In this scenario, you’re a data engineer responsible for optimizing query performance and cost.

For example, if an analyst wants to retrieve data for a specific station (for example, station ID 123456) with a particular report type (for example, CRN01), the query might look like the following:

SELECT station, report_type, columnA, columnB, ...
FROM table_name
WHERE
report_type = 'CRN01'
AND station = '123456'

In the case of the NOAA Integrated Surface Database, the station column is likely to have a high cardinality, with numerous unique station identifiers. On the other hand, the report_type column may have a relatively low cardinality, with a limited set of report types. Given this scenario, it would be a good idea to partition the data by report_type and bucket it by station.

With this partitioning and bucketing strategy, Athena can first eliminate partitions for irrelevant report types, and then scan only the buckets within the relevant partition that match the specified station ID, significantly reducing the amount of data processed and accelerating query runtimes. This approach not only meets the query performance requirement, but also helps optimize costs by minimizing the amount of data scanned and billed for each query.
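
The two-stage pruning described above can be simulated with a toy layout. The sketch below assumes 13 placeholder partition values, 16 buckets, and CRC32 as a stand-in hash; none of these reflect how Athena computes bucket membership internally.

```python
import zlib
from itertools import product

BUCKET_COUNT = 16
# Placeholder partition values standing in for the real report types.
REPORT_TYPES = [f"TYPE{i:02d}" for i in range(13)]


def bucket_for(station: str) -> int:
    """Stand-in hash: the real engines use their own hash functions."""
    return zlib.crc32(station.encode("utf-8")) % BUCKET_COUNT


def files_to_scan(report_type, stations):
    """Return the (partition, bucket) pairs that survive both pruning steps."""
    wanted_buckets = {bucket_for(s) for s in stations}
    return [
        (p, b)
        for p, b in product(REPORT_TYPES, range(BUCKET_COUNT))
        if p == report_type and b in wanted_buckets
    ]


all_files = len(REPORT_TYPES) * BUCKET_COUNT
selected = files_to_scan("TYPE05", ["123456"])
print(f"{len(selected)} of {all_files} files need to be read")
```

Partition pruning removes every file outside the requested report type, and bucket pruning then removes every file in that partition whose bucket cannot contain the requested station.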

In this post, we examine how query performance is affected by data layout, in particular, bucketing. We also compare three different ways to achieve bucketing. The following table represents conditions for the tables to be created.

| Attribute | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
|---|---|---|---|---|---|
| Format | CSV | Parquet | Parquet | Parquet | Parquet |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Is partitioned? | Yes, but by a different column | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |

noaa_remote_original is partitioned by the year column, but not by the report_type column. The Is partitioned? row indicates whether the table is partitioned by the columns that are actually used in the queries.

Baseline table

For this post, you create several tables with different conditions: some without bucketing and some with bucketing, to showcase the performance characteristics of bucketing. First, let’s create an original table using the NOAA data. In subsequent steps, you ingest data from this table to create test tables.

There are multiple ways to create a table definition: running DDL, an AWS Glue crawler, the AWS Glue Data Catalog API, and so on. In this step, you run DDL via the Athena console.

Complete the following steps to create the "bucketing_blog"."noaa_remote_original" table in the Data Catalog:

  1. Open the Athena console.
  2. In the query editor, run the following DDL to create a new AWS Glue database:
    -- Create Glue database
    CREATE DATABASE bucketing_blog;

  3. For Database under Data, choose bucketing_blog to set the current database.
  4. Run the following DDL to create the original table:
    -- Create original table
    CREATE EXTERNAL TABLE `bucketing_blog`.`noaa_remote_original`(
      `station` STRING, 
      `date` STRING, 
      `source` STRING, 
      `latitude` STRING, 
      `longitude` STRING, 
      `elevation` STRING, 
      `name` STRING, 
      `report_type` STRING, 
      `call_sign` STRING, 
      `quality_control` STRING, 
      `wnd` STRING, 
      `cig` STRING, 
      `vis` STRING, 
      `tmp` STRING, 
      `dew` STRING, 
      `slp` STRING, 
      `aj1` STRING, 
      `gf1` STRING, 
      `mw1` STRING)
    PARTITIONED BY (
        year STRING)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
    WITH SERDEPROPERTIES ( 
      'escapeChar'='\\',
      'quoteChar'='\"',
      'separatorChar'=',') 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://noaa-global-hourly-pds/'
    TBLPROPERTIES (
      'skip.header.line.count'='1'
    )

Because the source data has quoted fields, we use OpenCSVSerde instead of the default LazySimpleSerde.

These CSV files have a header row, which we tell Athena to skip by adding skip.header.line.count and setting the value to 1.

For more details, refer to OpenCSVSerDe for processing CSV.
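
To see why quoted fields matter, the following sketch parses a small, made-up sample with Python's csv module, using the same separator, quote, and escape characters as the SERDEPROPERTIES above and skipping one header line. It only mimics the behavior locally and is not how Athena itself reads the files.

```python
import csv
import io

# Made-up sample: a header row followed by one record with quoted fields.
SAMPLE = (
    '"STATION","NAME","TMP"\n'
    '"99999904237","EXAMPLE SITE, AZ US","+0123,1"\n'
)


def read_rows(text: str, skip_header_lines: int = 1) -> list:
    """Parse quoted CSV and drop the leading header lines."""
    reader = csv.reader(
        io.StringIO(text), delimiter=",", quotechar='"', escapechar="\\"
    )
    return list(reader)[skip_header_lines:]


rows = read_rows(SAMPLE)
print(rows)
```

The commas inside the quoted NAME and TMP values stay within their fields. A parser that splits on every comma would break this record into five columns instead of three.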

  1. Run the following DDL to add partitions. We add partitions only for 5 years out of 124 years based on the use case requirement:
    -- Load partitions
    ALTER TABLE `bucketing_blog`.`noaa_remote_original` ADD
      PARTITION (year = '2024') LOCATION 's3://noaa-global-hourly-pds/2024/'
      PARTITION (year = '2023') LOCATION 's3://noaa-global-hourly-pds/2023/'
      PARTITION (year = '2022') LOCATION 's3://noaa-global-hourly-pds/2022/'
      PARTITION (year = '2021') LOCATION 's3://noaa-global-hourly-pds/2021/'
      PARTITION (year = '2020') LOCATION 's3://noaa-global-hourly-pds/2020/';

  2. Run the following query to verify that you can successfully query the data:
    -- Check data 
    SELECT * FROM "bucketing_blog"."noaa_remote_original" LIMIT 10;
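
If you need a different range of years, the partition DDL in the steps above can be generated rather than typed by hand. The helper below is a hypothetical convenience function, not part of any AWS SDK; it assumes the one-prefix-per-year layout shown in the DDL above.

```python
def add_partitions_ddl(table: str, bucket_url: str, years) -> str:
    """Build an ALTER TABLE ... ADD PARTITION statement, one clause per year."""
    clauses = [
        f"  PARTITION (year = '{year}') LOCATION '{bucket_url}{year}/'"
        for year in years
    ]
    return f"ALTER TABLE {table} ADD\n" + "\n".join(clauses) + ";"


ddl = add_partitions_ddl(
    "`bucketing_blog`.`noaa_remote_original`",
    "s3://noaa-global-hourly-pds/",
    range(2024, 2019, -1),  # 2024 down to 2020, as in the step above
)
print(ddl)
```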

Now you’re ready to start querying the original table to examine the baseline performance.

  1. Run a query against the original table to evaluate the query performance as a baseline. The following query selects records for five specific stations with report type CRN05:
    -- Baseline
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."noaa_remote_original"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );

We ran this query 10 times. The average runtime of the 10 queries is 27.6 seconds, which is far longer than our target of 10 seconds, and 155.75 GB of data is scanned to return 1.65 million records. This is the baseline performance of the original raw table. It’s time to start optimizing data layout from this baseline.

Next, you create tables with different conditions from the original: one without bucketing and one with bucketing, and compare them.

Optimize data layout using Athena CTAS

In this section, we use an Athena CTAS query to optimize data layout and its format.

First, let’s create a table with partitioning but without bucketing. The new table is partitioned by the column report_type because most of the expected queries use this column in the WHERE clause, and objects are stored as Parquet with Snappy compression.

  1. Open the Athena query editor.
  2. Run the following query, providing your own S3 bucket and prefix:
    --CTAS, non-bucketed
    CREATE TABLE "bucketing_blog"."athena_non_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-non-bucketed/',
        partitioned_by = ARRAY['report_type'],
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshots.


There are 30 files under the partition.

Next, you create a table with Hive-style bucketing. The number of buckets needs to be carefully tuned through experiments for your own use case. Generally speaking, the more buckets you have, the finer the granularity, which might result in better performance. On the other hand, too many small files may introduce inefficiency in query planning and processing. Also, bucketing only helps if you are querying a few values of the bucketing key. The more values you add to your query, the more likely it is that you will end up reading all buckets.

The following is the baseline query to optimize:

-- Baseline
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
FROM "bucketing_blog"."noaa_remote_original"
WHERE
    report_type = 'CRN05'
    AND ( station = '99999904237'
        OR station = '99999953132'
        OR station = '99999903061'
        OR station = '99999963856'
        OR station = '99999994644'
    );

In this example, the table is bucketed into 16 buckets by a high-cardinality column (station), which is expected to appear in the WHERE clause of the query. All other conditions remain the same. The baseline query filters on five station IDs, and you expect queries to filter on around that many at most, which is comfortably fewer than the number of buckets, so 16 should work well. It is possible to specify a larger number of buckets, but CTAS can’t be used if the total number of partitions exceeds 100.
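
A rough way to reason about the bucket count is to estimate how many buckets a query touches. Assuming the hash spreads keys uniformly and independently (an idealization), filtering on k distinct keys with B buckets touches B × (1 − (1 − 1/B)^k) buckets on average. The function name below is hypothetical.

```python
def expected_buckets_touched(bucket_count: int, distinct_keys: int) -> float:
    """Expected number of distinct buckets hit by `distinct_keys` keys,
    assuming an ideal uniform hash."""
    return bucket_count * (1 - (1 - 1 / bucket_count) ** distinct_keys)


for keys in (1, 5, 16, 64):
    touched = expected_buckets_touched(16, keys)
    print(f"{keys:>3} keys -> about {touched:.1f} of 16 buckets")
```

With 16 buckets and five station IDs, the estimate is about 4.4 buckets, so most of each partition can be skipped. As the number of keys grows well past the number of buckets, the estimate approaches 16 and bucketing stops helping.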

  1. Run the following query:
    -- CTAS, Hive-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-bucketed/',
        partitioned_by = ARRAY['report_type'],
        bucketed_by = ARRAY['station'],
        bucket_count = 16,
        format = 'PARQUET',
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

The query creates S3 objects organized as shown in the following screenshots.


The table-level layout looks exactly the same between athena_non_bucketed and athena_bucketed: there are 13 partitions in each table. The difference is the number of objects under the partitions. There are 16 objects (buckets) per partition, of roughly 10–25 MB each in this case. The number of buckets is constant at the specified value regardless of the amount of data, but the bucket size depends on the amount of data.

Now you’re ready to query against each table to evaluate query performance. The query will select records with five specific stations and report type CRN05 for the past 5 years. Although you can’t see which data of a specific station is located in which bucket, it has been calculated and located correctly by Athena.

  1. Query the non-bucketed table with the following statement:
    -- No bucketing 
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_non_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 10.95 seconds, and 358 MB of data is scanned to return 2.21 million records. Both the runtime and scan size have decreased significantly because you’ve partitioned the data: Athena now reads only one partition and skips the other 12 of 13. In addition, the amount of data scanned has gone down from 206 GB to 360 MB, a reduction of 99.8%. This is due not only to partitioning, but also to the change of format to Parquet with Snappy compression.

  1. Query the bucketed table with the following statement:
    -- Hive bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this query 10 times. The average runtime of the 10 queries is 7.82 seconds, and 69 MB of data is scanned to return 2.21 million records. This means a reduction of average runtime from 10.95 to 7.82 seconds (-29%), and a dramatic reduction of data scanned from 358 MB to 69 MB (-81%) to return the same number of records compared with the non-bucketed table. In this case, both runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.
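
The percentages quoted above follow from a simple relative-reduction formula, which the following snippet reproduces using the measured values from this section.

```python
def reduction_pct(before: float, after: float) -> float:
    """Relative reduction from `before` to `after`, in percent."""
    return (before - after) / before * 100


runtime = reduction_pct(10.95, 7.82)  # average runtime in seconds
scanned = reduction_pct(358, 69)      # data scanned in MB
print(f"runtime: -{runtime:.0f}%, data scanned: -{scanned:.0f}%")
# prints "runtime: -29%, data scanned: -81%"
```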

Considerations

As stated earlier, choose the number of buckets carefully to maximize the performance of your queries. Bucketing only helps if you are querying a few values of the bucketing key. Consider creating more buckets than the number of values expected in an actual query.

Additionally, an Athena CTAS query can create at most 100 partitions at one time. If you need a larger number of partitions, you may want to use AWS Glue extract, transform, and load (ETL), although there is a workaround that splits the work into multiple SQL statements.
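
The multi-statement workaround can be sketched as one CTAS for the first batch of partition values followed by INSERT INTO statements for the rest. The helper below only builds SQL strings; its name, the table names, and the S3 location are hypothetical. It covers the partitioned, non-bucketed case, because Athena places restrictions on INSERT INTO for bucketed tables, so check the Athena documentation before applying it to a bucketed table.

```python
def batched(values, size):
    """Yield consecutive slices of `values` with at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start:start + size]


def build_statements(target, source, partition_col, values, location, limit=100):
    """Build one CTAS plus INSERT INTO statements, `limit` partitions each.

    Assumes `partition_col` is the last column of `source` (CTAS requires
    partition columns last) and that values need no SQL escaping.
    """
    statements = []
    for i, batch in enumerate(batched(list(values), limit)):
        in_list = ", ".join(f"'{v}'" for v in batch)
        select = f"SELECT * FROM {source} WHERE {partition_col} IN ({in_list})"
        if i == 0:
            statements.append(
                f"CREATE TABLE {target} WITH ("
                f"external_location = '{location}', "
                f"partitioned_by = ARRAY['{partition_col}'], "
                f"format = 'PARQUET') AS {select}"
            )
        else:
            statements.append(f"INSERT INTO {target} {select}")
    return statements


# Hypothetical example: 250 partition values split into three statements.
stmts = build_statements(
    "my_db.my_target", "my_db.my_source", "part_col",
    [f"v{i}" for i in range(250)], "s3://<your-s3-location>/my-target/",
)
print(len(stmts), "statements")
```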

Optimize data layout using AWS Glue ETL

Apache Spark is an open source distributed processing framework that enables flexible ETL with PySpark, Scala, and Spark SQL. It allows you to partition and bucket your data based on your requirements. Spark has several tuning options to accelerate jobs. You can effortlessly automate and monitor Spark jobs. In this section, we use AWS Glue ETL jobs to run Spark code to optimize data layout.

Unlike Athena bucketing, AWS Glue ETL uses the Spark bucketing algorithm. For Athena to read the resulting table correctly, all you need to do is add the following table property to the table: bucketing_format = 'spark'. For details about this table property, see Partitioning and bucketing in Athena.

Complete the following steps to create a table with bucketing through AWS Glue ETL:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Create job and choose Visual ETL.
  3. Under Add nodes, choose AWS Glue Data Catalog for Sources.
  4. For Database, choose bucketing_blog.
  5. For Table, choose noaa_remote_original.
  6. Under Add nodes, choose Change Schema for Transforms.
  7. Under Add nodes, choose Custom Transform for Transforms.
  8. For Name, enter ToS3WithBucketing.
  9. For Node parents, choose Change Schema.
  10. For Code block, enter the following code snippet:
    def ToS3WithBucketing (glueContext, dfc) -> DynamicFrameCollection:
        # Convert DynamicFrame to DataFrame
        df = dfc.select(list(dfc.keys())[0]).toDF()
        
        # Write to S3 with bucketing and partitioning
        df.repartition(1, "report_type") \
            .write.option("path", "s3://<your-s3-location>/glue-bucketed/") \
            .mode("overwrite") \
            .partitionBy("report_type") \
            .bucketBy(16, "station") \
            .format("parquet") \
            .option("compression", "snappy") \
            .saveAsTable("bucketing_blog.glue_bucketed")

The following screenshot shows the job created using AWS Glue Studio to generate a table and data.

Each node represents the following:

  • The AWS Glue Data Catalog node loads the noaa_remote_original table from the Data Catalog
  • The Change Schema node makes sure that it loads columns registered in the Data Catalog
  • The ToS3WithBucketing node writes data to Amazon S3 with both partitioning and Spark-based bucketing

The job has been successfully authored in the visual editor.

  1. Under Job details, for IAM Role, choose your AWS Identity and Access Management (IAM) role for this job.
  2. For Worker type, choose G.8X.
  3. For Requested number of workers, enter 5.
  4. Choose Save, then choose Run.

After these steps, the table glue_bucketed has been created.

  1. Choose Tables in the navigation pane, and choose the table glue_bucketed.
  2. On the Actions menu, choose Edit table under Manage.
  3. In the Table properties section, choose Add.
  4. Add a key-value pair with key bucketing_format and value spark.
  5. Choose Save.
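
If you prefer to script the console steps above, the same table property can be set through the AWS Glue Data Catalog API. The following is a minimal sketch that assumes boto3 is installed and AWS credentials are configured; the helper names are hypothetical, and the key whitelist is deliberately conservative, so extend it if your table uses other TableInput fields.

```python
# Keys copied from a get_table response into TableInput. Read-only fields
# such as DatabaseName or CreateTime are not accepted by update_table.
ALLOWED_TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "Retention", "StorageDescriptor",
    "PartitionKeys", "TableType", "Parameters",
}


def with_bucketing_format(table: dict, value: str = "spark") -> dict:
    """Build a TableInput dict from a get_table result, adding bucketing_format."""
    table_input = {k: v for k, v in table.items() if k in ALLOWED_TABLE_INPUT_KEYS}
    parameters = dict(table_input.get("Parameters", {}))
    parameters["bucketing_format"] = value
    table_input["Parameters"] = parameters
    return table_input


def set_bucketing_format(database: str, table_name: str) -> None:
    """Fetch the table, add the property, and write the definition back."""
    import boto3  # imported lazily so the helper above works without AWS access

    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(DatabaseName=database, TableInput=with_bucketing_format(table))


# Requires AWS credentials and permissions on the Data Catalog:
# set_bucketing_format("bucketing_blog", "glue_bucketed")
```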

Now it’s time to query the tables.

  1. Query the bucketed table with the following statement:
    -- Spark bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."glue_bucketed"
    WHERE
        report_type = 'CRN05'
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran the query 10 times. The average runtime of the 10 queries is 7.09 seconds, and 88 MB of data is scanned to return 2.21 million records. In this case, both the runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction.

The reason for the larger bytes scanned compared to the Athena CTAS example is that the values were distributed differently in this table. In the AWS Glue bucketed table, the values were distributed over five files. In the Athena CTAS bucketed table, the values were distributed over four files. Remember that rows are distributed into buckets using a hash function. The Spark bucketing algorithm uses a different hash function than Hive, and in this case, it resulted in a different distribution across the files.

Considerations

Glue DynamicFrame does not support bucketing natively. You need to use Spark DataFrame instead of DynamicFrame to bucket tables.

For information about fine-tuning AWS Glue ETL performance, refer to Best practices for performance tuning AWS Glue for Apache Spark jobs.

Optimize Iceberg data layout with hidden partitioning

Apache Iceberg is a high-performance open table format for huge analytic tables, bringing the reliability and simplicity of SQL tables to big data. Recently, there has been a huge demand to use Apache Iceberg tables to achieve advanced capabilities like ACID transactions, time travel queries, and more.

In Iceberg, bucketing works differently than the Hive table method we’ve seen so far. In Iceberg, bucketing is a subset of partitioning, and can be applied using the bucket partition transform. The way you use it and the end result are similar to bucketing in Hive tables. For more details about Iceberg bucket transforms, refer to Bucket Transform Details.

Complete the following steps:

  1. Open the Athena query editor.
  2. Run the following query to create an Iceberg table with hidden partitioning along with bucketing:
    -- CTAS, Iceberg-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed_iceberg"
    WITH (table_type = 'ICEBERG',
          location = 's3://<your-s3-location>/athena-bucketed-iceberg/', 
          is_external = false,
          partitioning = ARRAY['report_type', 'bucket(station, 16)'],
          format = 'PARQUET',
          write_compression = 'SNAPPY'
    ) 
    AS
    SELECT
        station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your data should look like the following screenshot.

There are two folders: data and metadata. Drill down to data.

You see random prefixes under the data folder. Choose the first one to view its details.

You see the top-level partition based on the report_type column. Drill down to the next level.

You see the second-level partition, bucketed with the station column.

The Parquet data files exist under these folders.

  1. Query the bucketed table with the following statement:
    -- Iceberg bucketing
    SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed_iceberg"
    WHERE
        report_type = 'CRN05'
        AND
        ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


With the Iceberg-bucketed table, the average runtime of the 10 queries is 8.03 seconds, and 148 MB of data is scanned to return 2.21 million records. This is less efficient than bucketing with AWS Glue or Athena, but considering the benefits of Iceberg’s various features, it is within an acceptable range.

Results

The following table summarizes all the results.

| Attribute | noaa_remote_original | athena_non_bucketed | athena_bucketed | glue_bucketed | athena_bucketed_iceberg |
|---|---|---|---|---|---|
| Format | CSV | Parquet | Parquet | Parquet | Iceberg (Parquet) |
| Compression | n/a | Snappy | Snappy | Snappy | Snappy |
| Created via | n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg |
| Engine | n/a | Trino | Trino | Apache Spark | Apache Iceberg |
| Table size (GB) | 155.8 | 5.0 | 5.0 | 5.8 | 5.0 |
| Number of S3 objects | 53360 | 376 | 192 | 192 | 195 |
| Is partitioned? | Yes, but by a different column | Yes | Yes | Yes | Yes |
| Is bucketed? | No | No | Yes | Yes | Yes |
| Bucketing format | n/a | n/a | Hive | Spark | Iceberg |
| Number of buckets | n/a | n/a | 16 | 16 | 16 |
| Average runtime (sec) | 29.178 | 10.950 | 7.815 | 7.089 | 8.030 |
| Scanned size (MB) | 206640.0 | 358.6 | 69.1 | 87.8 | 147.7 |

With athena_bucketed, glue_bucketed, and athena_bucketed_iceberg, you were able to meet the latency goal of 10 seconds. With bucketing, you saw a 25–40% reduction in runtime and a 60–85% reduction in scan size, which can contribute to both latency and cost optimization.

As you can see from the results, although partitioning contributes significantly to reducing both runtime and scan size, bucketing can reduce them even further.

Athena CTAS is straightforward and fast enough to complete the bucketing process. AWS Glue ETL is more flexible and scalable to achieve advanced use cases. You can choose either method based on your requirement and use case, because you can take advantage of bucketing through either option.

Conclusion

In this post, we demonstrated how to optimize your table data layout with partitioning and bucketing through Athena CTAS and AWS Glue ETL. We showed that bucketing contributes to accelerating query latency and reducing scan size to further optimize costs. We also discussed bucketing for Iceberg tables through hidden partitioning.

Bucketing is just one technique for optimizing data layout by reducing the amount of data scanned. To optimize your entire data layout, we recommend combining bucketing with other options such as partitioning, columnar file formats, and compression. Together, these can further improve query performance.

Happy bucketing!


About the Authors

Takeshi Nakatani is a Principal Big Data Consultant on the Professional Services team in Tokyo. He has 26 years of experience in the IT industry, with expertise in architecting data infrastructure. On his days off, he can be a rock drummer or a motorcyclist.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

How Salesforce optimized their detection and response platform using AWS managed services

Post Syndicated from Atul Khare original https://aws.amazon.com/blogs/big-data/how-salesforce-optimized-their-detection-and-response-platform-using-aws-managed-services/

This is a guest blog post co-authored with Atul Khare and Bhupender Panwar from Salesforce.

Headquartered in San Francisco, Salesforce, Inc. is a cloud-based customer relationship management (CRM) software company building artificial intelligence (AI)-powered business applications that allow businesses to connect with their customers in new and personalized ways.

The Salesforce Trust Intelligence Platform (TIP) log platform team is responsible for data pipeline and data lake infrastructure, providing log ingestion, normalization, persistence, search, and detection capability to ensure Salesforce is safe from threat actors. It runs miscellaneous services to facilitate investigation, mitigation, and containment for security operations. The TIP team is critical to securing Salesforce’s infrastructure, detecting malicious threat activities, and providing timely responses to security events. This is achieved by collecting and inspecting petabytes of security logs across dozens of organizations, some with thousands of accounts.

In this post, we discuss how the Salesforce TIP team optimized their architecture using Amazon Web Services (AWS) managed services to achieve better scalability, cost, and operational efficiency.

TIP existing architecture bird’s eye view and scale of the platform

The main key performance indicator (KPI) for the TIP platform is its capability to ingest a high volume of security logs from a variety of Salesforce internal systems in real time and process them with high velocity. The platform ingests more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. The platform ingests log files in JSON, text, and Common Event Format (CEF) formats.

The message bus in TIP’s existing architecture mainly uses Apache Kafka for ingesting different log types coming from the upstream systems. Kafka had a single topic for all the log types before they were consumed by different downstream applications including Splunk, Streaming Search, and Log Normalizer. The normalized Parquet logs are stored in an Amazon Simple Storage Service (Amazon S3) data lake and cataloged into Hive Metastore (HMS) on an Amazon Relational Database Service (Amazon RDS) instance based on S3 event notifications. The data lake consumers then use Apache Presto running on an Amazon EMR cluster to perform one-time queries. Other teams including the Data Science and Machine Learning teams use the platform to detect, analyze, and control security threats.

Challenges with the existing TIP log platform architecture

Some of the main challenges that TIP’s existing architecture was facing include:

  • Heavy operational overhead and maintenance cost managing the Kafka cluster
  • High cost to serve (CTS) to meet growing business needs
  • Compute threads limited by the number of partitions
  • Difficult to scale out when traffic increases
  • Weekly patching creates lags
  • Challenges with HMS scalability

All these challenges motivated the TIP team to embark on a journey to create a more optimized platform that’s easier to scale with less operational overhead and lower CTS.

New TIP log platform architecture

The Salesforce TIP log platform engineering team, in collaboration with AWS, started building the new architecture to replace the Kafka-based message bus solution with the fully managed AWS messaging and notification solutions Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Notification Service (Amazon SNS). In the new design, the upstream systems send their logs to a central Amazon S3 storage location, which invokes a process to partition the logs and store them in an S3 data lake. Consumer applications such as Splunk get the messages delivered to their system using Amazon SQS. Similarly, Amazon SQS events for the partitioned log data trigger a log normalization process that delivers the normalized log data to open source Delta Lake tables on an S3 data lake. One of the major changes in the new architecture is the use of an AWS Glue Data Catalog to replace the previous Hive Metastore. The one-time analysis applications use Apache Trino on an Amazon EMR cluster to query the Delta tables cataloged in AWS Glue. Other consumer applications also read the data from S3 data lake files stored in Delta table format. More details on some of the important processes are as follows:

Log partitioner (Spark structured stream)

This service ingests logs from the central S3 store, which it learns about through Amazon SNS and Amazon SQS notifications, and writes them back to S3 partitioned by log type. Downstream consumers pick up the partitioned output through their own Amazon SNS and Amazon SQS subscriptions. This is the bronze layer of the TIP data lake.
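To make the S3 to SNS to SQS notification path more concrete, the following is a minimal sketch of how a consumer might extract the bucket and object key from one SQS message body. It is not Salesforce's implementation (their partitioner is a Spark structured stream). The function name is hypothetical, and the sketch assumes standard S3 event notifications published to an SNS topic that an SQS queue subscribes to.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sqs_body(body: str) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs found in one SQS message body.

    Assumption: the S3 event notification was published to SNS and then
    delivered to SQS. Without raw message delivery, the S3 event is a JSON
    string inside the "Message" field of the SNS envelope.
    """
    envelope = json.loads(body)
    # With raw message delivery there is no SNS envelope, so the body
    # itself is treated as the S3 event.
    event = json.loads(envelope["Message"]) if "Message" in envelope else envelope

    objects = []
    # Test events and other non-object notifications carry no "Records".
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects
```

A consumer would call this for each message it receives from the queue and then read the referenced objects. Because each consumer has its own queue, no mapping between partitions and compute threads is needed.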

Log normalizer (Spark structured stream)

One of the downstream consumers of the log partitioner (the Splunk Ingestor is another), the log normalizer ingests the data from the partitioned output in S3, using Amazon SNS and Amazon SQS notifications, and enriches it using Salesforce custom parsers and tags. Finally, this enriched data is landed in the data lake on S3. This is the silver layer of the TIP data lake.

Machine learning and other data analytics consumers (Trino, Flink, and Spark Jobs)

These consumers consume from the silver layer of the TIP data lake and run analytics for security detection use cases. The earlier Kafka interface is now converted to delta streams ingestion, which concludes the total removal of the Kafka bus from the TIP data pipeline.

Advantages of the new TIP log platform architecture

The main advantages realized by the Salesforce TIP team based on this new architecture using Amazon S3, Amazon SNS, and Amazon SQS include:

  • Cost savings of approximately $400 thousand per month
  • Auto scaling to meet growing business needs
  • Zero DevOps maintenance overhead
  • No mapping of partitions to compute threads
  • Compute resources can be scaled up and down independently
  • Fully managed Data Catalog to reduce the operational overhead of managing HMS

Summary

In this blog post, we discussed how the Salesforce Trust Intelligence Platform (TIP) optimized their data pipeline by replacing the Kafka-based message bus solution with fully managed AWS messaging and notification solutions using Amazon SQS and Amazon SNS. Salesforce and AWS teams worked together to make sure this new platform seamlessly scales to ingest more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. Reach out to your AWS account team if you have similar use cases and you need help architecting your platform to achieve operational efficiencies and scale.


About the authors

Atul Khare is a Director of Engineering at Salesforce Security, where he spearheads the Security Log Platform and Data Lakehouse initiatives. He supports diverse security customers by building robust big data ETL pipelines that are elastic, resilient, and easy to use, providing uniform and consistent security datasets for threat detection and response operations, AI, forensic analysis, analytics, and compliance needs across all Salesforce clouds. Beyond his professional endeavors, Atul enjoys performing music with his band to raise funds for local charities.

Bhupender Panwar is a Big Data Architect at Salesforce and a seasoned advocate for big data and cloud computing. His background encompasses the development of data-intensive applications and pipelines, solving intricate architectural and scalability challenges, and extracting valuable insights from extensive datasets within the technology industry. Outside of his big data work, Bhupender loves to hike, bike, and travel, and is a great foodie.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Vikas Panghal is the Principal Product Manager leading the product management team for Amazon SNS and Amazon SQS. He has deep expertise in event-driven and messaging applications and brings a wealth of knowledge and experience to his role, shaping the future of messaging services. He is passionate about helping customers build highly scalable, fault-tolerant, and loosely coupled systems. Outside of work, he enjoys spending time with his family outdoors, playing chess, and running.

Amazon DataZone announces integration with AWS Lake Formation hybrid access mode for the AWS Glue Data Catalog

Post Syndicated from Utkarsh Mittal original https://aws.amazon.com/blogs/big-data/amazon-datazone-announces-integration-with-aws-lake-formation-hybrid-access-mode-for-the-aws-glue-data-catalog/

Last week, we announced the general availability of the integration between Amazon DataZone and AWS Lake Formation hybrid access mode. In this post, we share how this new feature helps you simplify the way you use Amazon DataZone to enable secure and governed sharing of your data in the AWS Glue Data Catalog. We also delve into how data producers can share their AWS Glue tables through Amazon DataZone without needing to register them in Lake Formation first.

Overview of the Amazon DataZone integration with Lake Formation hybrid access mode

Amazon DataZone is a fully managed data management service to catalog, discover, analyze, share, and govern data between data producers and consumers in your organization. With Amazon DataZone, data producers populate the business data catalog with data assets from data sources such as the AWS Glue Data Catalog and Amazon Redshift. They also enrich their assets with business context to make it straightforward for data consumers to understand. After the data is available in the catalog, data consumers such as analysts and data scientists can search and access this data by requesting subscriptions. When the request is approved, Amazon DataZone can automatically provision access to the data by managing permissions in Lake Formation or Amazon Redshift so that the data consumer can start querying the data using tools such as Amazon Athena or Amazon Redshift.

To manage the access to data in the AWS Glue Data Catalog, Amazon DataZone uses Lake Formation. Previously, if you wanted to use Amazon DataZone for managing access to your data in the AWS Glue Data Catalog, you had to onboard your data to Lake Formation first. Now, the integration of Amazon DataZone and Lake Formation hybrid access mode simplifies how you can get started with your Amazon DataZone journey by removing the need to onboard your data to Lake Formation first.

Lake Formation hybrid access mode allows you to start managing permissions on your AWS Glue databases and tables through Lake Formation, while continuing to maintain any existing AWS Identity and Access Management (IAM) permissions on these tables and databases. Lake Formation hybrid access mode supports two permission pathways to the same Data Catalog databases and tables:

  • In the first pathway, Lake Formation allows you to select specific principals (opt-in principals) and grant them Lake Formation permissions to access databases and tables by opting in
  • The second pathway allows all other principals (that are not added as opt-in principals) to access these resources through the IAM principal policies for Amazon Simple Storage Service (Amazon S3) and AWS Glue actions

With the integration between Amazon DataZone and Lake Formation hybrid access mode, if you have tables in the AWS Glue Data Catalog that are managed through IAM-based policies, you can publish these tables directly to Amazon DataZone, without registering them in Lake Formation. Amazon DataZone registers the location of these tables in Lake Formation using hybrid access mode, which allows managing permissions on AWS Glue tables through Lake Formation, while continuing to maintain any existing IAM permissions.
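For readers who want to see what hybrid access mode involves at the API level, here is a hedged sketch of the three Lake Formation calls that correspond to registering a location in hybrid access mode, opting a principal in, and granting it table permissions. Amazon DataZone performs the equivalent work for you, and this is not its actual implementation. The helper names are hypothetical, every ARN and name is a placeholder you would supply, and the client is passed in so the request-building logic can be inspected without AWS credentials.

```python
def hybrid_access_requests(location_arn, registration_role_arn,
                           consumer_role_arn, catalog_id, database, table):
    """Build keyword arguments for three Lake Formation API calls.

    The keys are the boto3 Lake Formation client method names, listed in
    the order in which they should be called.
    """
    principal = {"DataLakePrincipalIdentifier": consumer_role_arn}
    table_resource = {
        "Table": {"CatalogId": catalog_id, "DatabaseName": database, "Name": table}
    }
    return {
        # Register the S3 location; HybridAccessEnabled keeps existing
        # IAM-based access working for principals that are not opted in.
        "register_resource": {
            "ResourceArn": location_arn,
            "RoleArn": registration_role_arn,
            "HybridAccessEnabled": True,
        },
        # Opt the consumer role in so Lake Formation permissions apply to it.
        "create_lake_formation_opt_in": {
            "Principal": principal,
            "Resource": table_resource,
        },
        # Grant the opted-in principal read access to the table.
        "grant_permissions": {
            "Principal": principal,
            "Resource": table_resource,
            "Permissions": ["SELECT", "DESCRIBE"],
        },
    }


def apply_requests(lakeformation, requests):
    """Call each operation on the given client, in insertion order."""
    for operation, kwargs in requests.items():
        getattr(lakeformation, operation)(**kwargs)
```

With real credentials, you would pass `boto3.client("lakeformation")` as the first argument of `apply_requests`. Principals that are not opted in continue to use their IAM policies, which is the second pathway described above.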

Amazon DataZone enables you to publish any type of asset in the business data catalog. For some of these assets, Amazon DataZone can automatically manage access grants. These assets are called managed assets, and include Lake Formation-managed Data Catalog tables and Amazon Redshift tables and views. Prior to this integration, you had to complete the following steps before Amazon DataZone could treat the published Data Catalog table as a managed asset:

  1. Identify the Amazon S3 location associated with the Data Catalog table.
  2. Register the Amazon S3 location with Lake Formation in hybrid access mode using a role with appropriate permissions.
  3. Publish the table metadata to the Amazon DataZone business data catalog.

The following diagram illustrates this workflow.

With Amazon DataZone’s integration with Lake Formation hybrid access mode, you can simply publish your AWS Glue tables to Amazon DataZone and delegate to Amazon DataZone the work of registering the Amazon S3 location and adding an opt-in principal in Lake Formation. The administrator of an AWS account can enable the data location registration setting under the DefaultDataLake blueprint on the Amazon DataZone console. Now, a data owner or publisher can publish their AWS Glue table (managed through IAM permissions) to Amazon DataZone without the extra setup steps. When a data consumer subscribes to this table, Amazon DataZone registers the Amazon S3 locations of the table in hybrid access mode, adds the data consumer’s IAM role as an opt-in principal, and grants access to the same IAM role by managing permissions on the table through Lake Formation. This makes sure that IAM permissions on the table can coexist with newly granted Lake Formation permissions, without disrupting any existing workflows. The following diagram illustrates this workflow.

Solution overview

To demonstrate this new capability, we use a sample customer scenario where the finance team wants to access data owned by the sales team for financial analysis and reporting. The sales team has a pipeline that creates a dataset containing valuable information about ticket sales, popular events, venues, and seasons. We call it the tickit dataset. The sales team stores this dataset in Amazon S3 and registers it in a database in the Data Catalog. The access to this table is currently managed through IAM-based permissions. However, the sales team wants to publish this table to Amazon DataZone to facilitate secure and governed data sharing with the finance team.

The steps to configure this solution are as follows:

  1. The Amazon DataZone administrator enables the data lake location registration setting in Amazon DataZone to automatically register the Amazon S3 location of the AWS Glue tables in Lake Formation hybrid access mode.
  2. After the hybrid access mode integration is enabled in Amazon DataZone, the finance team requests a subscription to the sales data asset. The asset shows up as a managed asset, which means Amazon DataZone can manage access to this asset even if the Amazon S3 location of this asset isn’t registered in Lake Formation.
  3. The sales team is notified of a subscription request raised by the finance team. They review and approve the access request. After the request is approved, Amazon DataZone fulfills the subscription request by managing permissions in Lake Formation. It registers the Amazon S3 location of the subscribed table in Lake Formation hybrid access mode.
  4. The finance team gains access to the sales dataset required for their financial reports. They can go to their DataZone environment and start running queries using Athena against their subscribed dataset.

Prerequisites

To follow the steps in this post, you need an AWS account. If you don’t have an account, you can create one. In addition, you must have the following resources configured in your account:

  • An S3 bucket
  • An AWS Glue database and crawler
  • IAM roles for different personas and services
  • An Amazon DataZone domain and project
  • An Amazon DataZone environment profile and environment
  • An Amazon DataZone data source

If you don’t have these resources already configured, you can create them by deploying the following AWS CloudFormation stack:

  1. Choose Launch Stack to deploy a CloudFormation template.
  2. Complete the steps to deploy the template and leave all settings as default.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources, then choose Submit.

After the CloudFormation deployment is complete, you can log in to the Amazon DataZone portal and manually trigger a data source run. This pulls any new or modified metadata from the source and updates the associated assets in the inventory. This data source has been configured to automatically publish the data assets to the catalog.

  1. On the Amazon DataZone console, choose View domains.

Make sure you are logged in with the same role that you used to deploy the CloudFormation stack, and verify that you are in the same AWS Region.

  1. Find the domain blog_dz_domain, then choose Open data portal.
  2. Choose Browse all projects and choose Sales producer project.
  3. On the Data tab, choose Data sources in the navigation pane.
  4. Locate and choose the data source that you want to run.

This opens the data source details page.

  1. Choose the options menu (three vertical dots) next to tickit_datasource and choose Run.

The data source status changes to Running as Amazon DataZone updates the asset metadata.
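If you prefer to trigger the data source run programmatically rather than through the portal, the following is a minimal sketch using the Amazon DataZone `StartDataSourceRun` API through Boto3. The function name is hypothetical, and the domain and data source identifiers are placeholders you would look up in your own account. The client is passed in as a parameter so the function can be exercised with a stub.

```python
def start_data_source_run(datazone, domain_id: str, data_source_id: str):
    """Start a data source run and return its (run id, status).

    `datazone` is expected to behave like boto3.client("datazone").
    """
    response = datazone.start_data_source_run(
        domainIdentifier=domain_id,
        dataSourceIdentifier=data_source_id,
    )
    return response["id"], response["status"]
```

With real credentials, you would pass `boto3.client("datazone")` as the first argument, created in the same Region as the domain.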

Enable hybrid mode integration in Amazon DataZone

In this step, the Amazon DataZone administrator goes through the process of enabling the Amazon DataZone integration with Lake Formation hybrid access mode. Complete the following steps:

  1. On a separate browser tab, open the Amazon DataZone console.

Verify that you are in the same Region where you deployed the CloudFormation template.

  1. Choose View domains.
  2. Choose the domain created by AWS CloudFormation, blog_dz_domain.
  3. Scroll down on the domain details page and choose the Blueprints tab.

A blueprint defines what AWS tools and services can be used with the data assets published in Amazon DataZone. The DefaultDataLake blueprint is enabled as part of the CloudFormation stack deployment. This blueprint enables you to create and query AWS Glue tables using Athena. For the steps to enable this in your own deployments, refer to Enable built-in blueprints in the AWS account that owns the Amazon DataZone domain.

  1. Choose the DefaultDataLake blueprint.
  2. On the Provisioning tab, choose Edit.
  3. Select Enable Amazon DataZone to register S3 locations using AWS Lake Formation hybrid access mode.

You have the option of excluding specific Amazon S3 locations if you don’t want Amazon DataZone to automatically register them to Lake Formation hybrid access mode.

  1. Choose Save changes.

Request access

In this step, you log in to Amazon DataZone as the finance team, search for the sales data asset, and subscribe to it. Complete the following steps:

  1. Return to your Amazon DataZone data portal browser tab.
  2. Switch to the finance consumer project by choosing the dropdown menu next to the project name and choosing Finance consumer project.

From this step onwards, you take on the persona of a finance user looking to subscribe to a data asset published in the previous step.

  1. In the search bar, search for and choose the sales data asset.
  2. Choose Subscribe.

The asset shows up as a managed asset. This means that Amazon DataZone can grant access to this data asset to the finance team’s project by managing the permissions in Lake Formation.

  1. Enter a reason for the access request and choose Subscribe.

Approve access request

The sales team gets a notification that an access request from the finance team is submitted. To approve the request, complete the following steps:

  1. Choose the dropdown menu next to the project name and choose Sales producer project.

You now assume the persona of the sales team, who are the owners and stewards of the sales data assets.

  1. Choose the notification icon at the top-right corner of the DataZone portal.
  2. Choose the Subscription Request Created task.
  3. Grant access to the sales data asset to the finance team and choose Approve.
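The subscribe and approve steps in the portal correspond to two Amazon DataZone API operations. The sketch below shows how they could be called through Boto3. It assumes you already know the identifier of the published listing and of the consumer project, both of which are placeholders here, and the function names are hypothetical. Verify the parameter shapes against the current Boto3 DataZone reference before relying on them.

```python
def request_subscription(datazone, domain_id, listing_id, consumer_project_id, reason):
    """Raise a subscription request on behalf of the consumer project."""
    response = datazone.create_subscription_request(
        domainIdentifier=domain_id,
        requestReason=reason,
        subscribedListings=[{"identifier": listing_id}],
        subscribedPrincipals=[{"project": {"identifier": consumer_project_id}}],
    )
    return response["id"]


def approve_subscription(datazone, domain_id, request_id, comment):
    """Approve a pending request as the owner of the data asset."""
    response = datazone.accept_subscription_request(
        domainIdentifier=domain_id,
        identifier=request_id,
        decisionComment=comment,
    )
    return response["status"]
```

In the scenario of this post, the first call would be made with the finance team's credentials and the second with the sales team's. After approval, Amazon DataZone fulfills the subscription by managing permissions in Lake Formation.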

Analyze the data

The finance team has now been granted access to the sales data, and this dataset has been added to their Amazon DataZone environment. They can access the environment and query the sales dataset with Athena, along with any other datasets they currently own. Complete the following steps:

  1. On the dropdown menu, choose Finance consumer project.

On the right pane of the project overview screen, you can find a list of active environments available for use.

  1. Choose the Amazon DataZone environment finance_dz_environment.
  2. In the navigation pane, under Data assets, choose Subscribed.
  3. Verify that your environment now has access to the sales data.

It may take a few minutes for the data asset to be automatically added to your environment.

  1. Choose the new tab icon for Query data.

A new tab opens with the Athena query editor.

  1. For Database, choose finance_consumer_db_tickitdb-<suffix>.

This database will contain your subscribed data assets.

  1. Generate a preview of the sales table by choosing the options menu (three vertical dots) and choosing Preview table.
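The table preview can also be run outside the console. The following sketch builds a preview query and submits it through the Athena `StartQueryExecution` API. The helper names are hypothetical, and the database, table, and workgroup values are placeholders (the workgroup of your Amazon DataZone environment is an assumption you would need to look up). Quoting the identifiers matters here because the generated database name contains a hyphen.

```python
def preview_query(database: str, table: str, limit: int = 10) -> str:
    """Build a preview query with double-quoted identifiers."""
    return f'SELECT * FROM "{database}"."{table}" LIMIT {int(limit)}'


def start_preview(athena, database: str, table: str, workgroup: str) -> str:
    """Submit the preview query and return the query execution ID.

    `athena` is expected to behave like boto3.client("athena").
    """
    response = athena.start_query_execution(
        QueryString=preview_query(database, table),
        QueryExecutionContext={"Database": database},
        WorkGroup=workgroup,
    )
    return response["QueryExecutionId"]
```

Once the execution finishes, the rows can be fetched with the Athena `GetQueryResults` operation.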

Clean up

To clean up your resources, complete the following steps:

  1. Switch back to the administrator role you used to deploy the CloudFormation stack.
  2. On the Amazon DataZone console, delete the projects used in this post. This will delete most project-related objects like data assets and environments.
  3. On the AWS CloudFormation console, delete the stack you deployed in the beginning of this post.
  4. On the Amazon S3 console, delete the S3 buckets containing the tickit dataset.
  5. On the Lake Formation console, delete the Lake Formation admins registered by Amazon DataZone.
  6. On the Lake Formation console, delete tables and databases created by Amazon DataZone.

Conclusion

In this post, we discussed how the integration between Amazon DataZone and Lake Formation hybrid access mode simplifies the process to start using Amazon DataZone for end-to-end governance of your data in the AWS Glue Data Catalog. This integration helps you bypass the manual steps of onboarding to Lake Formation before you can start using Amazon DataZone.

For more information on how to get started with Amazon DataZone, refer to the Getting started guide. Check out the YouTube playlist for some of the latest demos of Amazon DataZone and short descriptions of the capabilities available. For more information about Amazon DataZone, see How Amazon DataZone helps customers find value in oceans of data.


About the Authors

Utkarsh Mittal is a Senior Technical Product Manager for Amazon DataZone at AWS. He is passionate about building innovative products that simplify customers’ end-to-end analytics journeys. Outside of the tech world, Utkarsh loves to play music, with drums being his latest endeavor.

Praveen Kumar is a Principal Analytics Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-centered services. His areas of interests are serverless technology, modern cloud data warehouses, streaming, and generative AI applications.

Paul Villena is a Senior Analytics Solutions Architect at AWS with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interest are infrastructure as code, serverless technologies, and coding in Python.

Amazon DataZone now integrates with AWS Glue Data Quality and external data quality solutions

Post Syndicated from Andrea Filippo La Scola original https://aws.amazon.com/blogs/big-data/amazon-datazone-now-integrates-with-aws-glue-data-quality-and-external-data-quality-solutions/

Today, we are pleased to announce that Amazon DataZone is now able to present data quality information for data assets. This information empowers end-users to make informed decisions as to whether or not to use specific assets.

Many organizations already use AWS Glue Data Quality to define and enforce data quality rules on their data, validate data against predefined rules, track data quality metrics, and monitor data quality over time using artificial intelligence (AI). Other organizations monitor the quality of their data through third-party solutions.

Amazon DataZone now integrates directly with AWS Glue to display data quality scores for AWS Glue Data Catalog assets. Additionally, Amazon DataZone now offers APIs for importing data quality scores from external systems.

In this post, we discuss the latest features of Amazon DataZone for data quality, the integration between Amazon DataZone and AWS Glue Data Quality and how you can import data quality scores produced by external systems into Amazon DataZone via API.

Challenges

One of the most common questions we get from customers is related to displaying data quality scores in the Amazon DataZone business data catalog to let business users have visibility into the health and reliability of the datasets.

As data becomes increasingly crucial for driving business decisions, Amazon DataZone users are keenly interested in providing the highest standards of data quality. They recognize the importance of accurate, complete, and timely data in enabling informed decision-making and fostering trust in their analytics and reporting processes.

Amazon DataZone data assets can be updated at varying frequencies. As data is refreshed and updated, changes can happen through upstream processes that put it at risk of not maintaining the intended quality. Data quality scores help you understand if data has maintained the expected level of quality for data consumers to use (through analysis or downstream processes).

From a producer’s perspective, data stewards can now set up Amazon DataZone to automatically import the data quality scores from AWS Glue Data Quality (scheduled or on demand) and include this information in the Amazon DataZone catalog to share with business users. Additionally, you can now use new Amazon DataZone APIs to import data quality scores produced by external systems into the data assets.

With the latest enhancement, Amazon DataZone users can now accomplish the following:

  • Access insights about data quality standards directly from the Amazon DataZone web portal
  • View data quality scores on various KPIs, including data completeness, uniqueness, and accuracy
  • Get a holistic view of the quality and trustworthiness of their data

In the first part of this post, we walk through the integration between AWS Glue Data Quality and Amazon DataZone. We discuss how to visualize data quality scores in Amazon DataZone, enable AWS Glue Data Quality when creating a new Amazon DataZone data source, and enable data quality for an existing data asset.

In the second part of this post, we discuss how you can import data quality scores produced by external systems into Amazon DataZone via API. In this example, we use Amazon EMR Serverless in combination with the open source library Pydeequ to act as an external system for data quality.

Visualize AWS Glue Data Quality scores in Amazon DataZone

You can now visualize AWS Glue Data Quality scores in data assets that have been published in the Amazon DataZone business catalog and that are searchable through the Amazon DataZone web portal.

If the asset has AWS Glue Data Quality enabled, you can now quickly visualize the data quality score directly in the catalog search pane.

By selecting the corresponding asset, you can understand its content through the readme, glossary terms, and technical and business metadata. Additionally, the overall quality score indicator is displayed in the Asset Details section.

A data quality score serves as an overall indicator of a dataset’s quality, calculated based on the rules you define.

On the Data quality tab, you can access the details of data quality overview indicators and the results of the data quality runs.

The indicators shown on the Overview tab are calculated based on the results of the rulesets from the data quality runs.

Each rule is assigned an attribute that contributes to the calculation of the indicator. For example, rules that have the Completeness attribute will contribute to the calculation of the corresponding indicator on the Overview tab.

To filter data quality results, choose the Applicable column dropdown menu and choose your desired filter parameter.

You can also visualize column-level data quality starting on the Schema tab.

When data quality is enabled for the asset, the data quality results become available, providing insightful quality scores that reflect the integrity and reliability of each column within the dataset.

When you choose one of the data quality result links, you’re redirected to the data quality detail page, filtered by the selected column.

Data quality historical results in Amazon DataZone

Data quality can change over time for many reasons:

  • Data formats may change because of changes in the source systems
  • As data accumulates over time, it may become outdated or inconsistent
  • Data quality can be affected by human errors in data entry, data processing, or data manipulation

In Amazon DataZone, you can now track data quality over time to confirm reliability and accuracy. By analyzing the historical report snapshot, you can identify areas for improvement, implement changes, and measure the effectiveness of those changes.

Enable AWS Glue Data Quality when creating a new Amazon DataZone data source

In this section, we walk through the steps to enable AWS Glue Data Quality when creating a new Amazon DataZone data source.

Prerequisites

To follow along, you should have a domain for Amazon DataZone, an Amazon DataZone project, and a new Amazon DataZone environment (with a DataLakeProfile). For instructions, refer to Amazon DataZone quickstart with AWS Glue data.

You also need to define and run a ruleset against your data, which is a set of data quality rules in AWS Glue Data Quality. To set up the data quality rules and for more information on the topic, refer to the following posts:

After you create the data quality rules, make sure that Amazon DataZone has the permissions to access the AWS Glue database managed through AWS Lake Formation. For instructions, see Configure Lake Formation permissions for Amazon DataZone.

In our example, we have configured a ruleset against a table containing patient data within a healthcare synthetic dataset generated using Synthea. Synthea is a synthetic patient generator that creates realistic patient data and associated medical records that can be used for testing healthcare software applications.

The ruleset contains 27 individual rules (one of them failing), so the overall data quality score is 96%.
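As a quick check of the arithmetic, the sketch below computes the score as the percentage of rules that passed. The function name is hypothetical, and the formula is an assumption that is consistent with the 96% figure above.

```python
def data_quality_score(total_rules: int, failed_rules: int) -> float:
    """Return the percentage of rules that passed."""
    if total_rules <= 0:
        raise ValueError("total_rules must be positive")
    if not 0 <= failed_rules <= total_rules:
        raise ValueError("failed_rules must be between 0 and total_rules")
    return 100 * (total_rules - failed_rules) / total_rules


# 26 of 27 rules pass, which rounds to 96.
score = data_quality_score(total_rules=27, failed_rules=1)
print(round(score))
```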

If you use Amazon DataZone managed policies, there is no action needed because these will get automatically updated with the needed actions. Otherwise, you need to allow Amazon DataZone to have the required permissions to list and get AWS Glue Data Quality results, as shown in the Amazon DataZone user guide.

Create a data source with data quality enabled

In this section, we create a data source and enable data quality. You can also update an existing data source to enable data quality. We use this data source to import metadata information related to our datasets. Amazon DataZone will also import data quality information related to the (one or more) assets contained in the data source.

  1. On the Amazon DataZone console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Name, enter a name for your data source.
  4. For Data source type, select AWS Glue.
  5. For Environment, choose your environment.
  6. For Database name, enter a name for the database.
  7. For Table selection criteria, choose your criteria.
  8. Choose Next.
  9. For Data quality, select Enable data quality for this data source.

If data quality is enabled, Amazon DataZone will automatically fetch data quality scores from AWS Glue at each data source run.

  1. Choose Next.

Now you can run the data source.

While running the data source, Amazon DataZone imports the last 100 AWS Glue Data Quality run results. This information is now visible on the asset page and will be visible to all Amazon DataZone users after publishing the asset.
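The console steps above can also be approximated through the Amazon DataZone `CreateDataSource` API. The following sketch only builds the request arguments. It assumes the `autoImportDataQualityResult` flag in the AWS Glue run configuration is what the Enable data quality option sets, and all identifiers and names are placeholders. Treat the exact shape as an assumption and check it against the current Boto3 DataZone reference.

```python
def glue_data_source_request(domain_id, project_id, environment_id,
                             name, database, table_expression="*"):
    """Build keyword arguments for datazone.create_data_source."""
    return {
        "domainIdentifier": domain_id,
        "projectIdentifier": project_id,
        "environmentIdentifier": environment_id,
        "name": name,
        "type": "GLUE",
        "configuration": {
            "glueRunConfiguration": {
                # Assumed to correspond to "Enable data quality for this
                # data source" in the console.
                "autoImportDataQualityResult": True,
                "relationalFilterConfigurations": [
                    {
                        "databaseName": database,
                        # Table selection criteria: include matching tables.
                        "filterExpressions": [
                            {"type": "INCLUDE", "expression": table_expression}
                        ],
                    }
                ],
            }
        },
    }
```

With real credentials, the result would be passed as `boto3.client("datazone").create_data_source(**request)`, followed by a data source run to import the metadata and data quality results.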

Enable data quality for an existing data asset

In this section, we enable data quality for an existing asset. This might be useful for users that already have data sources in place and want to enable the feature afterwards.

Prerequisites

To follow along, you should have already run the data source and produced an AWS Glue table data asset. Additionally, you should have defined a ruleset in AWS Glue Data Quality over the target table in the Data Catalog.

For this example, we ran the data quality job multiple times against the table, producing the related AWS Glue Data Quality scores, as shown in the following screenshot.

Import data quality scores into the data asset

Complete the following steps to import the existing AWS Glue Data Quality scores into the data asset in Amazon DataZone:

  1. Within the Amazon DataZone project, navigate to the Inventory data pane and choose the data source.

If you choose the Data quality tab, you can see that there’s still no information on data quality because AWS Glue Data Quality integration is not enabled for this data asset yet.

  2. On the Data quality tab, choose Enable data quality.
  3. In the Data quality section, select Enable data quality for this data source.
  4. Choose Save.

Now, back on the Inventory data pane, you can see a new tab: Data quality.

On the Data quality tab, you can see data quality scores imported from AWS Glue Data Quality.

Ingest data quality scores from an external source using Amazon DataZone APIs

Many organizations already use systems that calculate data quality by performing tests and assertions on their datasets. Amazon DataZone now supports importing third-party originated data quality scores via API, allowing users that navigate the web portal to view this information.

In this section, we simulate a third-party system pushing data quality scores into Amazon DataZone via APIs through Boto3 (Python SDK for AWS).

For this example, we use the same synthetic dataset as earlier, generated with Synthea.

The following diagram illustrates the solution architecture.

The workflow consists of the following steps:

  1. Read a dataset of patients in Amazon Simple Storage Service (Amazon S3) directly from Amazon EMR using Spark.

The dataset is created as a generic S3 asset collection in Amazon DataZone.

  2. In Amazon EMR, perform data validation rules against the dataset.
  3. The metrics are saved in Amazon S3 to have a persistent output.
  4. Use Amazon DataZone APIs through Boto3 to push custom data quality metadata.
  5. End-users can see the data quality scores by navigating to the data portal.

Prerequisites

We use Amazon EMR Serverless and Pydeequ to run a fully managed Spark environment. To learn more about Pydeequ as a data testing framework, see Testing Data quality at scale with Pydeequ.

To allow Amazon EMR to send data to the Amazon DataZone domain, make sure that the IAM role used by Amazon EMR has the permissions to do the following:

  • Read from and write to the S3 buckets
  • Call the post_time_series_data_points action for Amazon DataZone:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "datazone:PostTimeSeriesDataPoints"
                ],
                "Resource": [
                    "<datazone_domain_arn>"
                ]
            }
        ]
    }

Make sure that you have added the EMR role as a project member in the Amazon DataZone project. To do so, on the Amazon DataZone console, navigate to the Project members page and choose Add members.

Add the EMR role as a contributor.

Ingest and analyze PySpark code

In this section, we analyze the PySpark code that we use to perform data quality checks and send the results to Amazon DataZone. You can download the complete PySpark script.

To run the script entirely, you can submit a job to EMR Serverless. The service will take care of scheduling the job and automatically allocating the resources needed, enabling you to track the job run statuses throughout the process.

You can submit a job to EMR Serverless from EMR Studio on the Amazon EMR console, or programmatically using the AWS CLI or one of the AWS SDKs.
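As an illustration of the programmatic route, the following sketch assembles the arguments for the Boto3 EMR Serverless `start_job_run` call. It assumes the script receives the input file path and the output location as its two positional arguments, which matches how the script reads `sys.argv[1]` and `sys.argv[2]`. The helper name, job name, application ID, role ARN, and S3 paths are hypothetical placeholders, and any Spark submit parameters needed to package Pydeequ are omitted.

```python
def build_job_run_request(application_id, execution_role_arn, script_uri,
                          input_path, output_location):
    """Assemble keyword arguments for the EMR Serverless start_job_run call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "name": "patients-data-validation",  # hypothetical job name
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": script_uri,
                # Passed to the script as sys.argv[1] and sys.argv[2].
                "entryPointArguments": [input_path, output_location],
            }
        },
    }


# Hypothetical values, for illustration only.
job_request = build_job_run_request(
    application_id="application-id-example",
    execution_role_arn="arn:aws:iam::111122223333:role/emr-serverless-example",
    script_uri="s3://example-bucket/scripts/patients_validation.py",
    input_path="s3://example-bucket/patients/patients.csv",
    output_location="s3://example-bucket/output/",
)

# To submit the job (requires AWS credentials and the boto3 package):
# import boto3
# boto3.client("emr-serverless").start_job_run(**job_request)
```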

In Apache Spark, a SparkSession is the entry point for interacting with DataFrames and Spark’s built-in functions. The script starts by initializing a SparkSession:

with SparkSession.builder.appName("PatientsDataValidation") \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .getOrCreate() as spark:

We read a dataset from Amazon S3. For increased modularity, you can use the script input to refer to the S3 path:

s3inputFilepath = sys.argv[1]
s3outputLocation = sys.argv[2]

df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(s3inputFilepath) #s3://<bucket_name>/patients/patients.csv

Next, we set up a metrics repository. This can be helpful to persist the run results in Amazon S3.

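# s3_write_path is not defined in this excerpt; set it to the S3 file path
# where the metrics should be persisted (for example, under s3outputLocation).
metricsRepository = FileSystemMetricsRepository(spark, s3_write_path)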

Pydeequ allows you to create data quality rules using the builder pattern, a well-known software engineering design pattern, chaining instructions to build and run a VerificationSuite object:

key_tags = {'tag': 'patient_df'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(metricsRepository) \
    .addCheck(
        check.hasSize(lambda x: x >= 1000) \
        .isComplete("birthdate")  \
        .isUnique("id")  \
        .isComplete("ssn") \
        .isComplete("first") \
        .isComplete("last") \
        .hasMin("healthcare_coverage", lambda x: x == 1000.0)) \
    .saveOrAppendResult(resultKey) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following is the output for the data validation rules:

+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|check           |check_level|check_status|constraint                                          |constraint_status|constraint_message                                  |
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+
|Integrity checks|Error      |Error       |SizeConstraint(Size(None))                          |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(birthdate,None))|Success          |                                                    |
|Integrity checks|Error      |Error       |UniquenessConstraint(Uniqueness(List(id),None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(ssn,None))      |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(first,None))    |Success          |                                                    |
|Integrity checks|Error      |Error       |CompletenessConstraint(Completeness(last,None))     |Success          |                                                    |
|Integrity checks|Error      |Error       |MinimumConstraint(Minimum(healthcare_coverage,None))|Failure          |Value: 0.0 does not meet the constraint requirement!|
+----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+

At this point, we want to insert these data quality values in Amazon DataZone. To do so, we use the post_time_series_data_points function in the Boto3 Amazon DataZone client.

The PostTimeSeriesDataPoints DataZone API allows you to insert new time series data points for a given asset or listing, without creating a new revision.

To see which fields the API expects as input, you can retrieve the specification of the relevant Amazon DataZone form type; in our case, it’s amazon.datazone.DataQualityResultFormType.

You can also use the AWS CLI to invoke the API and display the form structure:

aws datazone get-form-type --domain-identifier <your_domain_id> --form-type-identifier amazon.datazone.DataQualityResultFormType --region <domain_region> --output text --query 'model.smithy'

This output helps identify the required API parameters, including fields and value limits:

$version: "2.0"
namespace amazon.datazone
structure DataQualityResultFormType {
    @amazon.datazone#timeSeriesSummary
    @range(min: 0, max: 100)
    passingPercentage: Double
    @amazon.datazone#timeSeriesSummary
    evaluationsCount: Integer
    evaluations: EvaluationResults
}
@length(min: 0, max: 2000)
list EvaluationResults {
    member: EvaluationResult
}

@length(min: 0, max: 20)
list ApplicableFields {
    member: String
}

@length(min: 0, max: 20)
list EvaluationTypes {
    member: String
}

enum EvaluationStatus {
    PASS,
    FAIL
}

string EvaluationDetailType

map EvaluationDetails {
    key: EvaluationDetailType
    value: String
}

structure EvaluationResult {
    description: String
    types: EvaluationTypes
    applicableFields: ApplicableFields
    status: EvaluationStatus
    details: EvaluationDetails
}
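The limits in this model can be checked on the client side before calling the API. The following is a minimal sketch of such a check, covering only the constraints visible in the model above (the 0 to 100 range, the list length limits, and the PASS/FAIL enumeration). The function name is hypothetical, and the service may enforce additional rules that are not covered here.

```python
def validate_data_quality_form(content):
    """Return a list of violations of the DataQualityResultFormType limits."""
    problems = []

    score = content.get("passingPercentage")
    if score is not None and not 0 <= score <= 100:
        problems.append("passingPercentage must be between 0 and 100")

    evaluations = content.get("evaluations", [])
    if len(evaluations) > 2000:
        problems.append("evaluations can contain at most 2000 entries")

    for index, evaluation in enumerate(evaluations):
        if len(evaluation.get("applicableFields", [])) > 20:
            problems.append(f"evaluations[{index}].applicableFields exceeds 20 entries")
        if len(evaluation.get("types", [])) > 20:
            problems.append(f"evaluations[{index}].types exceeds 20 entries")
        status = evaluation.get("status")
        if status is not None and status not in ("PASS", "FAIL"):
            problems.append(f"evaluations[{index}].status must be PASS or FAIL")

    return problems


valid_form = {
    "passingPercentage": 85.71,
    "evaluationsCount": 1,
    "evaluations": [
        {
            "description": "MinimumConstraint - Minimum",
            "types": ["Minimum_custom"],
            "applicableFields": ["healthcare_coverage"],
            "status": "FAIL",
        }
    ],
}
invalid_form = {"passingPercentage": 120, "evaluations": [{"status": "Success"}]}

print(validate_data_quality_form(valid_form))
print(validate_data_quality_form(invalid_form))
```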

To send the appropriate form data, we need to convert the Pydeequ output to match the DataQualityResultFormType contract. This can be achieved with a Python function that processes the results.

For each DataFrame row, we extract information from the constraint column. For example, take the following constraint value:

CompletenessConstraint(Completeness(birthdate,None))

We convert it to the following:

{
  "constraint": "CompletenessConstraint",
  "statisticName": "Completeness_custom",
  "column": "birthdate"
}

Make sure to send an output that matches the KPIs that you want to track. In our case, we are appending _custom to the statistic name, resulting in the following format for KPIs:

  • Completeness_custom
  • Uniqueness_custom

In a real-world scenario, you might want to set a value that matches with your data quality framework in relation to the KPIs that you want to track in Amazon DataZone.
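The conversion described above could be sketched as follows. This is not the function from the downloadable script; it is an illustrative parser that assumes the constraint strings follow the `Name(Statistic(arguments))` layout visible in the validation output shown earlier. The function name and the `suffix` parameter are hypothetical.

```python
import re

# Matches strings such as "CompletenessConstraint(Completeness(birthdate,None))".
CONSTRAINT_PATTERN = re.compile(r"^(\w+)\((\w+)\((.*)\)\)$")


def parse_constraint(constraint, suffix="_custom"):
    """Split a constraint string into constraint name, KPI name, and column."""
    match = CONSTRAINT_PATTERN.match(constraint.strip())
    if match is None:
        raise ValueError(f"Unrecognized constraint format: {constraint!r}")
    constraint_name, statistic, arguments = match.groups()

    # The last argument is the optional filter (None in the output above);
    # whatever precedes it identifies the column, if there is one.
    column = arguments.rsplit(",", 1)[0] if "," in arguments else None

    # Uniqueness wraps its columns in List(...); unwrap it.
    if column is not None and column.startswith("List(") and column.endswith(")"):
        column = column[len("List("):-1]

    return {
        "constraint": constraint_name,
        "statisticName": statistic + suffix,
        "column": column,
    }


print(parse_constraint("CompletenessConstraint(Completeness(birthdate,None))"))
print(parse_constraint("UniquenessConstraint(Uniqueness(List(id),None))"))
print(parse_constraint("SizeConstraint(Size(None))"))
```

Constraints without a column, such as the size check, yield `None` for the column, so the caller can decide whether to send an empty list of applicable fields.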

After applying a transformation function, we have a Python object for each rule evaluation:

..., {
   'applicableFields': ["healthcare_coverage"],
   'types': ["Minimum_custom"],
   'status': 'FAIL',
   'description': 'MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!'
 },...

We also use the constraint_status column to compute the overall score:

(number of successes / total number of evaluations) * 100

In our example, six of the seven constraints succeed, which results in a passing percentage of 85.71%.
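The score calculation can be sketched in a few lines of Python. The function name is hypothetical, and returning 0.0 for an empty list of evaluations is an assumption made for this sketch rather than behavior taken from the original script.

```python
def passing_percentage(constraint_statuses):
    """Compute the share of successful constraints as a percentage."""
    total = len(constraint_statuses)
    if total == 0:
        return 0.0  # assumption: no evaluations means no passing score
    successes = sum(1 for status in constraint_statuses if status == "Success")
    return round(successes / total * 100, 2)


# Values of the constraint_status column from the validation output above:
# six successes and one failure.
statuses = ["Success"] * 6 + ["Failure"]
print(passing_percentage(statuses))  # 85.71
```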

We set this value in the passingPercentage input field along with the other information related to the evaluations in the input of the Boto3 method post_time_series_data_points:

import json
import boto3

# Instantiate the client library to communicate with Amazon DataZone Service
#
datazone = boto3.client(
    service_name='datazone', 
    region_name=<Region(String) example: us-east-1>
)

# Perform the API operation to push the Data Quality information to Amazon DataZone
#
datazone.post_time_series_data_points(
    domainIdentifier=<DataZone domain ID>,
    entityIdentifier=<DataZone asset ID>,
    entityType='ASSET',
    forms=[
        {
            "content": json.dumps({
                    "evaluationsCount":<Number of evaluations (number)>,
                    "evaluations": [<List of objects {
                        'description': <Description (String)>,
                        'applicableFields': [<List of columns involved (String)>],
                        'types': [<List of KPIs (String)>],
                        'status': <FAIL/PASS (string)>
                        }>
                     ],
                    "passingPercentage":<Score (number)>
                }),
            "formName": <Form name(String) example: PydeequRuleSet1>,
            "typeIdentifier": "amazon.datazone.DataQualityResultFormType",
            "timestamp": <Date (timestamp)>
        }
    ]
)

Boto3 invokes the Amazon DataZone APIs. In these examples, we used Boto3 and Python, but you can choose one of the AWS SDKs developed in the language you prefer.
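To make the template above concrete, the following sketch fills it in with the failed minimum check from our example. The helper name, the domain and asset IDs, and the form name are hypothetical placeholders. The timestamp is passed as a timezone-aware `datetime`, which Boto3 accepts for timestamp fields. The block only builds the request, and the API call is left as a comment.

```python
import json
from datetime import datetime, timezone


def build_time_series_request(domain_id, asset_id, form_name,
                              evaluations, score, timestamp=None):
    """Assemble keyword arguments for post_time_series_data_points."""
    content = {
        "evaluationsCount": len(evaluations),
        "evaluations": evaluations,
        "passingPercentage": score,
    }
    return {
        "domainIdentifier": domain_id,
        "entityIdentifier": asset_id,
        "entityType": "ASSET",
        "forms": [
            {
                # The form content is sent as a JSON-encoded string.
                "content": json.dumps(content),
                "formName": form_name,
                "typeIdentifier": "amazon.datazone.DataQualityResultFormType",
                "timestamp": timestamp or datetime.now(timezone.utc),
            }
        ],
    }


evaluations = [
    {
        "applicableFields": ["healthcare_coverage"],
        "types": ["Minimum_custom"],
        "status": "FAIL",
        "description": "MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!",
    }
]

# Hypothetical identifiers, for illustration only.
request = build_time_series_request(
    domain_id="dzd_example",
    asset_id="asset_example",
    form_name="PydeequRuleSet1",
    evaluations=evaluations,
    score=85.71,
)

# To push the data points (requires AWS credentials and the boto3 package):
# import boto3
# boto3.client("datazone").post_time_series_data_points(**request)
```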

After setting the appropriate domain and asset ID and running the method, we can check on the Amazon DataZone console that the asset data quality is now visible on the asset page.

We can observe that the overall score matches with the API input value. We can also see that we were able to add customized KPIs on the overview tab through custom types parameter values.

With the new Amazon DataZone APIs, you can load data quality rule results from third-party systems into a specific data asset. With this capability, Amazon DataZone allows you to extend the types of indicators present in AWS Glue Data Quality (such as completeness, minimum, and uniqueness) with custom indicators.

Clean up

We recommend deleting any potentially unused resources to avoid incurring unexpected costs. For example, you can delete the Amazon DataZone domain and the EMR application you created during this process.

Conclusion

In this post, we highlighted the latest features of Amazon DataZone for data quality, empowering end-users with enhanced context and visibility into their data assets. Furthermore, we delved into the seamless integration between Amazon DataZone and AWS Glue Data Quality. You can also use the Amazon DataZone APIs to integrate with external data quality providers, enabling you to maintain a comprehensive and robust data strategy within your AWS environment.

To learn more about Amazon DataZone, refer to the Amazon DataZone User Guide.


About the Authors


Andrea Filippo is a Partner Solutions Architect at AWS supporting Public Sector partners and customers in Italy. He focuses on modern data architectures and helping customers accelerate their cloud journey with serverless technologies.

Emanuele is a Solutions Architect at AWS, based in Italy, after living and working for more than 5 years in Spain. He enjoys helping large companies with the adoption of cloud technologies, and his area of expertise is mainly focused on Data Analytics and Data Management. Outside of work, he enjoys traveling and collecting action figures.

Varsha Velagapudi is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about simplifying customers’ AI/ML and analytics journey to help them succeed in their day-to-day tasks. Outside of work, she enjoys nature and outdoor activities, reading, and traveling.